import logging from colorsys import hsv_to_rgb from queue import Empty from threading import Thread from time import sleep from sacn import sACNsender LOG = logging.getLogger('DMXQueue') class DMXQueue: def __init__(self, args, queue, lights): self.args = args self.queue = queue self.lights = lights self.worker = Thread(target=self._worker) self.worker_should_be_running = False self.sacn = sACNsender() def start(self): self.sacn.start() self.sacn.activate_output(1) self.sacn[1].multicast = self.args.sacn_multicast if not self.args.sacn_multicast: self.sacn[1].destination = self.args.sacn_target self.dmx_data = 512 * [0] for i in range(1, 513): self._dmx(i, 0) self.worker_should_be_running = True self.worker.start() def stop(self): LOG.info('Waiting for worker to terminate ...') self.worker_should_be_running = False self.worker.join() self.sacn.stop() def _dmx(self, addr, data): self.dmx_data[addr - 1] = data self.sacn[1].dmx_data = tuple(self.dmx_data) def _bulk(self, start_addr, values): for idx, value in enumerate(values): self._dmx(start_addr + idx, value) def _update_all(self, intensity, red, green, blue, white=0): for light in self.lights: light.intensity = intensity light.red = red light.green = green light.blue = blue light.white = white self._bulk(*light.dump()) def _worker(self): LOG.info('Worker startup') rotation = 0 while self.worker_should_be_running: try: level, component, text = self.queue.get_nowait() LOG.info(f'Got queue item: {level} {component} : {text}') forward = list(range(17)) reverse = list(range(17)) reverse.reverse() if level == 'error': # three instances of two flashes each for i in range(3): for j in range(4): self._update_all(255, 255, 0, 0, 50) sleep(0.1) self._update_all(0, 255, 0, 0) sleep(0.1) sleep(0.2) elif level == 'warn': # warning: blink alternate, but slow for i in range(6): for idx, light in enumerate(self.lights): light.red = 255 light.green = 150 light.blue = 0 light.white = 0 if (idx + i) % 2: light.intensity = 255 else: light.intensity = 0 self._bulk(*light.dump()) sleep(0.5) self._update_all(0, 0, 0, 0) elif level == 'info': for i in range(2): for intensity_multiplier in forward + reverse: self._update_all(intensity_multiplier * 15, 0, 50, 255) sleep(0.03) self.queue.task_done() except Empty: degrees_per_step = 360 / len(self.lights) for idx, light in enumerate(self.lights): light_degrees_dec = ( (rotation + (idx * degrees_per_step)) % 360 / 360 ) r, g, b = hsv_to_rgb( light_degrees_dec, 1, self.args.rainbow_intensity / 100 ) light.red = int(r * 255) light.green = int(g * 255) light.blue = int(b * 200) light.intensity = self.args.rainbow_brightness self._bulk(*light.dump()) rotation = rotation + 1 if rotation >= 360: rotation = 0 sleep(self.args.rainbow_speed / 1000) LOG.info('Worker shutdown')