sdm630_mqtt/telegraf.py

72 lines
1.7 KiB
Python

import logging
from sys import argv
from tomllib import load
import paho.mqtt.client as mqtt
with open(argv[1], "rb") as f:
config = load(f)
MQTT_SUB = "{}/".format(config["mqtt"]["prefix"])
logging.basicConfig(level=logging.DEBUG)
LOG = logging.getLogger(__name__)
def out(keys, value):
print(
"sdm630,identifier={},{} value={}".format(
config["telegraf"]["identifier"],
",".join([f"{k}={v}" for k, v in keys.items()]),
value,
),
flush=True,
)
def on_connect(client, userdata, flags, rc):
LOG.info(f"Connected to mqtt server")
mqtt.subscribe(f"{MQTT_SUB}#")
LOG.info(f"subscribing to {MQTT_SUB}")
def on_disconnect(client, userdata, rc):
LOG.info(f"Disconnected from mqtt server")
def on_message(client, userdata, msg):
try:
topic = msg.topic[len(MQTT_SUB) :]
if "/" in topic:
k1, k2 = topic.split("/", 1)
out(
{
"measurement": k1,
"line": k2,
},
msg.payload.decode(),
)
else:
out(
{
"measurement": topic,
},
msg.payload.decode(),
)
except Exception:
LOG.exception(f"could not parse {msg.topic!r} {msg.payload!r}")
try:
LOG.info("mooooin!")
mqtt = mqtt.Client()
mqtt.on_connect = on_connect
mqtt.on_disconnect = on_disconnect
mqtt.on_message = on_message
mqtt.connect(config["mqtt"]["host"], config["mqtt"]["port"], 10)
LOG.info("starting loop")
mqtt.loop_forever()
LOG.info("bye")
except Exception:
LOG.exception("oops")