#!/usr/bin/env python3
"""Poll the info-beamer device list and report changes to an MQTT topic.

Every 30 seconds the script fetches the device list from the info-beamer API,
compares it with the list from the previous poll and publishes JSON messages
(new device, online/offline transitions, maintenance warnings, status dumps)
to the topic configured in ``config.toml``.
"""

import logging
from json import dumps
from time import sleep

import paho.mqtt.client as mqtt
from requests import RequestException, get

try:
    # python 3.11
    from tomllib import loads as toml_load
except ImportError:
    from rtoml import load as toml_load

# TOML documents are UTF-8 by specification, so do not rely on the locale
# default encoding when reading the config.
with open("config.toml", encoding="utf-8") as f:
    CONFIG = toml_load(f.read())

logging.basicConfig(
    format="[%(levelname)s %(name)s] %(message)s",
    level=logging.INFO,
)

LOG = logging.getLogger("main")
MLOG = logging.getLogger("mqtt")

# Seconds to wait for the info-beamer API before giving up on one poll.
# Without a timeout a stalled connection would block the monitor forever.
REQUEST_TIMEOUT = 30

# Device list of the previous successful poll, keyed by the device id as a
# string. ``None`` until the first successful poll has completed.
state = None

client = mqtt.Client()
client.username_pw_set(CONFIG["mqtt"]["user"], CONFIG["mqtt"]["password"])
client.connect(CONFIG["mqtt"]["host"], 1883, 60)
client.loop_start()


def mqtt_out(message, level="INFO", device=None):
    """Publish a JSON status message to the configured MQTT topic.

    The payload is an object with the keys ``level``, ``component`` and
    ``msg``.

    Args:
        message: Human-readable message text.
        level: Value of the ``level`` field in the payload.
        device: Optional device dict. When given, its ``id`` is appended to
            the component name and its ``description`` is prefixed to the
            message.
    """
    key = "infobeamer"
    if device:
        key += f"/{device['id']}"
        message = f"[{device['description']}] {message}"

    client.publish(
        CONFIG["mqtt"]["topic"],
        dumps(
            {
                "level": level,
                "component": key,
                "msg": message,
            }
        ),
    )


def mqtt_dump_state(device):
    """Publish a one-line summary of a device's current status.

    The summary contains the sync status, location, running setup (name and
    id) and the resolution, falling back to ``unknown`` when the device's
    ``run`` data has no ``resolution`` entry.

    Args:
        device: Device dict as found in the API's ``devices`` list.
    """
    mqtt_out(
        "Sync status: {} - Location: {} - Running Setup: {} ({}) - Resolution: {}".format(
            "yes" if device["is_synced"] else "unknown",
            device["location"],
            device["setup"]["name"],
            device["setup"]["id"],
            device["run"].get("resolution", "unknown"),
        ),
        device=device,
    )


mqtt_out("Monitor starting up")

while True:
    try:
        try:
            r = get(
                "https://info-beamer.com/api/v1/device/list",
                auth=("", CONFIG["api_key"]),
                timeout=REQUEST_TIMEOUT,
            )
            r.raise_for_status()
            ib_state = r.json()["devices"]
        except RequestException as e:
            # Keep the previous state untouched and retry on the next cycle.
            LOG.exception("Could not get data from info-beamer")
            mqtt_out(
                f"Could not get data from info-beamer: {e!r}",
                level="WARN",
            )
        else:
            new_state = {}
            for device in ib_state:
                did = str(device["id"])

                if did in new_state:
                    mqtt_out("DUPLICATE DETECTED!", level="ERROR", device=device)
                    continue

                new_state[did] = device
                must_dump_state = False

                if state is not None:
                    if did not in state:
                        LOG.info(
                            "new device found: %s [%s]",
                            did,
                            device["description"],
                        )
                        mqtt_out(
                            'new device found with name "{}"!'.format(
                                device["description"]
                            ),
                            device=device,
                        )
                        if device["is_online"]:
                            must_dump_state = True
                    else:
                        # Everything in this branch compares against
                        # state[did], so it only applies to devices that were
                        # already present in the previous poll.
                        if device["is_online"] != state[did]["is_online"]:
                            online_status = (
                                "online from {}".format(device["run"]["public_addr"])
                                if device["is_online"]
                                else "offline"
                            )

                            LOG.info("device %s is now %s", did, online_status)
                            mqtt_out(
                                f"online status changed to {online_status}",
                                device=device,
                            )
                            if device["is_online"]:
                                must_dump_state = True

                        if device["is_online"]:
                            if device["maintenance"]:
                                # Join the items first, then insert them into
                                # the template; calling .join() on the
                                # template itself would use it as separator.
                                mqtt_out(
                                    "maintenance required: {}".format(
                                        ", ".join(sorted(device["maintenance"]))
                                    ),
                                    level="WARN",
                                    device=device,
                                )

                            if (
                                device["is_synced"] != state[did]["is_synced"]
                                or device["location"] != state[did]["location"]
                                or device["setup"]["name"]
                                != state[did]["setup"]["name"]
                                or device["run"].get("resolution")
                                != state[did]["run"].get("resolution")
                            ):
                                must_dump_state = True

                    if must_dump_state:
                        mqtt_dump_state(device)
                else:
                    # First successful poll: nothing to compare against yet.
                    LOG.info("adding device %s to empty state", device["id"])

            state = new_state
        sleep(30)
    except KeyboardInterrupt:
        break

mqtt_out("Monitor exiting")