#!/usr/bin/env python3 from json import dump from os import environ from os.path import join from sys import exit from bundlewrap.utils.ui import QUIT_EVENT, io from bundlewrap.utils.text import bold, yellow, validate_name from pynetbox import api as netbox_api BW_REPO_PATH = environ.get('BW_REPO_PATH', '.') netbox = netbox_api( environ.get('NETBOX_HOST', 'https://netbox.franzi.business'), token=environ.get('NETBOX_TOKEN', None), ) result = { # 'my_site_name': { # 'vlans': { # 'my_vlan_name': 10, # 'other_vlan_name': 11, # 'yet_another_vlan_name': 12, # }, # 'devices': { # 'my_switch': { # 'port1': { # 'description': 'foo', # 'type': '1000base-t', # or 'lag' # 'mode': None, # or 'access', 'tagged', 'tagged-all' # 'lag': 'none', # or 'LAG1' # 'vlan': { # 'untagged': 'my_vlan_name', # 'tagged': [ # 'other_vlan_name', # 'yet_another_vlan_name', # ], # }, # }, # }, # }, # }, } errors = False try: io.activate() for site in netbox.dcim.sites.all(): site_name = site.name.lower() result[site_name] = { 'vlans': {}, 'devices': {}, } with io.job(f'{bold(site_name)} getting vlans'): for vlan in netbox.ipam.vlans.filter(site_id=site.id): if vlan.name in result[site_name]['vlans'].keys() and result[site_name]['vlans'][vlan.name] != vlan.id: raise Exception(f"vlan {result[site_name]['vlans'][vlan.name]} and {vlan.id} both have the name {vlan.name}") result[site_name]['vlans'][vlan.name] = vlan.vid for interface in netbox.dcim.interfaces.filter(site_id=site.id): if QUIT_EVENT.is_set(): exit(0) with io.job(f'{bold(site_name)} {bold(interface.device.name)} interface {yellow(interface.name)}'): if not interface.device.name: # Unnamed device. Probably not managed by bw. continue elif not validate_name(interface.device.name): # bundlewrap does not consider this device name to be a valid # node name. Ignore it, we don't manage it continue has_valid_description = False if interface.description: description = interface.description has_valid_description = True elif interface.connected_endpoints: description = f'{sorted(interface.connected_endpoints)[0].device.display} ({sorted(interface.connected_endpoints)[0].display})' has_valid_description = True elif interface.link_peers: description = f'{sorted(interface.link_peers)[0].device.display} ({sorted(interface.link_peers)[0].display})' else: description = '' if not description.isascii(): errors = True io.stderr(f'{bold(interface.device.name)} {bold(interface.name)} description "{description}" contains non-ascii characters, this isn\'t supported') result[site_name]['devices'].setdefault(interface.device.name, {})[interface.name] = { 'description': description, 'enabled': interface.enabled, 'ip_addresses': sorted(set() if interface.count_ipaddresses == 0 else { ip.address for ip in netbox.ipam.ip_addresses.filter(interface_id=interface.id) }), 'mode': interface.mode.value if interface.mode else None, 'type': interface.type.value, 'lag': interface.lag.name if interface.lag else None, 'vlans': { 'untagged': interface.untagged_vlan.name if interface.untagged_vlan else None, 'tagged': sorted(vlan.name for vlan in interface.tagged_vlans), }, } if errors: exit(1) with io.job('dumping result to netbox_dump.json'): with open(join(BW_REPO_PATH, 'netbox_dump.json'), 'w') as f: dump(result, f, indent=4, sort_keys=True) finally: io.deactivate()