import logging from queue import Empty from threading import Thread from time import sleep from sacn import sACNsender import lights class DMXQueue: def __init__(self, config, universe, queue): self.log = logging.getLogger(f"DMXQueue {universe}") self.config = config.universes[universe] self.queue = queue self.lights = [] for classname, addrs in self.config.lights.items(): cls = getattr(lights, classname) for addr in addrs: self.lights.append(cls(addr)) self.worker = Thread(target=self._worker) self.worker_should_be_running = False self.sacn = sACNsender( fps=40, ) def start(self): self.sacn.start() self.sacn.activate_output(self.config.universe) self.sacn[self.config.universe].multicast = self.config.multicast if not self.config.multicast: self.sacn[self.config.universe].destination = self.config.target self.dmx_data = 512 * [0] self.worker_should_be_running = True self.worker.start() def stop(self): self.log.info("Waiting for worker to terminate ...") self.worker_should_be_running = False self.worker.join() self.sacn.stop() def _dmx(self, addr, data): self.dmx_data[addr - 1] = data self.sacn[self.config.universe].dmx_data = tuple(self.dmx_data) def _bulk(self, start_addr, values): for idx, value in enumerate(values): self._dmx(start_addr + idx, value) def _update_all(self, intensity, red, green, blue, white): for light in self.lights: light.intensity = intensity light.red = red light.green = green light.blue = blue light.white = white self._bulk(*light.dump()) def _worker(self): self.log.info("Worker startup") rotation = 0 while self.worker_should_be_running: try: level, component, text = self.queue.get_nowait() if self.config.filters: filtered = True for f in self.config.filters: if re.search(f, component, re.IGNORECASE): filtered = False break # no point in searching further if filtered: # no alert for filtered messages continue self.log.info(f"Got queue item: {level} {component} : {text}") # effect duration should be between 1s and 1.5s if level == "error": self._update_all(0, 0, 0, 0, 0) sleep(0.2) # three instances of two flashes each for i in range(3): for j in range(2): self._update_all( self.config.alert_brightness, 255, 0, 0, 50 ) sleep(0.1) self._update_all(0, 255, 0, 0, 50) sleep(0.1) sleep(0.2) elif level == "warn": self._update_all(0, 0, 0, 0, 0) sleep(0.2) # warning: blink alternate, but slow for i in range(6): for idx, light in enumerate(self.lights): light.red = 255 light.green = 150 light.blue = 0 light.white = 50 if (idx + i) % 2: light.intensity = self.config.alert_brightness else: light.intensity = 0 self._bulk(*light.dump()) sleep(0.2) self._update_all(0, 0, 0, 0, 0) sleep(0.2) elif level == "info": forward = list(range(15)) reverse = list(range(15)) reverse.reverse() if self.config.rainbow_brightness > 0: diff = ( self.config.alert_brightness - self.config.rainbow_brightness ) self.log.debug(diff) if diff >= 50: for idx in forward + reverse: self.log.debug(idx) self.log.debug(diff * idx) self._update_all( int( self.config.rainbow_brightness + ((diff / len(forward)) * idx) ), 0, 50, 255, 50, ) sleep(0.025) else: for idx in forward + reverse: self.log.debug(idx) self._update_all( int( (self.config.alert_brightness / len(forward)) * idx ), 0, 50, 255, 50, ) sleep(0.025) self._update_all(0, 0, 0, 0, 0) self.queue.task_done() except Empty: if self.config.rainbow_brightness > 0: for idx, light in enumerate(self.lights): self._bulk( *light.rainbow( idx, rotation, len(self.lights), 100, self.config.rainbow_brightness, ) ) rotation = rotation + 1 if rotation >= 360: rotation = 0 sleep(0.03) else: sleep(0.2) self.log.info("Worker shutdown")