import json import logging import re from queue import Queue import paho.mqtt.client as mqtt LOG = logging.getLogger("MQTTQueue") class MQTTQueue: def __init__(self, config, queues): self.config = config self.client = mqtt.Client() self.queues = queues self.client.on_connect = self.on_connect self.client.on_disconnect = self.on_disconnect self.client.on_message = self.on_mqtt_message def start(self): if self.config.mqtt.user and self.config.mqtt.password: self.client.username_pw_set( self.config.mqtt.user, self.config.mqtt.password ) self.client.connect(self.config.mqtt.host, 1883, 60) self.client.loop_start() def stop(self): self.client.loop_stop() self.client.disconnect() def on_connect(self, client, userdata, flags, rc): LOG.info(f"Connected to {self.config.mqtt.host} with code {rc}") self.client.subscribe(self.config.mqtt.topic) LOG.info(f"Subscribed") def on_disconnect(self, client, userdata, rc): LOG.warning(f"Disconnected from {self.config.mqtt.host} with code {rc}") def on_mqtt_message(self, client, userdata, msg): try: data = json.loads(msg.payload.decode("utf-8")) text = re.sub(r"\<[a-z\/]+\>", "", data["msg"]) for queue in self.queues: self.queues[queue].put( ( data["level"].lower(), data["component"], text, ) ) LOG.info( f'Put queue {queue} item: {data["level"].lower()} {data["component"]} : {text}' ) except Exception as e: LOG.exception(msg.payload)