move configuration to config file

This commit is contained in:
Franzi 2023-08-07 20:05:20 +02:00
parent 127a293cb6
commit 381dc916d6
Signed by: kunsi
GPG key ID: 12E3D2136B818350
7 changed files with 170 additions and 97 deletions

3
.gitignore vendored
View file

@ -158,3 +158,6 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear # and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/ #.idea/
config.toml

48
conf.py Normal file
View file

@ -0,0 +1,48 @@
try:
# python 3.11
from tomllib import loads as toml_load
except ImportError:
from rtoml import load as toml_load
import logging
from sys import exit
LOG = logging.getLogger('Config')
def load_and_validate_config(path):
try:
with open(path, 'r') as cf:
config = toml_load(cf.read())
except Exception as e:
LOG.error(f'{path} is no valid toml configuration file')
exit(1)
# validate options exist.
for section, option in (
('mqtt', 'host'),
('mqtt', 'topic'),
('sacn', 'multicast'),
('alerts', 'brightness'),
('rainbow', 'enable'),
('rainbow', 'intensity'),
('rainbow', 'brightness'),
('rainbow', 'speed'),
):
if config.get(section, {}).get(option) is None:
LOG.error(
f'configuration option "{section}" "{option}" is missing in config'
)
exit(1)
# dmx values
for section, option in (
('alerts', 'brightness'),
('rainbow', 'intensity'),
('rainbow', 'brightness'),
):
if int(config[section][option]) < 10:
LOG.error(f'value of "{section}" "{option}" must be atleast 10')
exit(1)
return config

59
config.example.toml Normal file
View file

@ -0,0 +1,59 @@
# C3VOC viri MQTT to sACN DMX
# There are no default values. Every option listed in this example
# config must be provided, even if the feature is disabled!
# The only exception to this is the 'lights' section on the bottom,
# you only have to list the fixtures you really have.
# This section contains the login information to your mqtt server.
# user and password are ignored if both are missing or empty.
[mqtt]
host = "mqtt.c3voc.de"
user = ""
password = ""
topic = "/voc/alert"
[sacn]
# Wether to enable sACN multicast. Default is off.
multicast = false
# If sACN multicast is disabled, this specifies the address sACN unicast
# is sent to.
target = "127.0.0.1"
[alerts]
# This specifies the maximum DMX dimmer value that's sent to your lights
# when alerts occur. This must be atleast the same or more as the
# rainbow brightness (see below).
brightness = 255
[rainbow]
# Wether to enable the rainbow 'no alerts' loop. If false, all other
# options in here will be ignored.
enable = true
# This directly controls the 'value' part of the HSV equation in the
# rainbow 'no alerts' break loop. Value must be between 10% and 100%.
intensity = 100
# DMX dimmer value when displaying the rainbow pattern. Must be equal
# or below the generic 'brightness' value above.
brightness = 150
# Speed of the rainbow pattern. This is specified as "miliseconds
# between rotating the hue wheel by 1 degree". Minimum value is 25,
# because sACN does not support more than 40 fps. Setting it any lower
# will disable the animation altogehter, resulting in static lights.
speed = 25
# This contains the DMX start addresses of your light fixtures. You
# have to add atleast one fixture for the software to work.
[lights]
ignition_wal_l710 = []
varytec_hero_wash_712_zoom = []
wled_multi_rgb = []

View file

