diff --git a/bundles/routeros/metadata.py b/bundles/routeros/metadata.py index 72bc063..13230d6 100644 --- a/bundles/routeros/metadata.py +++ b/bundles/routeros/metadata.py @@ -26,7 +26,7 @@ defaults = { 'routeros/vlans', ) def get_ports_from_netbox_dump(metadata): - with open(join(repo.path, 'configs', f'netbox_device_{node.name}.json')) as f: + with open(join(repo.path, 'configs', 'netbox', f'{node.name}.json')) as f: netbox = load(f) ips = {} @@ -45,7 +45,7 @@ def get_ports_from_netbox_dump(metadata): for ip in conf['ips']: ips[ip] = {'interface': port} - if conf['type'] == 'VIRTUAL': + if conf['type'].lower() == 'virtual': # these are VLAN interfaces (for management IPs) if conf['ips']: # this makes management services available in the VLAN @@ -77,6 +77,8 @@ def get_ports_from_netbox_dump(metadata): if conf.get('ips', []): ports[port]['ips'] = set(conf['ips']) if conf['type'] in ( + '1000base-t', + '10gbase-x-sfpp', 'A_1000BASE_T', 'A_10GBASE_X_SFPP', ): @@ -90,7 +92,7 @@ def get_ports_from_netbox_dump(metadata): # tagged - if conf['mode'] == 'TAGGED_ALL': + if conf['mode'] in ('TAGGED_ALL', 'tagged-all'): tagged = set(vlans.keys()) - {conf['untagged_vlan']} else: tagged = conf['tagged_vlans'] diff --git a/configs/netbox_device_home.switch-rack.json b/configs/netbox/home.switch-rack.json similarity index 77% rename from configs/netbox_device_home.switch-rack.json rename to configs/netbox/home.switch-rack.json index 9e3159d..e5da349 100644 --- a/configs/netbox_device_home.switch-rack.json +++ b/configs/netbox/home.switch-rack.json @@ -4,225 +4,225 @@ "description": "home.router (enp1s0)", "enabled": true, "ips": [], - "mode": "TAGGED_ALL", + "mode": "tagged-all", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": null }, "ether10": { "description": "home.mitel-rfp35 (LAN)", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether11": { "description": "home.usv01 (LAN)", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether12": { "description": "home.rechenmonster (IPMI)", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether13": { "description": "", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether14": { "description": "home.rechenmonster (LAN)", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether15": { "description": "", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether16": { "description": "", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether17": { "description": "", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether18": { "description": "", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether19": { "description": "home.lgtv-wohnzimmer", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether2": { "description": "Fritz!Box (LAN1)", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.wan" }, "ether20": { "description": "Franzi Laptop", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether21": { "description": "Sophie Laptop", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether22": { "description": "Sophie Desktop", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether23": { "description": "Wohnzimmer Kabel", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether24": { "description": "home.snom-wohnzimmer", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether3": { "description": "home.aruba325-schlafzimmer", "enabled": true, "ips": [], - "mode": "TAGGED", + "mode": "tagged", "tagged_vlans": [ "ffwi.client", "home.v6only" ], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether4": { "description": "home.aruba325-wohnzimmer", "enabled": true, "ips": [], - "mode": "TAGGED", + "mode": "tagged", "tagged_vlans": [ "ffwi.client", "home.v6only" ], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether5": { "description": "home.nas (eno1)", "enabled": true, "ips": [], - "mode": "TAGGED_ALL", + "mode": "tagged-all", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": null }, "ether6": { "description": "home.aruba325-office", "enabled": true, "ips": [], - "mode": "TAGGED", + "mode": "tagged", "tagged_vlans": [ "ffwi.client", "home.v6only" ], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether7": { "description": "RIPE-Probe #28280 (LAN)", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.dmz" }, "ether8": { "description": "home.drucker-sophie", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "ether9": { "description": "info-beamer 12199 (LAN)", "enabled": true, "ips": [], - "mode": "ACCESS", + "mode": "access", "tagged_vlans": [], - "type": "A_1000BASE_T", + "type": "1000base-t", "untagged_vlan": "home.clients" }, "home.clients": { @@ -231,27 +231,27 @@ "ips": [ "172.19.138.4/24" ], - "mode": null, + "mode": "", "tagged_vlans": [], - "type": "VIRTUAL", + "type": "virtual", "untagged_vlan": null }, "sfp-sfpplus1": { "description": "", "enabled": true, "ips": [], - "mode": null, + "mode": "", "tagged_vlans": [], - "type": "A_10GBASE_X_SFPP", + "type": "10gbase-x-sfpp", "untagged_vlan": null }, "sfp-sfpplus2": { "description": "", "enabled": true, "ips": [], - "mode": null, + "mode": "", "tagged_vlans": [], - "type": "A_10GBASE_X_SFPP", + "type": "10gbase-x-sfpp", "untagged_vlan": null } }, diff --git a/scripts/netbox-dump b/scripts/netbox-dump index f3c79a6..8486653 100755 --- a/scripts/netbox-dump +++ b/scripts/netbox-dump @@ -1,158 +1,240 @@ #!/usr/bin/env python3 +from argparse import ArgumentParser from json import dump -from os import environ -from os.path import dirname, join +from os import environ, makedirs, remove, scandir +from os.path import abspath, dirname, join from sys import exit import bwpass from requests import post -from bundlewrap.utils.text import validate_name +from bundlewrap.utils.text import bold, red, validate_name +from bundlewrap.utils.ui import io TOKEN = environ.get("NETBOX_AUTH_TOKEN") - -# editorconfig-checker-disable -QUERY = """{ - device_list(tag: "bundlewrap") { - name - site { - id - } - interfaces { - id - name - enabled - description - mode - type - ip_addresses { - address - } - untagged_vlan { - name - } - tagged_vlans { - name - } - link_peers { - ... on InterfaceType { - name - device { - name - } - } - ... on FrontPortType { - name - device { - name - } - } - } - connected_endpoints { - ... on InterfaceType { - name - device { - name - } - } - } - } - } - site_list { - id - vlans { - name - vid - } - } -}""" -# editorconfig-checker-enable - if not TOKEN: try: TOKEN = bwpass.attr("netbox.franzi.business/kunsi", "token") except Exception: - print("NETBOX_AUTH_TOKEN is missing") + print("NETBOX_AUTH_TOKEN missing") exit(1) -r = post( - "https://netbox.franzi.business/graphql/", - headers={ - "Accept": "application/json", - "Authorization": f"Token {TOKEN}", - }, - json={ - "query": QUERY, - }, -) -r.raise_for_status() +TARGET_PATH = join(dirname(dirname(abspath(__file__))), "configs", "netbox") -data = r.json()["data"] - -site_vlans = {site["id"]: site["vlans"] for site in data["site_list"]} - -for device in data["device_list"]: - if not device["name"] or not validate_name(device["name"]): - # invalid node name, ignore - continue - - result = { - "interfaces": {}, - "vlans": site_vlans[device["site"]["id"]], +QUERY_SITES = """{ + site_list { + name + id + vlans { + name + vid + } } +}""" - for interface in device["interfaces"]: - description = "" - peers = None +QUERY_DEVICES = """{ + device_list(filters: {tag: "bundlewrap", site_id: "SITE_ID"}) { + name + id + } +}""" - if interface["connected_endpoints"]: - peers = interface["connected_endpoints"] - elif interface["link_peers"]: - peers = interface["link_peers"] +QUERY_DEVICE_DETAILS = """{ + device(id: DEVICE_ID) { + name + interfaces { + id + name + enabled + description + mode + type + ip_addresses { + address + } + untagged_vlan { + name + } + tagged_vlans { + name + } + link_peers { + ... on InterfaceType { + name + device { + name + } + } + ... on FrontPortType { + name + device { + name + } + } + } + connected_endpoints { + ... on InterfaceType { + name + device { + name + } + } + } + } + } +}""" - if interface["description"]: - description = interface["description"] - elif peers: - peer_list = set() - for i in peers: - peer_list.add( - "{} ({})".format( - i["device"]["name"], - i["name"], - ) +def graphql(query): + r = post( + "https://netbox.franzi.business/graphql/", + headers={ + "Accept": "application/json", + "Authorization": f"Token {TOKEN}", + }, + json={ + "query": query, + }, + ) + r.raise_for_status() + return r.json()["data"] + + +def filter_results(results, filter_by): + if filter_by is None: + return results + + out = [] + for result in results: + if str(result["id"]) in filter_by or result["name"] in filter_by: + out.append(result) + return out + + +parser = ArgumentParser() +parser.add_argument("--only-site", nargs="+", type=str) +parser.add_argument("--only-device", nargs="+", type=str) +args = parser.parse_args() + +try: + io.activate() + filenames_used = set() + + with io.job("getting sites"): + sites = filter_results( + graphql(QUERY_SITES).get("site_list", []), args.only_site + ) + + io.stdout(f"Processing {len(sites)} sites in total") + + for site in sites: + with io.job(f"{bold(site['name'])} getting devices"): + devices = filter_results( + graphql(QUERY_DEVICES.replace("SITE_ID", site["id"])).get( + "device_list", [] + ), + args.only_device, + ) + io.stdout(f"Site {bold(site['name'])} has {len(devices)} devices to process") + + for device in devices: + if not device["name"] or not validate_name(device["name"]): + # invalid node name, ignore + continue + + with io.job( + f"{bold(site['name'])} {bold(device['name'])} getting interfaces" + ): + details = graphql( + QUERY_DEVICE_DETAILS.replace("DEVICE_ID", device["id"]) + )["device"] + + result = { + "interfaces": {}, + "vlans": site["vlans"], + } + + for interface in details["interfaces"]: + peers = None + + if interface["connected_endpoints"]: + peers = interface["connected_endpoints"] + elif interface["link_peers"]: + peers = interface["link_peers"] + + if interface["description"]: + description = interface["description"] + elif peers: + peer_list = set() + + for i in peers: + peer_list.add( + "{} ({})".format( + i["device"]["name"], + i["name"], + ) + ) + + description = "; ".join(sorted(peer_list)) + else: + description = "" + + assert description.isascii() + + result["interfaces"][interface["name"]] = { + "description": description, + "enabled": interface["enabled"], + "mode": interface["mode"], + "type": interface["type"], + "ips": sorted( + {i["address"] for i in interface["ip_addresses"]} + ), + "untagged_vlan": ( + interface["untagged_vlan"]["name"] + if interface["untagged_vlan"] + else None + ), + "tagged_vlans": sorted( + {v["name"] for v in interface["tagged_vlans"]} + ), + } + + if result["interfaces"]: + filename = f"{device['name']}.json" + filenames_used.add(filename) + file_with_path = join(TARGET_PATH, filename) + + with io.job( + f"{bold(site['name'])} {bold(device['name'])} writing to {file_with_path}" + ): + with open( + file_with_path, + "w+", + ) as f: + dump( + result, + f, + indent=4, + sort_keys=True, + ) + else: + io.stdout( + f"device {bold(device['name'])} has no interfaces, {red('not')} dumping!" ) - description = "; ".join(sorted(peer_list)) - else: - description = "" - - assert description.isascii() - - result["interfaces"][interface["name"]] = { - "description": description, - "enabled": interface["enabled"], - "mode": interface["mode"], - "type": interface["type"], - "ips": sorted({i['address'] for i in interface['ip_addresses']}), - "untagged_vlan": interface["untagged_vlan"]["name"] - if interface["untagged_vlan"] - else None, - "tagged_vlans": sorted({v["name"] for v in interface["tagged_vlans"]}), - } - - with open( - join( - dirname(dirname(__file__)), - "configs", - "netbox_device_{}.json".format(device["name"]), - ), - "w+", - ) as f: - dump( - result, - f, - indent=4, - sort_keys=True, - ) + if not args.only_site and not args.only_device and filenames_used: + with io.job(f"cleaning leftover files from {TARGET_PATH}"): + for direntry in scandir(TARGET_PATH): + filename = direntry.name + if filename.startswith("."): + continue + if not direntry.is_file(): + io.stderr( + f"found non-file {filename} in {TARGET_PATH}, please check what's going on!" + ) + continue + if filename not in filenames_used: + remove(join(TARGET_PATH, filename)) +finally: + io.deactivate()