viri-leds-dmx-sacn/mqtt_queue.py

56 lines
1.6 KiB
Python
Raw Normal View History

2023-08-07 11:39:15 +00:00
import json
import logging
import re
from queue import Queue
import paho.mqtt.client as mqtt
LOG = logging.getLogger('MQTTQueue')
2023-08-07 12:48:18 +00:00
2023-08-07 11:39:15 +00:00
class MQTTQueue:
def __init__(self, args, queue):
self.args = args
self.client = mqtt.Client()
self.queue = queue
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_mqtt_message
def start(self):
if self.args.mqtt_user and self.args.mqtt_pass:
self.client.username_pw_set(self.args.mqtt_user, self.args.mqtt_pass)
self.client.connect(self.args.mqtt_host, self.args.mqtt_port, 60)
self.client.loop_start()
def stop(self):
self.client.loop_stop()
self.client.disconnect()
def on_connect(self, client, userdata, flags, rc):
LOG.info(f'Connected to {self.args.mqtt_host} with code {rc}')
self.client.subscribe(self.args.mqtt_topic)
LOG.info(f'Subscribed to {self.args.mqtt_topic}')
def on_disconnect(self, client, userdata, rc):
LOG.warning(f'Disconnected from {self.args.mqtt_host} with code {rc}')
def on_mqtt_message(self, client, userdata, msg):
try:
data = json.loads(msg.payload.decode('utf-8'))
text = re.sub(r'\<[a-z\/]+\>', '', data['msg'])
2023-08-07 12:48:18 +00:00
self.queue.put(
(
data['level'].lower(),
data['component'],
text,
)
)
2023-08-07 11:39:15 +00:00
except Exception as e:
LOG.exception(msg.payload)