commit afb4290b11823ff16b731185fdcbf30e6c0b2344 Author: Franziska Kunsmann Date: Tue Jan 9 11:08:33 2024 +0100 initial commit diff --git a/hosted.py b/hosted.py new file mode 100644 index 0000000..274d36c --- /dev/null +++ b/hosted.py @@ -0,0 +1,1374 @@ +# +# Part of info-beamer hosted. You can find the latest version +# of this file at: +# +# https://github.com/info-beamer/package-sdk +# +# Copyright (c) 2014-2020 Florian Wesch +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the +# distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +VERSION = "1.9" + +import os, re, sys, json, time, traceback, marshal, hashlib +import errno, socket, select, threading, Queue, ctypes +import pyinotify, requests +from functools import wraps +from collections import namedtuple +from tempfile import NamedTemporaryFile + +types = {} + +def init_types(): + def type(fn): + types[fn.__name__] = fn + return fn + + @type + def color(value): + return value + + @type + def string(value): + return value + + @type + def text(value): + return value + + @type + def section(value): + return value + + @type + def boolean(value): + return value + + @type + def select(value): + return value + + @type + def duration(value): + return value + + @type + def integer(value): + return value + + @type + def float(value): + return value + + @type + def font(value): + return value + + @type + def device(value): + return value + + @type + def resource(value): + return value + + @type + def device_token(value): + return value + + @type + def json(value): + return value + + @type + def custom(value): + return value + + @type + def date(value): + return value + +init_types() + +def log(msg, name='hosted.py'): + sys.stderr.write("[{}] {}\n".format(name, msg)) + +def abort_service(reason): + log("restarting service (%s)" % reason) + os._exit(0) + time.sleep(2) + os.kill(os.getpid(), 2) + time.sleep(2) + os.kill(os.getpid(), 15) + time.sleep(2) + os.kill(os.getpid(), 9) + time.sleep(100) + +CLOCK_MONOTONIC_RAW = 4 # see + +class timespec(ctypes.Structure): + _fields_ = [ + ('tv_sec', ctypes.c_long), + ('tv_nsec', ctypes.c_long), + ] + +librt = ctypes.CDLL('librt.so.1') +clock_gettime = librt.clock_gettime +clock_gettime.argtypes = [ctypes.c_int, ctypes.POINTER(timespec)] + +def monotonic_time(): + t = timespec() + clock_gettime(CLOCK_MONOTONIC_RAW , ctypes.pointer(t)) + return t.tv_sec + t.tv_nsec * 1e-9 + +class InfoBeamerQueryException(Exception): + pass + +class InfoBeamerQuery(object): + def __init__(self, host='127.0.0.1', port=4444): + self._sock = None + self._conn = None + self._host = host + self._port = port + self._timeout = 2 + self._version = None + + def _reconnect(self): + if self._conn is not None: + return + try: + self._sock = socket.create_connection((self._host, self._port), self._timeout) + self._conn = self._sock.makefile() + intro = self._conn.readline() + except socket.timeout: + self._reset() + raise InfoBeamerQueryException("Timeout while reopening connection") + except socket.error as err: + self._reset() + raise InfoBeamerQueryException("Cannot connect to %s:%s: %s" % ( + self._host, self._port, err)) + m = re.match("^Info Beamer PI ([^ ]+)", intro) + if not m: + self._reset() + raise InfoBeamerQueryException("Invalid handshake. Not info-beamer?") + self._version = m.group(1) + + def _parse_line(self): + line = self._conn.readline() + if not line: + return None + return line.rstrip() + + def _parse_multi_line(self): + lines = [] + while 1: + line = self._conn.readline() + if not line: + return None + line = line.rstrip() + if not line: + break + lines.append(line) + return '\n'.join(lines) + + def _send_cmd(self, min_version, cmd, multiline=False): + for retry in (1, 2): + self._reconnect() + if self._version <= min_version: + raise InfoBeamerQueryException( + "This query is not implemented in your version of info-beamer. " + "%s or higher required, %s found" % (min_version, self._version) + ) + try: + self._conn.write(cmd + "\n") + self._conn.flush() + response = self._parse_multi_line() if multiline else self._parse_line() + if response is None: + self._reset() + continue + return response + except socket.error: + self._reset() + continue + except socket.timeout: + self._reset() + raise InfoBeamerQueryException("Timeout waiting for response") + except Exception: + self._reset() + continue + raise InfoBeamerQueryException("Failed to get a response") + + def _reset(self, close=True): + if close: + try: + if self._conn: self._conn.close() + if self._sock: self._sock.close() + except: + pass + self._conn = None + self._sock = None + + @property + def addr(self): + return "%s:%s" % (self._host, self._port) + + def close(self): + self._reset() + + @property + def ping(self): + "tests if info-beamer is reachable" + return self._send_cmd( + "0.6", "*query/*ping", + ) == "pong" + + @property + def uptime(self): + "returns the uptime in seconds" + return int(self._send_cmd( + "0.6", "*query/*uptime", + )) + + @property + def objects(self): + "returns the number of allocated info-beamer objects" + return int(self._send_cmd( + "0.9.4", "*query/*objects", + )) + + @property + def version(self): + "returns the running info-beamer version" + return self._send_cmd( + "0.6", "*query/*version", + ) + + @property + def fps(self): + "returns the FPS of the top level node" + return float(self._send_cmd( + "0.6", "*query/*fps", + )) + + @property + def display(self): + "returns the display configuration" + return json.loads(self._send_cmd( + "1.0", "*query/*display", + )) + + ResourceUsage = namedtuple("ResourceUsage", "user_time system_time memory") + @property + def resources(self): + "returns information about used resources" + return self.ResourceUsage._make(int(v) for v in self._send_cmd( + "0.6", "*query/*resources", + ).split(',')) + + ScreenSize = namedtuple("ScreenSize", "width height") + @property + def screen(self): + "returns the native screen size" + return self.ScreenSize._make(int(v) for v in self._send_cmd( + "0.8.1", "*query/*screen", + ).split(',')) + + @property + def runid(self): + "returns a unique run id that changes with every restart of info-beamer" + return self._send_cmd( + "0.9.0", "*query/*runid", + ) + + @property + def nodes(self): + "returns a list of nodes" + nodes = self._send_cmd( + "0.9.3", "*query/*nodes", + ).split(',') + return [] if not nodes[0] else nodes + + class Node(object): + def __init__(self, ib, path): + self._ib = ib + self._path = path + + @property + def mem(self): + "returns the Lua memory usage of this node" + return int(self._ib._send_cmd( + "0.6", "*query/*mem/%s" % self._path + )) + + @property + def fps(self): + "returns the framerate of this node" + return float(self._ib._send_cmd( + "0.6", "*query/*fps/%s" % self._path + )) + + def io(self, raw=True): + "creates a tcp connection to this node" + status = self._ib._send_cmd( + "0.6", "%s%s" % ("*raw/" if raw else '', self._path), + ) + if status != 'ok!': + raise InfoBeamerQueryException("Cannot connect to node %s" % self._path) + sock = self._ib._sock + sock.settimeout(None) + return self._ib._conn + + @property + def has_error(self): + "queries the error flag" + return bool(int(self._ib._send_cmd( + "0.8.2", "*query/*has_error/%s" % self._path, + ))) + + @property + def error(self): + "returns the last Lua traceback" + return self._ib._send_cmd( + "0.8.2", "*query/*error/%s" % self._path, multiline=True + ) + + def __repr__(self): + return "%s/%s" % (self._ib, self._path) + + def node(self, node): + return self.Node(self, node) + + def __repr__(self): + return "" % self.addr + + +class Configuration(object): + def __init__(self): + self._restart = False + self._options = [] + self._config = {} + self._parsed = {} + self.parse_node_json(do_update=False) + self.parse_config_json() + + def restart_on_update(self): + log("going to restart when config is updated") + self._restart = True + + def parse_node_json(self, do_update=True): + with open("node.json") as f: + self._options = json.load(f).get('options', []) + if do_update: + self.update_config() + + def parse_config_json(self, do_update=True): + with open("config.json") as f: + self._config = json.load(f) + if do_update: + self.update_config() + + def update_config(self): + if self._restart: + return abort_service("restart_on_update set") + + def parse_recursive(options, config, target): + # print 'parsing', config + for option in options: + if not 'name' in option: + continue + if option['type'] == 'list': + items = [] + for item in config[option['name']]: + parsed = {} + parse_recursive(option['items'], item, parsed) + items.append(parsed) + target[option['name']] = items + continue + target[option['name']] = types[option['type']](config[option['name']]) + + parsed = {} + parse_recursive(self._options, self._config, parsed) + log("updated config") + self._parsed = parsed + + @property + def raw(self): + return self._config + + @property + def metadata(self): + return self._config['__metadata'] + + def __getitem__(self, key): + return self._parsed[key] + + def __getattr__(self, key): + return self._parsed[key] + +def setup_inotify(configuration): + class EventHandler(pyinotify.ProcessEvent): + def process_default(self, event): + basename = os.path.basename(event.pathname) + if basename == 'node.json': + log("node.json changed") + configuration.parse_node_json() + elif basename == 'config.json': + log("config.json changed!") + configuration.parse_config_json() + elif basename.endswith('.py'): + abort_service("python file changed") + + wm = pyinotify.WatchManager() + + notifier = pyinotify.ThreadedNotifier(wm, EventHandler()) + notifier.daemon = True + notifier.start() + + wm.add_watch('.', pyinotify.IN_MOVED_TO) + +class RPC(object): + def __init__(self, path, callbacks): + self._path = path + self._callbacks = callbacks + self._lock = threading.Lock() + self._con = None + thread = threading.Thread(target=self._listen_thread) + thread.daemon = True + thread.start() + + def _get_connection(self): + if self._con is None: + try: + self._con = InfoBeamerQuery().node( + self._path + "/rpc/python" + ).io(raw=True) + except InfoBeamerQueryException: + return None + return self._con + + def _close_connection(self): + with self._lock: + if self._con: + try: + self._con.close() + except: + pass + self._con = None + + def _send(self, line): + with self._lock: + con = self._get_connection() + if con is None: + return + try: + con.write(line + '\n') + con.flush() + return True + except: + self._close_connection() + return False + + def _recv(self): + with self._lock: + con = self._get_connection() + try: + return con.readline() + except: + self._close_connection() + + def _listen_thread(self): + while 1: + line = self._recv() + if not line: + self._close_connection() + time.sleep(0.5) + continue + try: + args = json.loads(line) + method = args.pop(0) + callback = self._callbacks.get(method) + if callback: + callback(*args) + else: + log("callback '%s' not found" % (method,)) + except: + traceback.print_exc() + + def register(self, name, fn): + self._callbacks[name] = fn + + def call(self, fn): + self.register(fn.__name__, fn) + + def __getattr__(self, method): + def call(*args): + args = list(args) + args.insert(0, method) + return self._send(json.dumps( + args, + ensure_ascii=False, + separators=(',',':'), + ).encode('utf8')) + return call + +class Cache(object): + def __init__(self, scope='default'): + self._touched = set() + self._prefix = 'cache-%s-' % scope + + def key_to_fname(self, key): + return self._prefix + hashlib.md5(key).hexdigest() + + def has(self, key, max_age=None): + try: + stat = os.stat(self.key_to_fname(key)) + if max_age is not None: + now = time.time() + if now > stat.st_mtime + max_age: + return False + return True + except: + return False + + def get(self, key, max_age=None): + try: + with open(self.file_ref(key)) as f: + if max_age is not None: + stat = os.fstat(f.fileno()) + now = time.time() + if now > stat.st_mtime + max_age: + return None + return f.read() + except: + return None + + def get_json(self, key, max_age=None): + data = self.get(key, max_age) + if data is None: + return None + return json.loads(data) + + def set(self, key, value): + with open(self.file_ref(key), "wb") as f: + f.write(value) + + def set_json(self, key, data): + self.set(key, json.dumps(data)) + + def file_ref(self, key): + fname = self.key_to_fname(key) + self._touched.add(fname) + return fname + + def start(self): + self._touched = set() + + def prune(self): + existing = set() + for fname in os.listdir("."): + if not fname.startswith(self._prefix): + continue + existing.add(fname) + prunable = existing - self._touched + for fname in prunable: + try: + log("pruning %s" % fname) + os.unlink(fname) + except: + pass + + def clear(self): + self.start() + self.prune() + + def call(self, max_age=None): + def deco(fn): + @wraps(fn) + def wrapper(*args, **kwargs): + key = marshal.dumps((fn.__name__, args, kwargs), 2) + cached = self.get(key, max_age) + if cached is not None: + return marshal.loads(cached) + val = fn(*args, **kwargs) + self.set(key, marshal.dumps(val, 2)) + return val + return wrapper + return deco + + def file_producer(self, max_age=None): + def deco(fn): + @wraps(fn) + def wrapper(*args, **kwargs): + key = marshal.dumps((fn.__name__, args, kwargs), 2) + if self.has(key, max_age): + return self.file_ref(key) + val = fn(*args, **kwargs) + if val is None: + return None + self.set(key, val) + return self.file_ref(key) + return wrapper + return deco + +class Node(object): + def __init__(self, node): + self._node = node + self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + def send_raw(self, raw): + log("sending %r" % (raw,)) + self._sock.sendto(raw, ('127.0.0.1', 4444)) + + def send(self, data): + self.send_raw(self._node + data) + + def send_json(self, path, data): + self.send('%s:%s' % (path, json.dumps( + data, + ensure_ascii=False, + separators=(',',':'), + ).encode('utf8'))) + + @property + def is_top_level(self): + return self._node == "root" + + @property + def path(self): + return self._node + + def write_file(self, filename, content): + f = NamedTemporaryFile(prefix='.hosted-py-tmp', dir=os.getcwd()) + try: + f.write(content) + except: + traceback.print_exc() + f.close() + raise + else: + f.delete = False + f.close() + os.rename(f.name, filename) + + def write_json(self, filename, data): + self.write_file(filename, json.dumps( + data, + ensure_ascii=False, + separators=(',',':'), + ).encode('utf8')) + + class Sender(object): + def __init__(self, node, path): + self._node = node + self._path = path + + def __call__(self, data): + if isinstance(data, (dict, list)): + raw = "%s:%s" % (self._path, json.dumps( + data, + ensure_ascii=False, + separators=(',',':'), + ).encode('utf8')) + else: + raw = "%s:%s" % (self._path, data) + self._node.send_raw(raw) + + def __getitem__(self, path): + return self.Sender(self, self._node + path) + + def __call__(self, data): + return self.Sender(self, self._node)(data) + + def connect(self, suffix=""): + ib = InfoBeamerQuery() + return ib.node(self.path + suffix).io(raw=True) + + def rpc(self, **callbacks): + return RPC(self.path, callbacks) + + def cache(self, scope='default'): + return Cache(scope) + + def scratch_cached(self, filename, generator): + cached = os.path.join(os.environ['SCRATCH'], filename) + + if not os.path.exists(cached): + f = NamedTemporaryFile(prefix='scratch-cached-tmp', dir=os.environ['SCRATCH']) + try: + generator(f) + except: + raise + else: + f.delete = False + f.close() + os.rename(f.name, cached) + + if os.path.exists(filename): + try: + os.unlink(filename) + except: + pass + os.symlink(cached, filename) + +class APIError(Exception): + pass + +class APIProxy(object): + def __init__(self, apis, api_name): + self._apis = apis + self._api_name = api_name + + @property + def url(self): + index = self._apis.get_api_index() + if not self._api_name in index: + raise APIError("api '%s' not available" % (self._api_name,)) + return index[self._api_name]['url'] + + def unwrap(self, r): + r.raise_for_status() + if r.status_code == 304: + return None + if r.headers['content-type'] == 'application/json': + resp = r.json() + if not resp['ok']: + raise APIError(u"api call failed: %s" % ( + resp.get('error', ''), + )) + return resp.get(self._api_name) + else: + return r.content + + def add_default_args(self, kwargs): + if not 'timeout' in kwargs: + kwargs['timeout'] = 10 + return kwargs + + def get(self, **kwargs): + try: + return self.unwrap(self._apis.session.get( + url = self.url, + **self.add_default_args(kwargs) + )) + except APIError: + raise + except Exception as err: + raise APIError(err) + + def post(self, **kwargs): + try: + return self.unwrap(self._apis.session.post( + url = self.url, + **self.add_default_args(kwargs) + )) + except APIError: + raise + except Exception as err: + raise APIError(err) + + def delete(self, **kwargs): + try: + return self.unwrap(self._apis.session.delete( + url = self.url, + **self.add_default_args(kwargs) + )) + except APIError: + raise + except Exception as err: + raise APIError(err) + + +class OnDeviceAPIs(object): + def __init__(self, config): + self._config = config + self._index = None + self._valid_until = 0 + self._lock = threading.Lock() + self._session = requests.Session() + self._session.headers.update({ + 'User-Agent': 'hosted.py version/%s' % (VERSION,) + }) + + def update_apis(self): + log("fetching api index") + r = self._session.get( + url = self._config.metadata['api'], + timeout = 5, + ) + r.raise_for_status() + resp = r.json() + if not resp['ok']: + raise APIError("cannot retrieve api index") + self._index = resp['apis'] + self._valid_until = resp['valid_until'] - 300 + + def get_api_index(self): + with self._lock: + now = time.time() + if now > self._valid_until: + self.update_apis() + return self._index + + @property + def session(self): + return self._session + + def list(self): + try: + index = self.get_api_index() + return sorted(index.keys()) + except Exception as err: + raise APIError(err) + + def __getitem__(self, api_name): + return APIProxy(self, api_name) + + def __getattr__(self, api_name): + return APIProxy(self, api_name) + +class HostedAPI(object): + def __init__(self, api, on_device_token): + self._api = api + self._on_device_token = on_device_token + self._lock = threading.Lock() + self._next_refresh = 0 + self._api_key = None + self._uses = 0 + self._expire = 0 + self._base_url = None + self._session = requests.Session() + self._session.headers.update({ + 'User-Agent': 'hosted.py version/%s - on-device' % (VERSION,) + }) + + def use_api_key(self): + with self._lock: + now = time.time() + self._uses -= 1 + if self._uses <= 0: + log('hosted API adhoc key used up') + self._api_key = None + elif now > self._expire: + log('hosted API adhoc key expired') + self._api_key = None + else: + log('hosted API adhoc key usage: %d uses, %ds left' %( + self._uses, self._expire - now + )) + if self._api_key is None: + if time.time() < self._next_refresh: + return None + log('refreshing hosted API adhoc key') + self._next_refresh = time.time() + 15 + try: + r = self._api['api_key'].get( + params = dict( + on_device_token = self._on_device_token + ), + timeout = 5, + ) + except: + return None + self._api_key = r['api_key'] + self._uses = r['uses'] + self._expire = now + r['expire'] - 1 + self._base_url = r['base_url'] + return self._api_key + + def add_default_args(self, kwargs): + if not 'timeout' in kwargs: + kwargs['timeout'] = 10 + return kwargs + + def ensure_api_key(self, kwargs): + api_key = self.use_api_key() + if api_key is None: + raise APIError('cannot retrieve API key') + kwargs['auth'] = ('', api_key) + + def get(self, endpoint, **kwargs): + try: + self.ensure_api_key(kwargs) + r = self._session.get( + url = self._base_url + endpoint, + **self.add_default_args(kwargs) + ) + r.raise_for_status() + return r.json() + except APIError: + raise + except Exception as err: + raise APIError(err) + + def post(self, endpoint, **kwargs): + try: + self.ensure_api_key(kwargs) + r = self._session.post( + url = self._base_url + endpoint, + **self.add_default_args(kwargs) + ) + r.raise_for_status() + return r.json() + except APIError: + raise + except Exception as err: + raise APIError(err) + + def delete(self, endpoint, **kwargs): + try: + self.ensure_api_key(kwargs) + r = self._session.delete( + url = self._base_url + endpoint, + **self.add_default_args(kwargs) + ) + r.raise_for_status() + return r.json() + except APIError: + raise + except Exception as err: + raise APIError(err) + +class DeviceKV(object): + def __init__(self, api): + self._api = api + self._cache = {} + self._cache_complete = False + self._use_cache = True + + def cache_enabled(self, enabled): + self._use_cache = enabled + self._cache = {} + self._cache_complete = False + + def __setitem__(self, key, value): + if self._use_cache: + if key in self._cache and self._cache[key] == value: + return + self._api['kv'].post( + data = { + key: value + } + ) + if self._use_cache: + self._cache[key] = value + + def __getitem__(self, key): + if self._use_cache: + if key in self._cache: + return self._cache[key] + result = self._api['kv'].get( + params = dict( + keys = key, + ), + timeout = 5, + )['v'] + if key not in result: + raise KeyError(key) + value = result[key] + if self._use_cache: + self._cache[key] = value + return value + + # http api cannot reliably determine if a key has + # been deleted, so __delitem__ always succeeds and + # does not throw KeyError for missing keys. + def __delitem__(self, key): + if self._use_cache and self._cache_complete: + if key not in self._cache: + return + self._api['kv'].delete( + params = dict( + keys = key, + ), + timeout = 5, + ) + if self._use_cache and key in self._cache: + if key in self._cache: + del self._cache[key] + + def update(self, dct): + if self._use_cache: + for key, value in dct.items(): + if key in self._cache and self._cache[key] == value: + dct.pop(key) + if not dct: + return + self._api['kv'].post( + data = dct + ) + if self._use_cache: + for key, value in dct.iteritems(): + self._cache[key] = value + + def get(self, key, default=None): + try: + return self[key] + except KeyError: + return default + + def items(self): + if self._use_cache and self._cache_complete: + return self._cache.items() + result = self._api['kv'].get( + timeout = 5, + )['v'] + if self._use_cache: + for key, value in result.iteritems(): + self._cache[key] = value + self._cache_complete = True + return result.items() + + iteritems = items + + def clear(self): + self._api['kv'].delete() + if self._use_cache: + self._cache = {} + self._cache_complete = False + +class GPIO(object): + def __init__(self): + self._pin_fd = {} + self._state = {} + self._fd_2_pin = {} + self._poll = select.poll() + self._lock = threading.Lock() + + def setup_pin(self, pin, direction="in", invert=False): + if not os.path.exists("/sys/class/gpio/gpio%d" % pin): + with open("/sys/class/gpio/export", "wb") as f: + f.write(str(pin)) + # mdev is giving the newly create GPIO directory correct permissions. + for i in range(10): + try: + with open("/sys/class/gpio/gpio%d/active_low" % pin, "wb") as f: + f.write("1" if invert else "0") + break + except IOError as err: + if err.errno != errno.EACCES: + raise + time.sleep(0.1) + log("waiting for GPIO permissions") + else: + raise IOError(errno.EACCES, "Cannot access GPIO") + with open("/sys/class/gpio/gpio%d/direction" % pin, "wb") as f: + f.write(direction) + + def set_pin_value(self, pin, high): + with open("/sys/class/gpio/gpio%d/value" % pin, "wb") as f: + f.write("1" if high else "0") + + def monitor(self, pin, invert=False): + if pin in self._pin_fd: + return + self.setup_pin(pin, direction="in", invert=invert) + with open("/sys/class/gpio/gpio%d/edge" % pin, "wb") as f: + f.write("both") + fd = os.open("/sys/class/gpio/gpio%d/value" % pin, os.O_RDONLY) + self._state[pin] = bool(int(os.read(fd, 5))) + self._fd_2_pin[fd] = pin + self._pin_fd[pin] = fd + self._poll.register(fd, select.POLLPRI | select.POLLERR) + + def poll(self, timeout=1000): + changes = [] + for fd, evt in self._poll.poll(timeout): + os.lseek(fd, 0, 0) + state = bool(int(os.read(fd, 5))) + pin = self._fd_2_pin[fd] + with self._lock: + prev_state, self._state[pin] = self._state[pin], state + if state != prev_state: + changes.append((pin, state)) + return changes + + def poll_forever(self): + while 1: + for event in self.poll(): + yield event + + def on(self, pin): + with self._lock: + return self._state.get(pin, False) + +class SyncerAPI(object): + def __init__(self): + self._session = requests.Session() + + def unwrap(self, r): + r.raise_for_status() + return r.json() + + def get(self, path, params={}): + return self.unwrap(self._session.get( + 'http://127.0.0.1:81%s' % path, + params=params, timeout=10 + )) + + def post(self, path, data={}): + return self.unwrap(self._session.post( + 'http://127.0.0.1:81%s' % path, + data=data, timeout=10 + )) + +class ProofOfPlay(object): + def __init__(self, api, dirname): + self._api = api + self._prefix = os.path.join(os.environ['SCRATCH'], dirname) + try: + os.makedirs(self._prefix) + except: + pass + + pop_info = self._api.pop.get() + + self._max_delay = pop_info['max_delay'] + self._max_lines = pop_info['max_lines'] + self._submission_min_delay = pop_info['submission']['min_delay'] + self._submission_error_delay = pop_info['submission']['error_delay'] + + self._q = Queue.Queue() + self._log = None + + thread = threading.Thread(target=self._submit_thread) + thread.daemon = True + thread.start() + + thread = threading.Thread(target=self._writer_thread) + thread.daemon = True + thread.start() + + def _submit(self, fname, queue_size): + with open(fname, 'rb') as f: + return self._api.pop.post( + timeout = 10, + data = { + 'queue_size': queue_size, + }, + files={ + 'pop-v1': f, + } + ) + + def _submit_thread(self): + time.sleep(3) + while 1: + delay = self._submission_min_delay + try: + log('[pop][submit] gathering files') + files = [ + fname for fname + in os.listdir(self._prefix) + if fname.startswith('submit-') + ] + log('[pop][submit] %d files' % len(files)) + for fname in files: + fullname = os.path.join(self._prefix, fname) + if os.stat(fullname).st_size == 0: + os.unlink(fullname) + continue + try: + log('[pop][submit] submitting %s' % fullname) + status = self._submit(fullname, len(files)) + if status['disabled']: + log('[pop][submit] WARNING: Proof of Play disabled for this device. Submission discarded') + else: + log('[pop][submit] success') + except APIError as err: + log('[pop][submit] failure to submit log %s: %s' % ( + fullname, err + )) + delay = self._submission_error_delay + break + os.unlink(fullname) + break + if not files: + delay = 10 + except Exception as err: + log('[pop][submit] error: %s' % err) + log('[pop][submit] sleeping %ds' % delay) + time.sleep(delay) + + def reopen_log(self): + log_name = os.path.join(self._prefix, 'current.log') + if self._log is not None: + self._log.close() + self._log = None + if os.path.exists(log_name): + os.rename(log_name, os.path.join( + self._prefix, 'submit-%s.log' % os.urandom(16).encode('hex') + )) + self._log = open(log_name, 'wb') + return self._log + + def _writer_thread(self): + submit, log_file, lines = monotonic_time() + self._max_delay, self.reopen_log(), 0 + while 1: + reopen = False + max_wait = max(0.1, submit - monotonic_time()) + log('[pop] got %d lines. waiting %ds for more log lines' % (lines, max_wait)) + try: + line = self._q.get(block=True, timeout=max_wait) + log_file.write(line + '\n') + log_file.flush() + os.fsync(log_file.fileno()) + lines += 1 + log('[pop] line added: %r' % line) + except Queue.Empty: + if lines == 0: + submit += self._max_delay # extend deadline + else: + reopen = True + except Exception as err: + log("[pop] error writing pop log line") + if lines >= self._max_lines: + reopen = True + if reopen: + log('[pop] closing log of %d lines' % lines) + submit, log_file, lines = monotonic_time() + self._max_delay, self.reopen_log(), 0 + + def log(self, play_start, duration, asset_id, asset_filename): + uuid = "%08x%s" % ( + time.time(), os.urandom(12).encode('hex') + ) + self._q.put(json.dumps([ + uuid, + play_start, + duration, + 0 if asset_id is None else asset_id, + asset_filename, + ], + ensure_ascii = False, + separators = (',',':'), + ).encode('utf8')) + +class Device(object): + def __init__(self, kv, api): + self._socket = None + self._gpio = GPIO() + self._kv = kv + self._api = api + + @property + def kv(self): + return self._kv + + @property + def gpio(self): + return self._gpio + + @property + def serial(self): + return os.environ['SERIAL'] + + @property + def screen_resolution(self): + with open("/sys/class/graphics/fb0/virtual_size", "rb") as f: + return [int(val) for val in f.read().strip().split(',')] + + @property + def screen_w(self): + return self.screen_resolution[0] + + @property + def screen_h(self): + return self.screen_resolution[1] + + @property + def syncer_api(self): + return SyncerAPI() + + def ensure_connected(self): + if self._socket: + return True + try: + log("establishing upstream connection") + self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self._socket.connect(os.getenv('SYNCER_SOCKET', "/tmp/syncer")) + return True + except Exception as err: + log("cannot connect to upstream socket: %s" % (err,)) + return False + + def send_raw(self, raw): + try: + if self.ensure_connected(): + self._socket.send(raw + '\n') + except Exception as err: + log("cannot send to upstream: %s" % (err,)) + if self._socket: + self._socket.close() + self._socket = None + + def send_upstream(self, **data): + self.send_raw(json.dumps(data)) + + def turn_screen_off(self): + self.send_raw("tv off") + + def turn_screen_on(self): + self.send_raw("tv on") + + def screen(self, on=True): + if on: + self.turn_screen_on() + else: + self.turn_screen_off() + + def reboot(self): + self.send_raw("system reboot") + + def halt_until_powercycled(self): + self.send_raw("system halt") + + def restart_infobeamer(self): + self.send_raw("infobeamer restart") + + def verify_cache(self): + self.send_raw("syncer verify_cache") + + def pop(self, dirname='pop'): + return ProofOfPlay(self._api, dirname) + + def hosted_api(self, on_device_token): + return HostedAPI(self._api, on_device_token) + +if __name__ == "__main__": + print("nothing to do here") + sys.exit(1) +else: + log("starting version %s" % (VERSION,)) + + node = NODE = Node(os.environ['NODE']) + config = CONFIG = Configuration() + api = API = OnDeviceAPIs(CONFIG) + device = DEVICE = Device( + kv = DeviceKV(api), + api = api, + ) + + setup_inotify(CONFIG) + log("ready to go!") diff --git a/node.json b/node.json new file mode 100644 index 0000000..de39e99 --- /dev/null +++ b/node.json @@ -0,0 +1,7 @@ +{ + "name": "AfRA status", + "permissions": { + "network": "Needs to fetch afra open/close state" + }, + "options": [] +} diff --git a/node.lua b/node.lua new file mode 100644 index 0000000..57ee823 --- /dev/null +++ b/node.lua @@ -0,0 +1 @@ +-- unused diff --git a/package.json b/package.json new file mode 100644 index 0000000..5c19155 --- /dev/null +++ b/package.json @@ -0,0 +1,10 @@ +{ + "name": "AfRA status power switcher", + "author": "kunsi", + "desc": "switches display power according to space status", + "platforms": ["pi/epoch-1", "pi/epoch-2"], + "offline": { + "support": "no", + "info": "Needs to fetch space status" + } +} diff --git a/package.png b/package.png new file mode 100644 index 0000000..a935a01 Binary files /dev/null and b/package.png differ diff --git a/pinguin-afra-export.svg b/pinguin-afra-export.svg new file mode 100644 index 0000000..e91b78a --- /dev/null +++ b/pinguin-afra-export.svg @@ -0,0 +1,159 @@ + + + + + + + + + + image/svg+xml + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/service b/service new file mode 100644 index 0000000..0da3f26 --- /dev/null +++ b/service @@ -0,0 +1,41 @@ +#!/usr/bin/env python + +import logging +from datetime import datetime, timedelta +from time import sleep + +from pytz import utc +from requests import get + +from hosted import device + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger("afra-status") + +last_open = datetime.now(utc) + +while True: + try: + r = get("https://spaceapi.afra-berlin.de/v1/status.json") + r.raise_for_status() + status = r.json() + except Exception as e: + log.exception("error while fetching status") + status = { + "state": { + "open": None, + }, + } + + try: + if status["state"]["open"]: + last_open = datetime.now(utc) + except KeyError: + log.exception("open status was not in state json") + + if datetime.now(utc) - last_open > timedelta(minutes=2): + device.screen(on=False) + else: + device.screen(on=True) + + sleep(30)