From 263b166c93f7c28fd1fd150dddbf5196bf4b3b95 Mon Sep 17 00:00:00 2001 From: Franziska Kunsmann Date: Tue, 30 Jul 2024 19:23:41 +0200 Subject: [PATCH] add telegraf output script --- telegraf.py | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 telegraf.py diff --git a/telegraf.py b/telegraf.py new file mode 100644 index 0000000..ae821b5 --- /dev/null +++ b/telegraf.py @@ -0,0 +1,72 @@ +import logging +from sys import argv +from tomllib import load + +import paho.mqtt.client as mqtt + +with open(argv[1], "rb") as f: + config = load(f) + +MQTT_SUB = "{}/".format(config["mqtt"]["prefix"]) + +logging.basicConfig(level=logging.DEBUG) +LOG = logging.getLogger(__name__) + + +def out(keys, value): + print( + "{},{} value={}".format( + config["telegraf"]["measurement"], + ",".join([f"{k}={v}" for k, v in keys.items()]), + value, + ), + flush=True, + ) + + +def on_connect(client, userdata, flags, rc): + LOG.info(f"Connected to mqtt server") + mqtt.subscribe(f"{MQTT_SUB}#") + LOG.info(f"subscribing to {MQTT_SUB}") + + +def on_disconnect(client, userdata, rc): + LOG.info(f"Disconnected from mqtt server") + + +def on_message(client, userdata, msg): + try: + topic = msg.topic[len(MQTT_SUB) :] + if "/" in topic: + k1, k2 = topic.split("/", 1) + out( + { + "measurement": k1, + "line": k2, + }, + msg.payload.decode(), + ) + else: + out( + { + "measurement": topic, + }, + msg.payload.decode(), + ) + except Exception: + LOG.exception(f"could not parse {msg.topic!r} {msg.payload!r}") + + +try: + LOG.info("mooooin!") + mqtt = mqtt.Client() + mqtt.on_connect = on_connect + mqtt.on_disconnect = on_disconnect + mqtt.on_message = on_message + mqtt.connect(config["mqtt"]["host"], config["mqtt"]["port"], 10) + + LOG.info("starting loop") + mqtt.loop_forever() + LOG.info("bye") +except Exception: + LOG.exception("oops")