viri-leds-dmx-sacn/dmx_queue.py

104 lines
3.3 KiB
Python
Raw Normal View History

2023-08-07 11:39:15 +00:00
import logging
from queue import Empty
from threading import Thread
from time import sleep
from sacn import sACNsender
LOG = logging.getLogger('DMXQueue')
class DMXQueue:
def __init__(self, args, queue, lights):
self.args = args
self.queue = queue
self.lights = lights
self.worker = Thread(target=self._worker)
self.worker_should_be_running = False
self.sacn = sACNsender()
def start(self):
self.sacn.start()
self.sacn.activate_output(1)
self.sacn[1].multicast = self.args.sacn_multicast
if not self.args.sacn_multicast:
self.sacn[1].destination = self.args.sacn_target
self.dmx_data = 512*[0]
for i in range(1, 513):
self._dmx(i, 0)
self.worker_should_be_running = True
self.worker.start()
def stop(self):
LOG.info('Waiting for worker to terminate ...')
self.worker_should_be_running = False
self.worker.join()
self.sacn.stop()
def _dmx(self, addr, data):
self.dmx_data[addr-1] = data
self.sacn[1].dmx_data = tuple(self.dmx_data)
def _bulk(self, start_addr, values):
for idx, value in enumerate(values):
self._dmx(start_addr+idx, value)
def _update_all(self, intensity, red, green, blue, white=0):
for light in self.lights:
light.intensity = intensity
light.red = red
light.green = green
light.blue = blue
light.white = white
self._bulk(*light.dump())
def _worker(self):
LOG.info('Worker startup')
while self.worker_should_be_running:
try:
level, component, text = self.queue.get_nowait()
forward = list(range(17))
reverse = list(range(17))
reverse.reverse()
if level == 'error':
# three instances of two flashes each
for i in range(3):
for j in range(4):
self._update_all(255, 255, 0, 0, 50)
sleep(0.1)
self._update_all(0, 255, 0, 0)
sleep(0.1)
sleep(0.2)
elif level == 'warn':
# warning: blink alternate, but slow
for i in range(6):
for idx, light in enumerate(self.lights):
light.red = 255
light.green = 150
light.blue = 0
light.white = 0
if (idx + i) % 2:
light.intensity = 255
else:
light.intensity = 0
self._bulk(*light.dump())
sleep(0.5)
self._update_all(0, 0, 0, 0)
elif level == 'info':
for i in range(2):
for intensity_multiplier in forward + reverse:
self._update_all(intensity_multiplier*15, 0, 50, 255)
sleep(0.03)
self.queue.task_done()
except Empty:
sleep(0.1)
LOG.info('Worker shutdown')