From b6b57783ed0a4744a644c67e97fc8debbc0548fa Mon Sep 17 00:00:00 2001 From: Franziska Kunsmann Date: Tue, 8 Aug 2023 06:26:35 +0200 Subject: [PATCH] rework config parser, defaults for almost everything --- conf.py | 66 +++++++++++++++++++++++++++------------------ config.example.toml | 12 ++++----- dmx_queue.py | 30 +++++++++++---------- main.py | 6 ++--- mqtt_queue.py | 12 ++++----- 5 files changed, 71 insertions(+), 55 deletions(-) diff --git a/conf.py b/conf.py index 9856c9e..d431420 100644 --- a/conf.py +++ b/conf.py @@ -10,6 +10,13 @@ from sys import exit LOG = logging.getLogger('Config') +class ConfigWrapper: + # simple class to wrap a dict into a object + def __init__(self, **kwargs): + for k, v in kwargs.items(): + setattr(self, k, v) + + def load_and_validate_config(path): try: with open(path, 'r') as cf: @@ -18,31 +25,38 @@ def load_and_validate_config(path): LOG.error(f'{path} is no valid toml configuration file') exit(1) - # validate options exist. - for section, option in ( - ('mqtt', 'host'), - ('mqtt', 'topic'), - ('sacn', 'multicast'), - ('alerts', 'brightness'), - ('rainbow', 'enable'), - ('rainbow', 'intensity'), - ('rainbow', 'brightness'), - ('rainbow', 'speed'), - ): - if config.get(section, {}).get(option) is None: - LOG.error( - f'configuration option "{section}" "{option}" is missing in config' - ) - exit(1) + if not config.get('mqtt', {}).get('host'): + LOG.error( + f'configuration option "mqtt" "host" is missing in config, but required to exist' + ) + exit(1) - # dmx values - for section, option in ( - ('alerts', 'brightness'), - ('rainbow', 'intensity'), - ('rainbow', 'brightness'), - ): - if int(config[section][option]) < 10: - LOG.error(f'value of "{section}" "{option}" must be atleast 10') - exit(1) + conf = ConfigWrapper( + mqtt=ConfigWrapper( + host=config['mqtt']['host'], + user=config['mqtt'].get('user'), + password=config['mqtt'].get('password'), + topic=config['mqtt'].get('topic', '/voc/alert'), + ), + sacn=ConfigWrapper( + multicast=bool(config.get('sacn', {}).get('multicast', False) is True), + target=config.get('sacn', {}).get('target', '127.0.0.1'), + universe=int(config.get('sacn', {}).get('universe', 1)), + ), + alerts=ConfigWrapper( + brightness=max(int(config.get('alerts', {}).get('brightness', 255)), 10), + ), + rainbow=ConfigWrapper( + enable=bool(config.get('rainbow', {}).get('enable', True) is True), + intensity=max(int(config.get('rainbow', {}).get('intensity', 100)), 10), + brightness=max(int(config.get('rainbow', {}).get('brightness', 150)), 10), + speed=int(config.get('rainbow', {}).get('speed', 25)), + ), + lights=config.get('lights', {}), + ) - return config + if conf.alerts.brightness < conf.rainbow.brightness: + LOG.error('alerts brightness must be equal or above rainbow brightness') + exit(1) + + return conf diff --git a/config.example.toml b/config.example.toml index 7be539f..8e449f0 100644 --- a/config.example.toml +++ b/config.example.toml @@ -1,15 +1,12 @@ # C3VOC viri MQTT to sACN DMX -# There are no default values. Every option listed in this example -# config must be provided, even if the feature is disabled! -# The only exception to this is the 'lights' section on the bottom, -# you only have to list the fixtures you really have. - +# All options are optional, except for mqtt host. The options below +# reflect the default values if missing. # This section contains the login information to your mqtt server. # user and password are ignored if both are missing or empty. [mqtt] -host = "mqtt.c3voc.de" +host = "" user = "" password = "" topic = "/voc/alert" @@ -23,6 +20,9 @@ multicast = false # is sent to. target = "127.0.0.1" +# which universe to address +universe = 1 + [alerts] # This specifies the maximum DMX dimmer value that's sent to your lights diff --git a/dmx_queue.py b/dmx_queue.py index 74cbab8..e9fe3ed 100644 --- a/dmx_queue.py +++ b/dmx_queue.py @@ -18,15 +18,17 @@ class DMXQueue: self.worker = Thread(target=self._worker) self.worker_should_be_running = False - self.sacn = sACNsender() + self.sacn = sACNsender( + fps=40, + ) def start(self): self.sacn.start() - self.sacn.activate_output(1) + self.sacn.activate_output(self.config.sacn.universe) - self.sacn[1].multicast = self.config['sacn']['multicast'] - if not self.config['sacn']['multicast']: - self.sacn[1].destination = self.config['sacn']['target'] + self.sacn[self.config.sacn.universe].multicast = self.config.sacn.multicast + if not self.config.sacn.multicast: + self.sacn[self.config.sacn.universe].destination = self.config.sacn.target self.dmx_data = 512 * [0] @@ -41,7 +43,7 @@ class DMXQueue: def _dmx(self, addr, data): self.dmx_data[addr - 1] = data - self.sacn[1].dmx_data = tuple(self.dmx_data) + self.sacn[self.config.sacn.universe].dmx_data = tuple(self.dmx_data) def _bulk(self, start_addr, values): for idx, value in enumerate(values): @@ -74,7 +76,7 @@ class DMXQueue: for i in range(3): for j in range(4): self._update_all( - self.config['alerts']['brightness'], 255, 0, 0, 50 + self.config.alerts.brightness, 255, 0, 0, 50 ) sleep(0.1) self._update_all(0, 255, 0, 0) @@ -90,7 +92,7 @@ class DMXQueue: light.white = 0 if (idx + i) % 2: - light.intensity = self.config['alerts']['brightness'] + light.intensity = self.config.alerts.brightness else: light.intensity = 0 @@ -98,7 +100,7 @@ class DMXQueue: sleep(0.5) self._update_all(0, 0, 0, 0) elif level == 'info': - intensity_multiplier = self.config['alerts']['brightness'] / 17 + intensity_multiplier = self.config.alerts.brightness / 17 for i in range(2): for idx in forward + reverse: self._update_all( @@ -107,7 +109,7 @@ class DMXQueue: sleep(0.03) self.queue.task_done() except Empty: - if self.config['rainbow']['enable']: + if self.config.rainbow.enable: degrees_per_step = 360 / len(self.lights) for idx, light in enumerate(self.lights): @@ -117,21 +119,21 @@ class DMXQueue: r, g, b = hsv_to_rgb( light_degrees_dec, 1, - self.config['rainbow']['intensity'] / 100, + self.config.rainbow.intensity / 100, ) light.red = int(r * 255) light.green = int(g * 255) light.blue = int(b * 200) - light.intensity = self.config['rainbow']['brightness'] + light.intensity = self.config.rainbow.brightness self._bulk(*light.dump()) - if self.config['rainbow']['speed'] >= 25: + if self.config.rainbow.speed >= 25: rotation = rotation + 1 if rotation >= 360: rotation = 0 - sleep(self.config['rainbow']['speed'] / 1000) + sleep(self.config.rainbow.speed / 1000) else: sleep(0.2) else: diff --git a/main.py b/main.py index 2ad59d5..176c3a3 100755 --- a/main.py +++ b/main.py @@ -35,11 +35,11 @@ def main(): queue = Queue() lights = [] - for addr in config['lights'].get('ignition_wal_l710', []): + for addr in config.lights.get('ignition_wal_l710', []): lights.append(IgnitionWALL710(addr)) - for addr in config['lights'].get('varytec_hero_wash_712_zoom', []): + for addr in config.lights.get('varytec_hero_wash_712_zoom', []): lights.append(VarytecHeroWashZoom712(addr)) - for addr in config['lights'].get('wled_multi_rgb', []): + for addr in config.lights.get('wled_multi_rgb', []): lights.append(WLED(addr)) if not lights: diff --git a/mqtt_queue.py b/mqtt_queue.py index b3144c7..2120bc4 100644 --- a/mqtt_queue.py +++ b/mqtt_queue.py @@ -19,12 +19,12 @@ class MQTTQueue: self.client.on_message = self.on_mqtt_message def start(self): - if self.config['mqtt'].get('user') and self.config['mqtt'].get('password'): + if self.config.mqtt.user and self.config.mqtt.password: self.client.username_pw_set( - self.config['mqtt']['user'], self.config['mqtt']['password'] + self.config.mqtt.user, self.config.mqtt.password ) - self.client.connect(self.config['mqtt']['host'], 1883, 60) + self.client.connect(self.config.mqtt.host, 1883, 60) self.client.loop_start() def stop(self): @@ -32,13 +32,13 @@ class MQTTQueue: self.client.disconnect() def on_connect(self, client, userdata, flags, rc): - LOG.info(f'Connected to MQTT with code {rc}') + LOG.info(f'Connected to {self.config.mqtt.host} with code {rc}') - self.client.subscribe(self.config['mqtt']['topic']) + self.client.subscribe(self.config.mqtt.topic) LOG.info(f'Subscribed') def on_disconnect(self, client, userdata, rc): - LOG.warning(f'Disconnected from MQTT with code {rc}') + LOG.warning(f'Disconnected from {self.config.mqtt.host} with code {rc}') def on_mqtt_message(self, client, userdata, msg): try: