"""Terminal dashboard that shows values received over MQTT.

Usage: ``script.py config.toml``

The TOML config must provide ``printout.title`` (header text) and
``mqtt.prefix`` / ``mqtt.host`` / ``mqtt.port``.  Every topic below
``<prefix>/`` is subscribed to; topics of the form
``<prefix>/<measurement>/<label>`` are rendered in the panel configured for
``<measurement>`` in ``TABLE_LAYOUT``.
"""

import logging
from sys import argv, exit, stderr
from time import sleep
from tomllib import load

import paho.mqtt.client as mqtt
from rich.align import Align
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.table import Table

try:
    with open(argv[1], "rb") as f:
        config = load(f)

    TITLE = config["printout"]["title"]
    MQTT_PREFIX = config["mqtt"]["prefix"]
    # Wildcard subscription covering every topic below the prefix.
    MQTT_SUB = f"{MQTT_PREFIX}/#"
    # Leading part stripped from incoming topics before they are stored.
    TOPIC_PREFIX = f"{MQTT_PREFIX}/"
except Exception as e:
    # Missing argument, unreadable file, invalid TOML or missing keys all end
    # up here; show the usage line and the actual reason.
    print(f"Usage: {argv[0]} config.toml")
    print(f"Error: {e!r}", file=stderr)
    exit(1)

# Screen layout: one entry per row, each row maps a measurement key (the
# first topic level after the prefix) to its panel title and unit suffix.
TABLE_LAYOUT = {
    "row1": {
        "voltage": {
            "name": "Voltage",
            "suffix": "V",
        },
        "current": {
            "name": "Current",
            "suffix": "A",
        },
        "current-demand": {
            "name": "Current Demand",
            "suffix": "A",
        },
    },
    "row2": {
        "active-power": {
            # Was labelled "Apparent Power", which contradicts both the
            # "active-power" key and the "W" unit.
            "name": "Active Power",
            "suffix": "W",
        },
        "reactive-power": {
            "name": "Reactive Power",
            "suffix": "VAr",
        },
        "power-factor": {
            "name": "Power Factor",
            "suffix": "",
        },
    },
}

logging.basicConfig(level=logging.DEBUG)
LOG = logging.getLogger(__name__)

# Latest value per topic (prefix removed); written by on_message, read by
# generate_layout.
mqtt_data = {}


def on_connect(client, userdata, flags, rc):
    """Subscribe to MQTT_SUB once the connection is established."""
    LOG.info("Connected to mqtt server")
    LOG.info("subscribing to %s", MQTT_SUB)
    # Use the client handed to the callback instead of a module-level global.
    client.subscribe(MQTT_SUB)


def on_disconnect(client, userdata, rc):
    """Log that the connection to the server was closed."""
    LOG.info("Disconnected from mqtt server")


def on_message(client, userdata, msg):
    """Store the payload of ``msg`` in ``mqtt_data`` under its relative topic.

    Payloads that parse as a number are stored as floats rounded to two
    decimals; anything else is stored as the decoded text.
    """
    # errors="replace" keeps a non-UTF-8 payload from raising inside the
    # callback.
    value = msg.payload.decode(errors="replace")
    try:
        value = round(float(value), 2)
    except ValueError:
        # Not numeric: keep the text as-is.
        pass

    # Strip "<prefix>/" (not "<prefix>/#", which is one character longer and
    # would cut off the first letter of the measurement name).
    mqtt_data[msg.topic.removeprefix(TOPIC_PREFIX)] = value


def generate_layout():
    """Build a fresh rich ``Layout`` from the current content of ``mqtt_data``.

    The layout has a 3-line header showing TITLE and a main area split into
    the rows and cells described by TABLE_LAYOUT.  Each cell holds a table
    listing every stored topic that starts with ``<measurement>/``.
    """
    layout = Layout()
    layout.split_column(
        Layout(Panel(Align.center(TITLE)), name="header"),
        Layout(name="main"),
    )
    layout["header"].size = 3
    layout["main"].split_column(*(Layout(name=row) for row in TABLE_LAYOUT))

    # Take one sorted snapshot per render: on_message may add topics while
    # this function runs, and the snapshot is reused for every cell.
    readings = sorted(mqtt_data.copy().items())

    for row, cells in TABLE_LAYOUT.items():
        layout["main"][row].split_row(*(Layout(name=cell) for cell in cells))

        for cell, spec in cells.items():
            # The trailing slash keeps "current/" from matching
            # "current-demand/...".
            cell_prefix = f"{cell}/"
            table = Table(
                title=spec["name"],
                show_header=False,
                box=None,
            )
            for topic, value in readings:
                if topic.startswith(cell_prefix):
                    table.add_row(
                        topic.removeprefix(cell_prefix),
                        f"{value} {spec['suffix']}",
                    )
            layout["main"][row][cell].update(Panel(Align.center(table)))

    return layout


try:
    LOG.info("mooooin!")

    # Named mqtt_client so the imported ``mqtt`` module is not shadowed.
    mqtt_client = mqtt.Client()
    mqtt_client.on_connect = on_connect
    mqtt_client.on_disconnect = on_disconnect
    mqtt_client.on_message = on_message
    mqtt_client.connect(config["mqtt"]["host"], config["mqtt"]["port"], 10)

    LOG.info("starting loop")
    mqtt_client.loop_start()

    try:
        with Live(generate_layout(), screen=True) as live:
            while True:
                live.update(generate_layout())
                sleep(0.5)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to leave the endless refresh loop.
        pass
    finally:
        # Runs on Ctrl-C and on errors alike, so the network loop is always
        # shut down.
        mqtt_client.disconnect()
        mqtt_client.loop_stop()
        LOG.info("bye")
except Exception:
    LOG.exception("oops")