bundlewrap/bundles/infobeamer-monitor/files/monitor.py

217 lines
7.1 KiB
Python

#!/usr/bin/env python3
import logging
from datetime import datetime
from json import dumps
from time import sleep
from zoneinfo import ZoneInfo
import paho.mqtt.client as mqtt
from requests import RequestException, get
try:
    # Python 3.11+ ships a TOML parser in the standard library.
    from tomllib import loads as toml_load
except ImportError:
    # Older interpreters fall back to the third-party rtoml package. Its
    # load() is handed the file contents as a string below.
    from rtoml import load as toml_load

# TOML documents are UTF-8 by specification, so decode the file as UTF-8
# explicitly instead of depending on the locale's default encoding.
with open("config.toml", encoding="utf-8") as f:
    CONFIG = toml_load(f.read())
# Root logger: INFO and above, rendered as "[LEVEL logger-name] message".
logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)s %(name)s] %(message)s",
)
LOG = logging.getLogger("main")

# Time zone and HHMM minute used by the daily forced status dump.
TZ = ZoneInfo("Europe/Berlin")
DUMP_TIME = "0900"

# Device mapping from the previous poll; stays None until a poll succeeds.
state = None

# MQTT connection shared by every outgoing notification. Port 1883 and a
# 60 second keepalive are fixed; host and credentials come from the config.
_mqtt_settings = CONFIG["mqtt"]
client = mqtt.Client()
client.username_pw_set(_mqtt_settings["user"], _mqtt_settings["password"])
client.connect(_mqtt_settings["host"], 1883, 60)
# Run the MQTT network loop in a background thread.
client.loop_start()
def mqtt_out(message, level="INFO", device=None):
    """Publish a JSON-encoded notification to the configured MQTT topic.

    The payload carries the keys "level", "component" and "msg". When a
    device dict is given, the component becomes "infobeamer/<id>" and the
    message is prefixed with the device description in square brackets,
    or with its serial if the description is empty.
    """
    component = "infobeamer"
    if device:
        component = f"{component}/{device['id']}"
        # The serial is only looked up when there is no usable description.
        label = device["description"] or device["serial"]
        message = f"[{label}] {message}"

    topic = CONFIG["mqtt"]["topic"]
    payload = dumps({"level": level, "component": component, "msg": message})
    client.publish(topic, payload)
def mqtt_dump_state(device):
    """Publish a one-line summary of an online device; ignore offline ones.

    The summary joins the location (only if set), the setup name and id,
    and the reported resolution ("unknown" when absent) with " - ".
    """
    if not device["is_online"]:
        return

    location = device["location"]
    setup = device["setup"]
    setup_name, setup_id = setup["name"], setup["id"]
    resolution = device["run"].get("resolution", "unknown")

    details = [f"Location: {location}"] if location else []
    details += [
        f"Setup: {setup_name} ({setup_id})",
        f"Resolution: {resolution}",
    ]
    mqtt_out(" - ".join(details), device=device)
def is_dump_time():
    """Return True while the current HHMM wall-clock time in TZ is DUMP_TIME."""
    now = datetime.now(TZ)
    return f"{now:%H%M}" == DUMP_TIME
# Seconds to wait for each info-beamer API request. Without a timeout,
# requests blocks indefinitely on a stalled connection and the monitor would
# silently stop polling; with one, a RequestException subclass is raised.
API_TIMEOUT = 30


def _fetch_json(url):
    """Return the decoded JSON body of an authenticated info-beamer API call.

    Raises RequestException for connection problems, timeouts and HTTP error
    statuses, and ValueError (or a subclass of it) if the body is not JSON.
    """
    response = get(url, auth=("", CONFIG["api_key"]), timeout=API_TIMEOUT)
    response.raise_for_status()
    return response.json()


