#!/usr/bin/env python3
"""Dump NetBox device data into per-device JSON files.

Queries the NetBox GraphQL API for every device tagged ``bundlewrap`` and
writes one ``configs/netbox_device_<name>.json`` file per device. Each file
holds the device's interfaces and the VLANs of the site the device is in.

The API token is taken from the ``NETBOX_AUTH_TOKEN`` environment variable;
if that is unset or empty, it is looked up through ``bwpass`` instead.
"""

from json import dump
from os import environ
from os.path import dirname, join
from sys import exit

import bwpass
from requests import post

from bundlewrap.utils.text import validate_name

NETBOX_GRAPHQL_URL = "https://netbox.franzi.business/graphql/"

# Seconds to wait for NetBox. requests applies no timeout unless one is
# passed explicitly, so without this a stalled connection blocks forever.
REQUEST_TIMEOUT = 60

TOKEN = environ.get("NETBOX_AUTH_TOKEN")

# editorconfig-checker-disable
QUERY = (
    '{ device_list(tag: "bundlewrap") { name site { id } '
    "interfaces { id name enabled description mode type "
    "ip_addresses { address } "
    "untagged_vlan { name } "
    "tagged_vlans { name } "
    "link_peers { "
    "... on InterfaceType { name device { name } } "
    "... on FrontPortType { name device { name } } "
    "} "
    "connected_endpoints { "
    "... on InterfaceType { name device { name } } "
    "} "
    "} "
    "} "
    "site_list { id vlans { name vid } } "
    "}"
)
# editorconfig-checker-enable


def _get_token():
    """Return the NetBox API token, exiting with status 1 if none is found."""
    if TOKEN:
        return TOKEN

    try:
        return bwpass.attr("netbox.franzi.business/kunsi", "token")
    except Exception:
        # Any failure of the bwpass lookup is reported the same way.
        print("NETBOX_AUTH_TOKEN is missing")
        exit(1)


def _fetch_data(token):
    """Run QUERY against NetBox and return the ``data`` part of the response.

    Raises:
        requests.HTTPError: if NetBox answers with an HTTP error status.
        RuntimeError: if the GraphQL response carries no ``data``.
    """
    response = post(
        NETBOX_GRAPHQL_URL,
        headers={
            "Accept": "application/json",
            "Authorization": f"Token {token}",
        },
        json={
            "query": QUERY,
        },
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()

    payload = response.json()
    data = payload.get("data")
    if data is None:
        # A failed GraphQL query can still be answered with HTTP 200; fail
        # with the reported errors instead of an opaque TypeError/KeyError.
        raise RuntimeError(
            f"NetBox GraphQL query returned no data: {payload.get('errors')}"
        )
    return data


def _describe_interface(device_name, interface):
    """Return the description to store for *interface*.

    An explicit description set in NetBox wins. Otherwise the description is
    built from the interface's connected endpoints or, if there are none,
    its direct link peers, formatted as ``"device (interface)"`` and joined
    with ``"; "``. Without any peers the description is empty.

    Raises:
        ValueError: if the resulting description contains non-ASCII text.
    """
    if interface["description"]:
        description = interface["description"]
    else:
        peers = interface["connected_endpoints"] or interface["link_peers"] or []
        # NOTE(review): a peer of a type not covered by the fragments in
        # QUERY would presumably arrive without "device"/"name" and raise
        # KeyError here - confirm against the NetBox GraphQL schema.
        description = "; ".join(
            sorted({f"{peer['device']['name']} ({peer['name']})" for peer in peers})
        )

    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if not description.isascii():
        raise ValueError(
            f"non-ASCII description on {device_name} "
            f"interface {interface['name']}: {description!r}"
        )
    return description


def _build_device_config(device, site_vlans):
    """Return the dict that gets dumped to the JSON file for *device*."""
    interfaces = {}
    for interface in device["interfaces"]:
        untagged_vlan = interface["untagged_vlan"]
        interfaces[interface["name"]] = {
            "description": _describe_interface(device["name"], interface),
            "enabled": interface["enabled"],
            "mode": interface["mode"],
            "type": interface["type"],
            # Sets drop duplicates; sorting keeps the output stable.
            "ips": sorted({ip["address"] for ip in interface["ip_addresses"]}),
            "untagged_vlan": untagged_vlan["name"] if untagged_vlan else None,
            "tagged_vlans": sorted(
                {vlan["name"] for vlan in interface["tagged_vlans"]}
            ),
        }

    return {
        "interfaces": interfaces,
        "vlans": site_vlans[device["site"]["id"]],
    }


def _write_device_config(device_name, config):
    """Write *config* to ``configs/netbox_device_<device_name>.json``.

    The ``configs`` directory is resolved relative to the parent of the
    directory containing this script.
    """
    path = join(
        dirname(dirname(__file__)),
        "configs",
        f"netbox_device_{device_name}.json",
    )
    with open(path, "w", encoding="utf-8") as f:
        dump(
            config,
            f,
            indent=4,
            sort_keys=True,
        )


def main():
    """Fetch all tagged devices from NetBox and write their config files."""
    data = _fetch_data(_get_token())

    site_vlans = {site["id"]: site["vlans"] for site in data["site_list"]}

    for device in data["device_list"]:
        if not device["name"] or not validate_name(device["name"]):
            # invalid node name, ignore
            continue

        _write_device_config(
            device["name"],
            _build_device_config(device, site_vlans),
        )


if __name__ == "__main__":
    main()