@ -10,8 +10,8 @@ LOG = logging.getLogger('DMXQueue')
class DMXQueue: class DMXQueue:
def __init__(self, args, queue, lights): def __init__(self, config, queue, lights):
self.args = args self.config = config
self.queue = queue self.queue = queue
self.lights = lights self.lights = lights
@ -24,13 +24,11 @@ class DMXQueue:
self.sacn.start() self.sacn.start()
self.sacn.activate_output(1) self.sacn.activate_output(1)
self.sacn[1].multicast = self.args.sacn_multicast self.sacn[1].multicast = self.config['sacn']['multicast']
if not self.args.sacn_multicast: if not self.config['sacn']['multicast']:
self.sacn[1].destination = self.args.sacn_target self.sacn[1].destination = self.config['sacn']['target']
self.dmx_data = 512 * [0] self.dmx_data = 512 * [0]
for i in range(1, 513):
self._dmx(i, 0)
self.worker_should_be_running = True self.worker_should_be_running = True
self.worker.start() self.worker.start()
@ -75,7 +73,9 @@ class DMXQueue:
# three instances of two flashes each # three instances of two flashes each
for i in range(3): for i in range(3):
for j in range(4): for j in range(4):
self._update_all(255, 255, 0, 0, 50) self._update_all(
self.config['alerts']['brightness'], 255, 0, 0, 50
)
sleep(0.1) sleep(0.1)
self._update_all(0, 255, 0, 0) self._update_all(0, 255, 0, 0)
sleep(0.1) sleep(0.1)
@ -90,7 +90,7 @@ class DMXQueue:
light.white = 0 light.white = 0
if (idx + i) % 2: if (idx + i) % 2:
light.intensity = 255 light.intensity = self.config['alerts']['brightness']
else: else:
light.intensity = 0 light.intensity = 0
@ -98,31 +98,42 @@ class DMXQueue:
sleep(0.5) sleep(0.5)
self._update_all(0, 0, 0, 0) self._update_all(0, 0, 0, 0)
elif level == 'info': elif level == 'info':
intensity_multiplier = self.config['alerts']['brightness'] / 17
for i in range(2): for i in range(2):
for intensity_multiplier in forward + reverse: for idx in forward + reverse:
self._update_all(intensity_multiplier * 15, 0, 50, 255) self._update_all(
int(intensity_multiplier * idx), 0, 50, 255
)
sleep(0.03) sleep(0.03)
self.queue.task_done() self.queue.task_done()
except Empty: except Empty:
degrees_per_step = 360 / len(self.lights) if self.config['rainbow']['enable']:
degrees_per_step = 360 / len(self.lights)
for idx, light in enumerate(self.lights): for idx, light in enumerate(self.lights):
light_degrees_dec = ( light_degrees_dec = (
(rotation + (idx * degrees_per_step)) % 360 / 360 (rotation + (idx * degrees_per_step)) % 360 / 360
) )
r, g, b = hsv_to_rgb( r, g, b = hsv_to_rgb(
light_degrees_dec, 1, self.args.rainbow_intensity / 100 light_degrees_dec,
) 1,
self.config['rainbow']['intensity'] / 100,
)
light.red = int(r * 255) light.red = int(r * 255)
light.green = int(g * 255) light.green = int(g * 255)
light.blue = int(b * 200) light.blue = int(b * 200)
light.intensity = self.args.rainbow_brightness light.intensity = self.config['rainbow']['brightness']
self._bulk(*light.dump()) self._bulk(*light.dump())
rotation = rotation + 1 if self.config['rainbow']['speed'] >= 25:
if rotation >= 360: rotation = rotation + 1
rotation = 0 if rotation >= 360:
rotation = 0
sleep(self.args.rainbow_speed / 1000) sleep(self.config['rainbow']['speed'] / 1000)
else:
sleep(0.2)
else:
sleep(0.2)
LOG.info('Worker shutdown') LOG.info('Worker shutdown')

69
main.py
View file

