rework config parser, defaults for almost everything

This commit is contained in:
Franzi 2023-08-08 06:26:35 +02:00
parent 381dc916d6
commit b6b57783ed
Signed by: kunsi
GPG key ID: 12E3D2136B818350
5 changed files with 71 additions and 55 deletions

66
conf.py
View file

@ -10,6 +10,13 @@ from sys import exit
LOG = logging.getLogger('Config')
class ConfigWrapper:
# simple class to wrap a dict into a object
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
def load_and_validate_config(path):
try:
with open(path, 'r') as cf:
@ -18,31 +25,38 @@ def load_and_validate_config(path):
LOG.error(f'{path} is no valid toml configuration file')
exit(1)
# validate options exist.
for section, option in (
('mqtt', 'host'),
('mqtt', 'topic'),
('sacn', 'multicast'),
('alerts', 'brightness'),
('rainbow', 'enable'),
('rainbow', 'intensity'),
('rainbow', 'brightness'),
('rainbow', 'speed'),
):
if config.get(section, {}).get(option) is None:
LOG.error(
f'configuration option "{section}" "{option}" is missing in config'
)
exit(1)
if not config.get('mqtt', {}).get('host'):
LOG.error(
f'configuration option "mqtt" "host" is missing in config, but required to exist'
)
exit(1)
# dmx values
for section, option in (
('alerts', 'brightness'),
('rainbow', 'intensity'),
('rainbow', 'brightness'),
):
if int(config[section][option]) < 10:
LOG.error(f'value of "{section}" "{option}" must be atleast 10')
exit(1)
conf = ConfigWrapper(
mqtt=ConfigWrapper(
host=config['mqtt']['host'],
user=config['mqtt'].get('user'),
password=config['mqtt'].get('password'),
topic=config['mqtt'].get('topic', '/voc/alert'),
),
sacn=ConfigWrapper(
multicast=bool(config.get('sacn', {}).get('multicast', False) is True),
target=config.get('sacn', {}).get('target', '127.0.0.1'),
universe=int(config.get('sacn', {}).get('universe', 1)),
),
alerts=ConfigWrapper(
brightness=max(int(config.get('alerts', {}).get('brightness', 255)), 10),
),
rainbow=ConfigWrapper(
enable=bool(config.get('rainbow', {}).get('enable', True) is True),
intensity=max(int(config.get('rainbow', {}).get('intensity', 100)), 10),
brightness=max(int(config.get('rainbow', {}).get('brightness', 150)), 10),
speed=int(config.get('rainbow', {}).get('speed', 25)),
),
lights=config.get('lights', {}),
)
return config
if conf.alerts.brightness < conf.rainbow.brightness:
LOG.error('alerts brightness must be equal or above rainbow brightness')
exit(1)
return conf

View file

@ -1,15 +1,12 @@
# C3VOC viri MQTT to sACN DMX
# There are no default values. Every option listed in this example
# config must be provided, even if the feature is disabled!
# The only exception to this is the 'lights' section on the bottom,
# you only have to list the fixtures you really have.
# All options are optional, except for mqtt host. The options below
# reflect the default values if missing.
# This section contains the login information to your mqtt server.
# user and password are ignored if both are missing or empty.
[mqtt]
host = "mqtt.c3voc.de"
host = ""
user = ""
password = ""
topic = "/voc/alert"
@ -23,6 +20,9 @@ multicast = false
# is sent to.
target = "127.0.0.1"
# which universe to address
universe = 1
[alerts]
# This specifies the maximum DMX dimmer value that's sent to your lights

View file

