import logging from colorsys import hsv_to_rgb from queue import Empty from threading import Thread from time import sleep from sacn import sACNsender LOG = logging.getLogger('DMXQueue') class DMXQueue: def __init__(self, config, queue, lights): self.config = config self.queue = queue self.lights = lights self.worker = Thread(target=self._worker) self.worker_should_be_running = False self.sacn = sACNsender( fps=40, ) def start(self): self.sacn.start() self.sacn.activate_output(self.config.sacn.universe) self.sacn[self.config.sacn.universe].multicast = self.config.sacn.multicast if not self.config.sacn.multicast: self.sacn[self.config.sacn.universe].destination = self.config.sacn.target self.dmx_data = 512 * [0] self.worker_should_be_running = True self.worker.start() def stop(self): LOG.info('Waiting for worker to terminate ...') self.worker_should_be_running = False self.worker.join() self.sacn.stop() def _dmx(self, addr, data): self.dmx_data[addr - 1] = data self.sacn[self.config.sacn.universe].dmx_data = tuple(self.dmx_data) def _bulk(self, start_addr, values): for idx, value in enumerate(values): self._dmx(start_addr + idx, value) def _update_all(self, intensity, red, green, blue, white=0): for light in self.lights: light.intensity = intensity light.red = red light.green = green light.blue = blue light.white = white self._bulk(*light.dump()) def _worker(self): LOG.info('Worker startup') rotation = 0 while self.worker_should_be_running: try: level, component, text = self.queue.get_nowait() LOG.info(f'Got queue item: {level} {component} : {text}') self._update_all(0, 0, 0, 0) sleep(0.2) # effect duration should be between 1s and 1.5s if level == 'error': # three instances of two flashes each for i in range(3): for j in range(2): self._update_all( self.config.alerts.brightness, 255, 0, 0, 50 ) sleep(0.1) self._update_all(0, 255, 0, 0) sleep(0.1) sleep(0.2) elif level == 'warn': # warning: blink alternate, but slow for i in range(6): for idx, light in enumerate(self.lights): light.red = 255 light.green = 150 light.blue = 0 light.white = 0 if (idx + i) % 2: light.intensity = self.config.alerts.brightness else: light.intensity = 0 self._bulk(*light.dump()) sleep(0.2) self._update_all(0, 0, 0, 0) elif level == 'info': forward = list(range(15)) reverse = list(range(15)) reverse.reverse() # info: two times slow fade intensity_multiplier = self.config.alerts.brightness / 15 for i in range(2): for idx in forward + reverse: self._update_all( int(intensity_multiplier * idx), 0, 50, 255 ) sleep(0.025) sleep(0.2) self.queue.task_done() except Empty: if self.config.rainbow.enable: degrees_per_step = 360 / len(self.lights) for idx, light in enumerate(self.lights): light_degrees_dec = ( (rotation + (idx * degrees_per_step)) % 360 / 360 ) r, g, b = hsv_to_rgb( light_degrees_dec, 1, self.config.rainbow.intensity / 100, ) light.red = int(r * 255) light.green = int(g * 255) light.blue = int(b * 200) light.intensity = self.config.rainbow.brightness self._bulk(*light.dump()) if self.config.rainbow.speed >= 25: rotation = rotation + 1 if rotation >= 360: rotation = 0 sleep(self.config.rainbow.speed / 1000) else: sleep(0.2) else: sleep(0.2) LOG.info('Worker shutdown')