@ -6,6 +6,7 @@ from queue import Queue
from sys import exit from sys import exit
from time import sleep from time import sleep
from conf import load_and_validate_config
from dmx_queue import DMXQueue from dmx_queue import DMXQueue
from lights.ignition_wal_l710 import IgnitionWALL710 from lights.ignition_wal_l710 import IgnitionWALL710
from lights.varytec_hero_wash_zoom_712 import VarytecHeroWashZoom712 from lights.varytec_hero_wash_zoom_712 import VarytecHeroWashZoom712
@ -22,75 +23,23 @@ LOG = logging.getLogger('main')
def main(): def main():
parser = ArgumentParser() parser = ArgumentParser()
# rainbow is output if we have no effects
parser.add_argument( parser.add_argument(
'--rainbow-intensity', '--config',
type=int, default='config.toml',
default=50,
help='intensity of the rainbow colours in percent ("value" in HSV)',
) )
parser.add_argument(
'--rainbow-brightness',
type=int,
default=150,
help='brightness of the rainbow colour (dmx value of dimmer)',
)
parser.add_argument(
'--rainbow-speed',
type=int,
default=10,
help='speed of rainbow colour change (ms per degree hue)',
)
# MQTT
parser.add_argument('--mqtt-host', required=True)
parser.add_argument('--mqtt-port', type=int, default=1883)
parser.add_argument('--mqtt-user')
parser.add_argument('--mqtt-pass', '--mqtt-password')
parser.add_argument(
'--mqtt-topic',
default='/voc/alert',
help='mqtt topic to listen on, default /voc/alert',
)
# sACN
sacn = parser.add_mutually_exclusive_group(required=True)
sacn.add_argument(
'--sacn-multicast', action='store_true', help='use sACN multicast'
)
sacn.add_argument('--sacn-target', help='send sACN unicast to specified address')
# Lights
parser.add_argument(
'--ignition-wal-l710',
nargs='+',
type=int,
help='dmx start addresses of Ignition WAL-L710',
)
parser.add_argument(
'--varytec-wash-zoom-712',
nargs='+',
type=int,
help='dmx start addresses of Varytec Hero Wash 712 Z',
)
parser.add_argument(
'--wled', nargs='+', type=int, help='dmx start addresses of WLED receivers'
)
args = parser.parse_args() args = parser.parse_args()
config = load_and_validate_config(args.config)
LOG.info('Welcome to Voc2DMX') LOG.info('Welcome to Voc2DMX')
LOG.debug(args)
queue = Queue() queue = Queue()
lights = [] lights = []
for addr in args.ignition_wal_l710 or []: for addr in config['lights'].get('ignition_wal_l710', []):
lights.append(IgnitionWALL710(addr)) lights.append(IgnitionWALL710(addr))
for addr in args.varytec_wash_zoom_712 or []: for addr in config['lights'].get('varytec_hero_wash_712_zoom', []):
lights.append(VarytecHeroWashZoom712(addr)) lights.append(VarytecHeroWashZoom712(addr))
for addr in args.wled or []: for addr in config['lights'].get('wled_multi_rgb', []):
lights.append(WLED(addr)) lights.append(WLED(addr))
if not lights: if not lights:
@ -105,8 +54,8 @@ def main():
LOG.info('Initializing worker queues ...') LOG.info('Initializing worker queues ...')
mqttq = MQTTQueue(args, queue) mqttq = MQTTQueue(config, queue)
dmxq = DMXQueue(args, queue, lights) dmxq = DMXQueue(config, queue, lights)
mqttq.start() mqttq.start()
dmxq.start() dmxq.start()

View file

@ -9,8 +9,8 @@ LOG = logging.getLogger('MQTTQueue')
class MQTTQueue: class MQTTQueue:
def __init__(self, args, queue): def __init__(self, config, queue):
self.args = args self.config = config
self.client = mqtt.Client() self.client = mqtt.Client()
self.queue = queue self.queue = queue
@ -19,10 +19,12 @@ class MQTTQueue:
self.client.on_message = self.on_mqtt_message self.client.on_message = self.on_mqtt_message
def start(self): def start(self):
if self.args.mqtt_user and self.args.mqtt_pass: if self.config['mqtt'].get('user') and self.config['mqtt'].get('password'):
self.client.username_pw_set(self.args.mqtt_user, self.args.mqtt_pass) self.client.username_pw_set(
self.config['mqtt']['user'], self.config['mqtt']['password']
)
self.client.connect(self.args.mqtt_host, self.args.mqtt_port, 60) self.client.connect(self.config['mqtt']['host'], 1883, 60)
self.client.loop_start() self.client.loop_start()
def stop(self): def stop(self):
@ -30,13 +32,13 @@ class MQTTQueue:
self.client.disconnect() self.client.disconnect()
def on_connect(self, client, userdata, flags, rc): def on_connect(self, client, userdata, flags, rc):
LOG.info(f'Connected to {self.args.mqtt_host} with code {rc}') LOG.info(f'Connected to MQTT with code {rc}')
self.client.subscribe(self.args.mqtt_topic) self.client.subscribe(self.config['mqtt']['topic'])
LOG.info(f'Subscribed to {self.args.mqtt_topic}') LOG.info(f'Subscribed')
def on_disconnect(self, client, userdata, rc): def on_disconnect(self, client, userdata, rc):
LOG.warning(f'Disconnected from {self.args.mqtt_host} with code {rc}') LOG.warning(f'Disconnected from MQTT with code {rc}')
def on_mqtt_message(self, client, userdata, msg): def on_mqtt_message(self, client, userdata, msg):
try: try:

View file

@ -1,2 +1,3 @@
paho-mqtt==1.6.1 paho-mqtt==1.6.1
sacn==1.9.0 sacn==1.9.0
rtoml;python_version<'3.11'