def _process_devices(devices, previous, dump_now):
    """Report device changes since the previous poll and return the new state.

    devices is the list of device dicts from the API. previous is the mapping
    returned by the last successful call, or None on the first poll, in which
    case nothing is reported. dump_now forces a state dump for every device.
    The result maps str(device id) to the device dict.
    """
    current = {}
    for device in devices:
        did = str(device["id"])

        if did in current:
            mqtt_out("DUPLICATE DETECTED!", level="ERROR", device=device)
            continue
        current[did] = device

        if previous is None:
            # First successful poll: there is nothing to compare against.
            LOG.info("adding device %s to empty state", device["id"])
            continue

        # Forced at dump time, otherwise only set when something changed.
        must_dump_state = dump_now

        if did not in previous:
            LOG.info("new device found: %s [%s]", did, device["description"])
            mqtt_out("new device found!", device=device)
            must_dump_state = True
        else:
            old = previous[did]

            if device["is_online"] != old["is_online"]:
                if device["is_online"]:
                    # .get mirrors how "resolution" is read from the same
                    # dict, so a missing key cannot crash the monitor.
                    online_status = "online from {}".format(
                        device["run"].get("public_addr", "unknown")
                    )
                else:
                    online_status = "offline"
                LOG.info("device %s is now %s", did, online_status)
                mqtt_out(
                    f"status changed to {online_status}",
                    level="INFO" if device["is_online"] else "WARN",
                    device=device,
                )
                must_dump_state = True

            if device["description"] != old["description"]:
                LOG.info(
                    "device %s changed name to %s", did, device["description"]
                )
                must_dump_state = True

            if device["is_online"]:
                if device["maintenance"]:
                    mqtt_out(
                        "maintenance required: {}".format(
                            " ".join(sorted(device["maintenance"]))
                        ),
                        level="WARN",
                        device=device,
                    )

                if (
                    device["location"] != old["location"]
                    or device["setup"]["id"] != old["setup"]["id"]
                    or device["run"].get("resolution")
                    != old["run"].get("resolution")
                ):
                    must_dump_state = True

        if must_dump_state:
            mqtt_dump_state(device)

    return current


def _check_account(account, dump_now):
    """Report the credit balance and quota usage of the info-beamer account.

    The balance is published when dump_now is set. Balances below 100 / 50
    credits and quotas above 80% / 90% of their limit are published as
    WARN / ERROR on every call.
    """
    available_credits = account["balance"]
    if dump_now:
        mqtt_out(f"Available Credits: {available_credits}")

    if available_credits < 50:
        mqtt_out(
            f"balance has dropped below 50 credits! (available: {available_credits})",
            level="ERROR",
        )
    elif available_credits < 100:
        mqtt_out(
            f"balance has dropped below 100 credits! (available: {available_credits})",
            level="WARN",
        )

    for quota_name, quota_config in sorted(account["quotas"].items()):
        value = quota_config["count"]["value"]
        limit = quota_config["count"]["limit"]
        if value > limit * 0.9:
            mqtt_out(
                f"quota {quota_name} is over 90% (limit {limit}, value {value})",
                level="ERROR",
            )
        elif value > limit * 0.8:
            mqtt_out(
                f"quota {quota_name} is over 80% (limit {limit}, value {value})",
                level="WARN",
            )


mqtt_out("Monitor starting up")
while True:
    try:
        # Decide once per iteration whether this is the forced-dump minute
        # (09:00 CE(S)T), so all devices and the account summary agree even
        # if the minute changes while the requests are running.
        # NOTE(review): an iteration lasts the 60 s sleep plus the request
        # time, so consecutive checks are more than a minute apart and the
        # DUMP_TIME minute can occasionally be skipped altogether.
        dump_now = is_dump_time()

        try:
            ib_state = _fetch_json("https://info-beamer.com/api/v1/device/list")[
                "devices"
            ]
        except (RequestException, ValueError) as e:
            # ValueError covers JSON decode failures on requests releases
            # whose decode error is not a RequestException.
            LOG.exception("Could not get device data from info-beamer")
            mqtt_out(
                f"Could not get device data from info-beamer: {e!r}",
                level="WARN",
            )
        else:
            state = _process_devices(ib_state, state, dump_now)

        try:
            ib_account = _fetch_json("https://info-beamer.com/api/v1/account")
        except (RequestException, ValueError) as e:
            LOG.exception("Could not get account data from info-beamer")
            mqtt_out(
                f"Could not get account data from info-beamer: {e!r}",
                level="WARN",
            )
        else:
            _check_account(ib_account, dump_now)

        sleep(60)
    except KeyboardInterrupt:
        break

mqtt_out("Monitor exiting")
# publish() only queues the message for the background network thread.
# Disconnect cleanly and wait for that thread so the final message is sent
# before the interpreter exits.
client.disconnect()
client.loop_stop()