viri-leds-dmx-sacn/dmx_queue.py
2023-08-16 22:47:12 +02:00

162 lines
5.8 KiB
Python

import logging
from colorsys import hsv_to_rgb
from queue import Empty
from threading import Thread
from time import sleep
from sacn import sACNsender
# Module-level logger used by DMXQueue for worker lifecycle and queue-item messages.
LOG = logging.getLogger('DMXQueue')
class DMXQueue:
    """Render queued alert messages as DMX light effects sent over sACN.

    A background worker thread polls ``queue`` for ``(level, component, text)``
    items. Items whose level is ``'error'``, ``'warn'`` or ``'info'`` trigger a
    matching light effect; any other level is consumed without an effect.
    While the queue is empty the worker optionally drives a rotating rainbow
    across all lights.

    ``config`` is read for ``sacn.universe``, ``sacn.multicast``,
    ``sacn.target``, ``alerts.brightness`` and ``rainbow.enable`` /
    ``rainbow.brightness`` / ``rainbow.intensity`` / ``rainbow.speed``.

    Each entry of ``lights`` must expose writable ``intensity``, ``red``,
    ``green``, ``blue`` and ``white`` attributes plus a ``dump()`` method whose
    result unpacks into ``(start_addr, values)`` for :meth:`_bulk`.
    """

    # Size of the channel buffer handed to the sACN output.
    _DMX_CHANNELS = 512

    def __init__(self, config, queue, lights):
        """Store collaborators and create the (not yet started) worker and sender."""
        self.config = config
        self.queue = queue
        self.lights = lights
        # Allocated here as well as in start() so that _dmx()/_bulk() never
        # hit a missing attribute when used before start().
        self.dmx_data = self._DMX_CHANNELS * [0]
        self.worker = Thread(target=self._worker)
        self.worker_should_be_running = False
        self.sacn = sACNsender(
            fps=40,
        )

    def start(self):
        """Start the sACN sender, configure the output universe and launch the worker."""
        universe = self.config.sacn.universe
        self.sacn.start()
        self.sacn.activate_output(universe)
        self.sacn[universe].multicast = self.config.sacn.multicast
        if not self.config.sacn.multicast:
            # Unicast mode needs an explicit destination.
            self.sacn[universe].destination = self.config.sacn.target
        self.dmx_data = self._DMX_CHANNELS * [0]
        # A Thread object can only be started once. If the previous worker has
        # already run to completion (start -> stop -> start), build a fresh
        # one. A worker that is still alive is left alone so a double start()
        # still raises RuntimeError instead of spawning a second worker.
        if self.worker.ident is not None and not self.worker.is_alive():
            self.worker = Thread(target=self._worker)
        self.worker_should_be_running = True
        self.worker.start()

    def stop(self):
        """Ask the worker to finish, wait for it, then stop the sACN sender."""
        LOG.info('Waiting for worker to terminate ...')
        self.worker_should_be_running = False
        # join() raises RuntimeError on a thread that was never started, which
        # would skip the sender shutdown below.
        if self.worker.ident is not None:
            self.worker.join()
        self.sacn.stop()

    def _flush(self):
        """Hand a snapshot of the channel buffer to the sACN output."""
        self.sacn[self.config.sacn.universe].dmx_data = tuple(self.dmx_data)

    def _dmx(self, addr, data):
        """Set the single 1-based DMX channel ``addr`` to ``data`` and push the frame."""
        self.dmx_data[addr - 1] = data
        self._flush()

    def _bulk(self, start_addr, values):
        """Write ``values`` to consecutive channels starting at 1-based ``start_addr``.

        The frame is pushed once after all channels are written (instead of
        once per channel), so the sender never sees a half-updated fixture.
        Nothing is pushed when ``values`` is empty.
        """
        values = list(values)
        if not values:
            return
        first_index = start_addr - 1
        for offset, value in enumerate(values):
            self.dmx_data[first_index + offset] = value
        self._flush()

    def _update_all(self, intensity, red, green, blue, white):
        """Apply the same intensity and colour to every light and send each one."""
        for light in self.lights:
            light.intensity = intensity
            light.red = red
            light.green = green
            light.blue = blue
            light.white = white
            self._bulk(*light.dump())

    def _worker(self):
        """Worker loop: play an effect per queue item, otherwise run the idle step.

        Runs until ``worker_should_be_running`` is cleared; the flag is only
        checked between iterations, so a running effect always finishes.
        """
        LOG.info('Worker startup')
        effects = {
            'error': self._effect_error,
            'warn': self._effect_warn,
            'info': self._effect_info,
        }
        rotation = 0
        while self.worker_should_be_running:
            try:
                level, component, text = self.queue.get_nowait()
            except Empty:
                rotation = self._idle_step(rotation)
                continue

            LOG.info(f'Got queue item: {level} {component} : {text}')
            try:
                # effect duration should be between 1s and 1.5s
                effect = effects.get(level)
                if effect is not None:
                    effect()
            finally:
                # Keep the queue's unfinished-task accounting correct even if
                # an effect raises.
                self.queue.task_done()
        LOG.info('Worker shutdown')

    def _effect_error(self):
        """Error effect: blank, then three groups of two red flashes."""
        brightness = self.config.alerts.brightness
        self._update_all(0, 0, 0, 0, 0)
        sleep(0.2)
        # three instances of two flashes each
        for _ in range(3):
            for _ in range(2):
                self._update_all(brightness, 255, 0, 0, 50)
                sleep(0.1)
                self._update_all(0, 255, 0, 0, 50)
                sleep(0.1)
            # NOTE(review): pause placed between flash groups; the nesting was
            # reconstructed from a source that had lost its indentation.
            sleep(0.2)

    def _effect_warn(self):
        """Warning effect: neighbouring lights blink alternately in amber, slowly."""
        brightness = self.config.alerts.brightness
        self._update_all(0, 0, 0, 0, 0)
        sleep(0.2)
        # warning: blink alternate, but slow
        for step in range(6):
            for idx, light in enumerate(self.lights):
                light.red = 255
                light.green = 150
                light.blue = 0
                light.white = 50
                # Odd/even parity flips every step, so adjacent lights alternate.
                light.intensity = brightness if (idx + step) % 2 else 0
                self._bulk(*light.dump())
            sleep(0.2)
        self._update_all(0, 0, 0, 0, 0)

    def _effect_info(self):
        """Info effect: ramp brightness from the rainbow level up to the alert level and back.

        Only runs when the rainbow is enabled and the alert brightness exceeds
        the rainbow brightness by at least 50; otherwise it does nothing.
        """
        # NOTE(review): the brightness check is nested under rainbow.enable
        # (reconstructed nesting); without the rainbow there is no base level
        # to ramp from.
        if not self.config.rainbow.enable:
            return

        base = self.config.rainbow.brightness
        diff = self.config.alerts.brightness - base
        LOG.debug(diff)
        if diff >= 50:
            ramp_up = list(range(15))
            ramp_down = ramp_up[::-1]
            for idx in ramp_up + ramp_down:
                LOG.debug(idx)
                LOG.debug(diff * idx)
                self._update_all(
                    int(base + ((diff / len(ramp_up)) * idx)),
                    0,
                    50,
                    255,
                    50,
                )
                sleep(0.025)

    def _idle_step(self, rotation):
        """Run one idle iteration and return the updated rainbow rotation.

        With the rainbow disabled this just sleeps 0.2s. Otherwise every light
        gets a hue offset evenly spread over the colour wheel, shifted by
        ``rotation`` degrees. ``rainbow.speed`` is used as the step delay in
        milliseconds; below 25 the rotation is frozen and the step sleeps 0.2s.
        """
        rainbow = self.config.rainbow
        if not rainbow.enable:
            sleep(0.2)
            return rotation

        light_count = len(self.lights)
        # Guard against an empty light list: dividing by zero here would kill
        # the worker thread.
        if light_count:
            degrees_per_step = 360 / light_count
            for idx, light in enumerate(self.lights):
                hue = (rotation + (idx * degrees_per_step)) % 360 / 360
                r, g, b = hsv_to_rgb(
                    hue,
                    1,
                    self.config.rainbow.intensity / 100,
                )
                light.red = int(r * 255)
                light.green = int(g * 255)
                # Blue is scaled to 200 instead of 255 -- presumably colour
                # balancing for the fixtures; TODO confirm.
                light.blue = int(b * 200)
                light.white = 0
                light.intensity = self.config.rainbow.brightness
                self._bulk(*light.dump())

        if self.config.rainbow.speed >= 25:
            rotation = rotation + 1
            if rotation >= 360:
                rotation = 0
            sleep(self.config.rainbow.speed / 1000)
        else:
            sleep(0.2)
        return rotation