@ -18,15 +18,17 @@ class DMXQueue:
self.worker = Thread(target=self._worker)
self.worker_should_be_running = False
self.sacn = sACNsender()
self.sacn = sACNsender(
fps=40,
)
def start(self):
self.sacn.start()
self.sacn.activate_output(1)
self.sacn.activate_output(self.config.sacn.universe)
self.sacn[1].multicast = self.config['sacn']['multicast']
if not self.config['sacn']['multicast']:
self.sacn[1].destination = self.config['sacn']['target']
self.sacn[self.config.sacn.universe].multicast = self.config.sacn.multicast
if not self.config.sacn.multicast:
self.sacn[self.config.sacn.universe].destination = self.config.sacn.target
self.dmx_data = 512 * [0]
@ -41,7 +43,7 @@ class DMXQueue:
def _dmx(self, addr, data):
self.dmx_data[addr - 1] = data
self.sacn[1].dmx_data = tuple(self.dmx_data)
self.sacn[self.config.sacn.universe].dmx_data = tuple(self.dmx_data)
def _bulk(self, start_addr, values):
for idx, value in enumerate(values):
@ -74,7 +76,7 @@ class DMXQueue:
for i in range(3):
for j in range(4):
self._update_all(
self.config['alerts']['brightness'], 255, 0, 0, 50
self.config.alerts.brightness, 255, 0, 0, 50
)
sleep(0.1)
self._update_all(0, 255, 0, 0)
@ -90,7 +92,7 @@ class DMXQueue:
light.white = 0
if (idx + i) % 2:
light.intensity = self.config['alerts']['brightness']
light.intensity = self.config.alerts.brightness
else:
light.intensity = 0
@ -98,7 +100,7 @@ class DMXQueue:
sleep(0.5)
self._update_all(0, 0, 0, 0)
elif level == 'info':
intensity_multiplier = self.config['alerts']['brightness'] / 17
intensity_multiplier = self.config.alerts.brightness / 17
for i in range(2):
for idx in forward + reverse:
self._update_all(
@ -107,7 +109,7 @@ class DMXQueue:
sleep(0.03)
self.queue.task_done()
except Empty:
if self.config['rainbow']['enable']:
if self.config.rainbow.enable:
degrees_per_step = 360 / len(self.lights)
for idx, light in enumerate(self.lights):
@ -117,21 +119,21 @@ class DMXQueue:
r, g, b = hsv_to_rgb(
light_degrees_dec,
1,
self.config['rainbow']['intensity'] / 100,
self.config.rainbow.intensity / 100,
)
light.red = int(r * 255)
light.green = int(g * 255)
light.blue = int(b * 200)
light.intensity = self.config['rainbow']['brightness']
light.intensity = self.config.rainbow.brightness
self._bulk(*light.dump())
if self.config['rainbow']['speed'] >= 25:
if self.config.rainbow.speed >= 25:
rotation = rotation + 1
if rotation >= 360:
rotation = 0
sleep(self.config['rainbow']['speed'] / 1000)
sleep(self.config.rainbow.speed / 1000)
else:
sleep(0.2)
else:

View file

@ -35,11 +35,11 @@ def main():
queue = Queue()
lights = []
for addr in config['lights'].get('ignition_wal_l710', []):
for addr in config.lights.get('ignition_wal_l710', []):
lights.append(IgnitionWALL710(addr))
for addr in config['lights'].get('varytec_hero_wash_712_zoom', []):
for addr in config.lights.get('varytec_hero_wash_712_zoom', []):
lights.append(VarytecHeroWashZoom712(addr))
for addr in config['lights'].get('wled_multi_rgb', []):
for addr in config.lights.get('wled_multi_rgb', []):
lights.append(WLED(addr))
if not lights:

View file

@ -19,12 +19,12 @@ class MQTTQueue:
self.client.on_message = self.on_mqtt_message
def start(self):
if self.config['mqtt'].get('user') and self.config['mqtt'].get('password'):
if self.config.mqtt.user and self.config.mqtt.password:
self.client.username_pw_set(
self.config['mqtt']['user'], self.config['mqtt']['password']
self.config.mqtt.user, self.config.mqtt.password
)
self.client.connect(self.config['mqtt']['host'], 1883, 60)
self.client.connect(self.config.mqtt.host, 1883, 60)
self.client.loop_start()
def stop(self):
@ -32,13 +32,13 @@ class MQTTQueue:
self.client.disconnect()
def on_connect(self, client, userdata, flags, rc):
LOG.info(f'Connected to MQTT with code {rc}')
LOG.info(f'Connected to {self.config.mqtt.host} with code {rc}')
self.client.subscribe(self.config['mqtt']['topic'])
self.client.subscribe(self.config.mqtt.topic)
LOG.info(f'Subscribed')
def on_disconnect(self, client, userdata, rc):
LOG.warning(f'Disconnected from MQTT with code {rc}')
LOG.warning(f'Disconnected from {self.config.mqtt.host} with code {rc}')
def on_mqtt_message(self, client, userdata, msg):
try: