73 lines
1.7 KiB
Python
73 lines
1.7 KiB
Python
|
import logging
|
||
|
from sys import argv
|
||
|
from tomllib import load
|
||
|
|
||
|
import paho.mqtt.client as mqtt
|
||
|
|
||
|
with open(argv[1], "rb") as f:
|
||
|
config = load(f)
|
||
|
|
||
|
MQTT_SUB = "{}/".format(config["mqtt"]["prefix"])
|
||
|
|
||
|
logging.basicConfig(level=logging.DEBUG)
|
||
|
LOG = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
def out(keys, value):
|
||
|
print(
|
||
|
"{},{} value={}".format(
|
||
|
config["telegraf"]["measurement"],
|
||
|
",".join([f"{k}={v}" for k, v in keys.items()]),
|
||
|
value,
|
||
|
),
|
||
|
flush=True,
|
||
|
)
|
||
|
|
||
|
|
||
|
def on_connect(client, userdata, flags, rc):
|
||
|
LOG.info(f"Connected to mqtt server")
|
||
|
mqtt.subscribe(f"{MQTT_SUB}#")
|
||
|
LOG.info(f"subscribing to {MQTT_SUB}")
|
||
|
|
||
|
|
||
|
def on_disconnect(client, userdata, rc):
|
||
|
LOG.info(f"Disconnected from mqtt server")
|
||
|
|
||
|
|
||
|
def on_message(client, userdata, msg):
|
||
|
try:
|
||
|
topic = msg.topic[len(MQTT_SUB) :]
|
||
|
if "/" in topic:
|
||
|
k1, k2 = topic.split("/", 1)
|
||
|
out(
|
||
|
{
|
||
|
"measurement": k1,
|
||
|
"line": k2,
|
||
|
},
|
||
|
msg.payload.decode(),
|
||
|
)
|
||
|
else:
|
||
|
out(
|
||
|
{
|
||
|
"measurement": topic,
|
||
|
},
|
||
|
msg.payload.decode(),
|
||
|
)
|
||
|
except Exception:
|
||
|
LOG.exception(f"could not parse {msg.topic!r} {msg.payload!r}")
|
||
|
|
||
|
|
||
|
try:
|
||
|
LOG.info("mooooin!")
|
||
|
mqtt = mqtt.Client()
|
||
|
mqtt.on_connect = on_connect
|
||
|
mqtt.on_disconnect = on_disconnect
|
||
|
mqtt.on_message = on_message
|
||
|
mqtt.connect(config["mqtt"]["host"], config["mqtt"]["port"], 10)
|
||
|
|
||
|
LOG.info("starting loop")
|
||
|
mqtt.loop_forever()
|
||
|
LOG.info("bye")
|
||
|
except Exception:
|
||
|
LOG.exception("oops")
|