sdm630_mqtt/printout.py

110 lines
2.6 KiB
Python
Raw Normal View History

2024-07-27 15:26:51 +00:00
import logging
from time import sleep
import paho.mqtt.client as mqtt
from rich.align import Align
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.table import Table
MQTT_HOST = "nas.home.kunbox.net"
MQTT_SUB = "sdm630/"
TITLE = "SDM630"
TABLE_LAYOUT = {
"row1": {
"voltage": {
"name": "Voltage",
"suffix": "V",
},
"current": {
"name": "Current",
"suffix": "A",
},
"current-demand": {
"name": "Current Demand",
"suffix": "A",
},
},
"row2": {
"apparent-power": {
"name": "Apparent Power",
"suffix": "W",
},
"reactive-power": {
"name": "Reactive Power",
"suffix": "VAr",
},
"power-factor": {
"name": "Power Factor",
"suffix": "",
},
},
}
# logging.basicConfig(level=logging.DEBUG)
LOG = logging.getLogger(__name__)
mqtt_data = {}
def on_connect(client, userdata, flags, rc):
LOG.info(f"Connected to mqtt server")
mqtt.subscribe(f"{MQTT_SUB}#")
def on_disconnect(client, userdata, rc):
LOG.info(f"Disconnected from mqtt server")
def on_message(client, userdata, msg):
value = msg.payload.decode()
try:
value = round(float(value), 2)
except Exception:
pass
mqtt_data[msg.topic[len(MQTT_SUB) :]] = value
def generate_layout():
layout = Layout()
layout.split_column(
Layout(Panel(Align.center(TITLE)), name="header"),
Layout(name="main"),
)
layout["header"].size = 3
layout["main"].split_column(*[Layout(name=k) for k in TABLE_LAYOUT.keys()])
for k, i in TABLE_LAYOUT.items():
layout["main"][k].split_row(*[Layout(name=v) for v in i.keys()])
for k, i in TABLE_LAYOUT.items():
for k2, v in i.items():
table = Table(
title=v["name"],
show_header=False,
box=None,
)
for topic, value in sorted(mqtt_data.items()):
if topic.startswith(f"{k2}/"):
table.add_row(topic[len(f"{k2}/") :], f"{value} {v['suffix']}")
layout["main"][k][k2].update(Panel(Align.center(table)))
return layout
try:
mqtt = mqtt.Client()
mqtt.on_connect = on_connect
mqtt.on_disconnect = on_disconnect
mqtt.on_message = on_message
mqtt.connect(MQTT_HOST, 1883, 10)
mqtt.loop_start()
with Live(generate_layout(), screen=True) as live:
while True:
live.update(generate_layout())
sleep(0.5)
except Exception:
LOG.exception("oops")