viri-leds-dmx-sacn/dmx_queue.py

181 lines
6.5 KiB
Python

import logging
import re
from queue import Empty
from threading import Thread
from time import sleep

from sacn import sACNsender

import lights
class DMXQueue:
    """Play alert light effects for one sACN universe from a message queue.

    A background worker thread polls ``queue`` for ``(level, component, text)``
    tuples and plays a light effect depending on ``level`` (``"error"``,
    ``"warn"`` or ``"info"``) on every configured light. While the queue is
    empty and ``config.rainbow_brightness`` is greater than zero, the worker
    repeatedly calls each light's ``rainbow()`` with an advancing rotation
    counter.

    Attributes read from the per-universe config object: ``lights``,
    ``universe``, ``multicast``, ``target``, ``filters``,
    ``alert_brightness`` and ``rainbow_brightness``.
    """

    # Size of the DMX frame buffer kept by this class.
    _DMX_CHANNELS = 512
    # Number of steps in each half (ramp up / ramp down) of the "info" pulse.
    _INFO_RAMP_STEPS = 15
    # With the rainbow enabled, the "info" pulse is only played if the alert
    # brightness exceeds the rainbow brightness by at least this much.
    _INFO_MIN_HEADROOM = 50

    def __init__(self, config, universe, queue):
        """Set up lights, the worker thread and the sACN sender.

        Args:
            config: Object whose ``universes[universe]`` entry holds the
                settings for this universe.
            universe: Key into ``config.universes``; also used in the logger
                name.
            queue: Queue-like object providing ``get_nowait()`` and
                ``task_done()``, yielding ``(level, component, text)``.
        """
        self.log = logging.getLogger(f"DMXQueue {universe}")
        self.config = config.universes[universe]
        self.queue = queue

        # config.lights maps a class name from the ``lights`` module to the
        # addresses for which an instance of that class is created.
        self.lights = []
        for classname, addrs in self.config.lights.items():
            cls = getattr(lights, classname)
            for addr in addrs:
                self.lights.append(cls(addr))

        self.worker = Thread(target=self._worker)
        self.worker_should_be_running = False

        # Initialised here as well as in start() so the frame buffer exists
        # even if a channel is written before start() is called.
        self.dmx_data = self._DMX_CHANNELS * [0]

        self.sacn = sACNsender(
            fps=40,
        )

    def start(self):
        """Start the sACN sender, configure the output and launch the worker."""
        self.sacn.start()
        self.sacn.activate_output(self.config.universe)
        self.sacn[self.config.universe].multicast = self.config.multicast
        if not self.config.multicast:
            # Unicast: send to the explicitly configured target instead.
            self.sacn[self.config.universe].destination = self.config.target

        self.dmx_data = self._DMX_CHANNELS * [0]

        self.worker_should_be_running = True
        self.worker.start()

    def stop(self):
        """Ask the worker to finish, wait for it, then stop the sACN sender."""
        self.log.info("Waiting for worker to terminate ...")
        self.worker_should_be_running = False
        self.worker.join()
        self.sacn.stop()

    def _flush(self):
        """Hand a snapshot of the frame buffer to the sACN output."""
        self.sacn[self.config.universe].dmx_data = tuple(self.dmx_data)

    def _dmx(self, addr, data):
        """Set a single channel and publish the frame.

        Args:
            addr: 1-based channel address (stored at index ``addr - 1``).
            data: Value to store for that channel.
        """
        self.dmx_data[addr - 1] = data
        self._flush()

    def _bulk(self, start_addr, values):
        """Set consecutive channels starting at ``start_addr``.

        All values are written to the frame buffer first and the frame is
        published once afterwards, so the sender is never handed a frame in
        which only part of the values has been applied.

        Args:
            start_addr: 1-based address of the first channel.
            values: Iterable of channel values.
        """
        wrote_any = False
        for offset, value in enumerate(values):
            self.dmx_data[start_addr + offset - 1] = value
            wrote_any = True
        if wrote_any:
            self._flush()

    def _update_all(self, intensity, red, green, blue, white):
        """Apply the same intensity/colour to every light and send it out."""
        for light in self.lights:
            light.intensity = intensity
            light.red = red
            light.green = green
            light.blue = blue
            light.white = white
            # dump() is unpacked into _bulk(start_addr, values).
            self._bulk(*light.dump())

    def _is_filtered(self, component):
        """Return True if ``component`` must not trigger an alert.

        When ``config.filters`` is non-empty it acts as an allow-list of
        regular expressions (matched case-insensitively with ``re.search``):
        a component is filtered out unless at least one pattern matches.
        With no filters configured nothing is filtered.
        """
        filters = self.config.filters
        if not filters:
            return False
        return not any(
            re.search(pattern, component, re.IGNORECASE) for pattern in filters
        )

    def _alert_error(self):
        """Play the "error" effect: three groups of two red flashes."""
        self._update_all(0, 0, 0, 0, 0)
        sleep(0.2)

        for _ in range(3):
            for _ in range(2):
                self._update_all(self.config.alert_brightness, 255, 0, 0, 50)
                sleep(0.1)
                self._update_all(0, 255, 0, 0, 50)
                sleep(0.1)
            # pause between the groups of flashes
            sleep(0.2)

    def _alert_warn(self):
        """Play the "warn" effect: neighbouring lights blink alternately."""
        self._update_all(0, 0, 0, 0, 0)
        sleep(0.2)

        for step in range(6):
            for idx, light in enumerate(self.lights):
                light.red = 255
                light.green = 150
                light.blue = 0
                light.white = 50
                # Lights at odd (idx + step) are lit, the others are dark;
                # the pattern flips on every step.
                if (idx + step) % 2:
                    light.intensity = self.config.alert_brightness
                else:
                    light.intensity = 0
                self._bulk(*light.dump())
            sleep(0.2)

        self._update_all(0, 0, 0, 0, 0)
        sleep(0.2)

    def _alert_info(self):
        """Play the "info" effect: one pulse ramping up and back down.

        With the rainbow enabled the pulse starts from the rainbow
        brightness and is skipped entirely when the alert brightness is less
        than ``_INFO_MIN_HEADROOM`` above it. Otherwise the pulse starts
        from zero.
        """
        steps = self._INFO_RAMP_STEPS
        ramp = list(range(steps)) + list(reversed(range(steps)))

        if self.config.rainbow_brightness > 0:
            diff = self.config.alert_brightness - self.config.rainbow_brightness
            self.log.debug(diff)
            if diff < self._INFO_MIN_HEADROOM:
                return
            for idx in ramp:
                self.log.debug(idx)
                self.log.debug(diff * idx)
                level = int(self.config.rainbow_brightness + ((diff / steps) * idx))
                self._update_all(level, 0, 50, 255, 50)
                sleep(0.025)
        else:
            for idx in ramp:
                self.log.debug(idx)
                level = int((self.config.alert_brightness / steps) * idx)
                self._update_all(level, 0, 50, 255, 50)
                sleep(0.025)

    def _handle_message(self, level, component, text):
        """Play the effect for one queue item unless it is filtered out."""
        if self._is_filtered(component):
            # no alert for filtered messages
            return

        self.log.info(f"Got queue item: {level} {component} : {text}")

        if level == "error":
            self._alert_error()
        elif level == "warn":
            self._alert_warn()
        elif level == "info":
            self._alert_info()

        # Blank all lights once the message has been handled.
        self._update_all(0, 0, 0, 0, 0)

    def _idle_step(self, rotation):
        """Run one idle iteration and return the next rotation value.

        With the rainbow enabled, every light's ``rainbow()`` result is sent
        out and the rotation counter advances, wrapping to 0 at 360.
        Otherwise this just sleeps before the queue is polled again.
        """
        if self.config.rainbow_brightness > 0:
            count = len(self.lights)
            for idx, light in enumerate(self.lights):
                self._bulk(
                    *light.rainbow(
                        idx,
                        rotation,
                        count,
                        100,  # NOTE(review): meaning defined by lights.*.rainbow
                        self.config.rainbow_brightness,
                    )
                )
            rotation = (rotation + 1) % 360
            sleep(0.03)
        else:
            sleep(0.2)
        return rotation

    def _worker(self):
        """Thread body: poll the queue until ``stop()`` clears the run flag."""
        self.log.info("Worker startup")
        rotation = 0
        while self.worker_should_be_running:
            # Only the poll itself is guarded, so an Empty raised elsewhere
            # is not mistaken for an idle queue.
            try:
                item = self.queue.get_nowait()
            except Empty:
                rotation = self._idle_step(rotation)
                continue

            try:
                level, component, text = item
                self._handle_message(level, component, text)
            finally:
                # Every successful get must be acknowledged - including
                # filtered messages - or queue.join() would block forever.
                self.queue.task_done()
        self.log.info("Worker shutdown")