bundlewrap/bundles/infobeamer-monitor/files/monitor.py

183 lines
5.9 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
import logging
from datetime import datetime, timezone
from json import dumps
from time import sleep
import paho.mqtt.client as mqtt
from requests import RequestException, get
try:
    # tomllib is part of the standard library from Python 3.11 onwards.
    from tomllib import loads as toml_load
except ImportError:
    # Older interpreters: fall back to the third-party rtoml parser.
    from rtoml import load as toml_load

# TOML documents are UTF-8 by specification, so decode explicitly instead of
# relying on the locale's default encoding. The file is read into a string
# because both loaders above are called with the document text.
with open("config.toml", encoding="utf-8") as f:
    CONFIG = toml_load(f.read())
# Every log line is prefixed with its level and the logger name.
logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)s %(name)s] %(message)s",
)
LOG, MLOG = (logging.getLogger(name) for name in ("main", "mqtt"))

# Device list from the previous poll, keyed by device id. It stays None
# until the first successful poll has been processed.
state = None

# Connect to the broker and start the client's network loop, so that
# publish() calls made later in the script are sent from that loop.
client = mqtt.Client()
client.username_pw_set(
    username=CONFIG["mqtt"]["user"],
    password=CONFIG["mqtt"]["password"],
)
client.connect(CONFIG["mqtt"]["host"], port=1883, keepalive=60)
client.loop_start()
def mqtt_out(message, level="INFO", device=None):
    """Publish a JSON-encoded log record to the configured MQTT topic.

    The record has the keys ``level``, ``component`` and ``msg``. When
    *device* is given, its ``id`` is appended to the component name and its
    ``description`` is put in square brackets in front of the message.
    """
    component = "infobeamer"
    text = message
    if device:
        component = f"{component}/{device['id']}"
        text = f"[{device['description']}] {text}"

    payload = dumps({"level": level, "component": component, "msg": text})
    client.publish(CONFIG["mqtt"]["topic"], payload)
def mqtt_dump_state(device):
    """Publish a one-line status summary for *device*.

    Does nothing when the device is not online. Otherwise the summary lists
    sync status, location, the running setup (name and id) and the reported
    resolution, with "unknown" used when no resolution is present.
    """
    if device["is_online"]:
        sync_status = "yes" if device["is_synced"] else "syncing"
        location = device["location"]
        setup_name = device["setup"]["name"]
        setup_id = device["setup"]["id"]
        resolution = device["run"].get("resolution", "unknown")
        mqtt_out(
            f"Sync status: {sync_status} - Location: {location}"
            f" - Running Setup: {setup_name} ({setup_id})"
            f" - Resolution: {resolution}",
            device=device,
        )
mqtt_out("Monitor starting up")

# Main loop: poll the info-beamer device list every 30 seconds, compare it
# with the previous poll (the module-level ``state``) and report every change
# via MQTT. The loop ends on KeyboardInterrupt.
while True:
    try:
        try:
            r = get(
                "https://info-beamer.com/api/v1/device/list",
                auth=("", CONFIG["api_key"]),
                # Without a timeout a stalled connection would block the
                # monitor indefinitely. A timeout raises a RequestException
                # subclass, so it is reported below like any other failure.
                timeout=30,
            )
            r.raise_for_status()
            ib_state = r.json()["devices"]
        except RequestException as e:
            # Keep the previous state and simply try again on the next poll.
            LOG.exception("Could not get data from info-beamer")
            mqtt_out(
                f"Could not get data from info-beamer: {e!r}",
                level="WARN",
            )
        else:
            new_state = {}
            online_devices = set()

            for device in ib_state:
                did = str(device["id"])

                # The same id appearing twice in one response is reported and
                # only the first occurrence is processed.
                if did in new_state:
                    mqtt_out("DUPLICATE DETECTED!", level="ERROR", device=device)
                    continue

                new_state[did] = device
                must_dump_state = False

                if state is not None:
                    if did not in state:
                        LOG.info(
                            "new device found: {} [{}]".format(
                                did,
                                device["description"],
                            )
                        )
                        mqtt_out(
                            "new device found!",
                            device=device,
                        )
                        must_dump_state = True
                    else:
                        previous = state[did]

                        if device["is_online"] != previous["is_online"]:
                            online_status = (
                                "online from {}".format(device["run"]["public_addr"])
                                if device["is_online"]
                                else "offline"
                            )
                            LOG.info("device {} is now {}".format(did, online_status))
                            mqtt_out(
                                f"status changed to {online_status}",
                                level="INFO" if device["is_online"] else "WARN",
                                device=device,
                            )
                            must_dump_state = True

                        if device["description"] != previous["description"]:
                            LOG.info(
                                "device {} changed name to {}".format(
                                    did, device["description"]
                                )
                            )
                            must_dump_state = True

                        if device["is_online"]:
                            # Reported on every poll for as long as the device
                            # lists maintenance items; there is no comparison
                            # with the previous poll here.
                            if device["maintenance"]:
                                # Fixed: this used str.join on the template,
                                # which made the template the separator
                                # instead of the prefix of the item list.
                                mqtt_out(
                                    "maintenance required: {}".format(
                                        ", ".join(sorted(device["maintenance"]))
                                    ),
                                    level="WARN",
                                    device=device,
                                )
                                must_dump_state = True

                            if (
                                device["is_synced"] != previous["is_synced"]
                                or device["location"] != previous["location"]
                                or device["setup"]["id"] != previous["setup"]["id"]
                                or device["run"].get("resolution")
                                != previous["run"].get("resolution")
                            ):
                                must_dump_state = True

                    if must_dump_state:
                        mqtt_dump_state(device)
                else:
                    # First successful poll: nothing to compare against yet.
                    LOG.info("adding device {} to empty state".format(device["id"]))

                if device["is_online"]:
                    online_devices.add(
                        "{} ({})".format(
                            device["id"],
                            device["description"],
                        )
                    )

            state = new_state

            # Daily summary at 13:12 UTC. With a 30-second poll interval the
            # "< 30 seconds" condition limits the summary to the poll that
            # falls into the first half of that minute. The clock is read
            # once so both conditions refer to the same instant.
            now = datetime.now(timezone.utc)
            if now.strftime("%H%M") == "1312" and online_devices and now.second < 30:
                mqtt_out("Online Devices: {}".format(", ".join(sorted(online_devices))))

        sleep(30)
    except KeyboardInterrupt:
        break

# NOTE(review): the MQTT network loop started at startup is never stopped or
# disconnected here - confirm this final message is actually sent before the
# interpreter exits.
mqtt_out("Monitor exiting")