import json import logging import re from queue import Queue import paho.mqtt.client as mqtt LOG = logging.getLogger('MQTTQueue') class MQTTQueue: def __init__(self, config, queue): self.config = config self.client = mqtt.Client() self.queue = queue self.client.on_connect = self.on_connect self.client.on_disconnect = self.on_disconnect self.client.on_message = self.on_mqtt_message def start(self): if self.config.mqtt.user and self.config.mqtt.password: self.client.username_pw_set( self.config.mqtt.user, self.config.mqtt.password ) self.client.connect(self.config.mqtt.host, 1883, 60) self.client.loop_start() def stop(self): self.client.loop_stop() self.client.disconnect() def on_connect(self, client, userdata, flags, rc): LOG.info(f'Connected to {self.config.mqtt.host} with code {rc}') self.client.subscribe(self.config.mqtt.topic) LOG.info(f'Subscribed') def on_disconnect(self, client, userdata, rc): LOG.warning(f'Disconnected from {self.config.mqtt.host} with code {rc}') def on_mqtt_message(self, client, userdata, msg): try: data = json.loads(msg.payload.decode('utf-8')) text = re.sub(r'\<[a-z\/]+\>', '', data['msg']) add_to_queue = True if self.config.alerts.filters: add_to_queue = False for f in self.config.alerts.filters: if re.search(f, data['component'], re.IGNORECASE): add_to_queue = True break # no point in searching further if add_to_queue: self.queue.put( ( data['level'].lower(), data['component'], text, ) ) LOG.info(f'Put queue item: {data["level"].lower()} {data["component"]} : {text}') else: LOG.info(f'Ignoring message for {data["component"]} because it was filtered') except Exception as e: LOG.exception(msg.payload)