bundlewrap/scripts/netbox-dump

158 lines
3.3 KiB
Python
Executable file

#!/usr/bin/env python3
from json import dump
from os import environ
from os.path import dirname, join
from sys import exit
import bwpass
from requests import post
from bundlewrap.utils.text import validate_name
# NetBox API token as provided by the environment; None when the variable
# NETBOX_AUTH_TOKEN is not set.
TOKEN = environ.get("NETBOX_AUTH_TOKEN")
# editorconfig-checker-disable
# GraphQL query sent to NetBox. It requests:
#   * every device tagged "bundlewrap" with its name, the id of its site and,
#     per interface: id, name, enabled flag, description, mode, type,
#     IP addresses, untagged/tagged VLAN names, the direct link peers
#     (interfaces and front ports) and the connected endpoints (interfaces);
#   * every site with its id and the name/vid of its VLANs.
QUERY = """{
device_list(tag: "bundlewrap") {
name
site {
id
}
interfaces {
id
name
enabled
description
mode
type
ip_addresses {
address
}
untagged_vlan {
name
}
tagged_vlans {
name
}
link_peers {
... on InterfaceType {
name
device {
name
}
}
... on FrontPortType {
name
device {
name
}
}
}
connected_endpoints {
... on InterfaceType {
name
device {
name
}
}
}
}
}
site_list {
id
vlans {
name
vid
}
}
}"""
# editorconfig-checker-enable
# No token in the environment: fall back to the password store entry.
if not TOKEN:
    try:
        TOKEN = bwpass.attr("netbox.franzi.business/kunsi", "token")
    except Exception:
        # Any failure of the lookup means we have no usable token. Passing the
        # message to exit() prints it on stderr and terminates with status 1.
        exit("NETBOX_AUTH_TOKEN is missing")
# Run the GraphQL query against NetBox, authenticating with the API token.
r = post(
    "https://netbox.franzi.business/graphql/",
    headers={
        "Accept": "application/json",
        "Authorization": f"Token {TOKEN}",
    },
    json={
        "query": QUERY,
    },
    # requests applies no timeout by default; bound the wait so an
    # unresponsive server cannot hang this script indefinitely.
    timeout=60,
)
# Abort on HTTP-level failures (4xx/5xx).
r.raise_for_status()
payload = r.json()
# GraphQL reports query problems inside the response body, which
# raise_for_status() cannot see. Fail with a readable message instead of
# tripping over missing keys further down.
if payload.get("errors") or payload.get("data") is None:
    exit(f"NetBox GraphQL query failed: {payload.get('errors')}")
data = payload["data"]
# Map every site id to that site's VLAN list so each device can pick up the
# VLANs of the site it belongs to.
site_vlans = {site["id"]: site["vlans"] for site in data["site_list"]}

# Write one JSON file per device describing its interfaces and site VLANs.
for device in data["device_list"]:
    if not device["name"] or not validate_name(device["name"]):
        # invalid node name, ignore
        continue

    result = {
        "interfaces": {},
        "vlans": site_vlans[device["site"]["id"]],
    }

    for interface in device["interfaces"]:
        # Prefer the connected endpoints; fall back to the direct link peers
        # when there are none.
        peers = interface["connected_endpoints"] or interface["link_peers"]

        if interface["description"]:
            # An explicit description in NetBox always wins.
            description = interface["description"]
        else:
            # Otherwise derive the description from the peers, formatted as
            # "<device> (<port>)". Peers of a type the query has no matching
            # fragment for are returned as empty objects; skip those instead
            # of failing on the missing keys.
            peer_names = {
                f"{peer['device']['name']} ({peer['name']})"
                for peer in peers or ()
                if peer.get("device")
            }
            description = "; ".join(sorted(peer_names))

        # Descriptions must be ASCII-only. Raise explicitly rather than
        # assert, because assert statements are removed under `python -O`.
        if not description.isascii():
            raise ValueError(
                f"non-ASCII description for interface {interface['name']!r} "
                f"on device {device['name']!r}: {description!r}"
            )

        untagged_vlan = interface["untagged_vlan"]
        result["interfaces"][interface["name"]] = {
            "description": description,
            "enabled": interface["enabled"],
            "mode": interface["mode"],
            "type": interface["type"],
            # Sets drop duplicates; sorting keeps the output stable.
            "ips": sorted({ip["address"] for ip in interface["ip_addresses"]}),
            "untagged_vlan": untagged_vlan["name"] if untagged_vlan else None,
            "tagged_vlans": sorted(
                {vlan["name"] for vlan in interface["tagged_vlans"]}
            ),
        }

    # configs/ lives next to the directory that contains this script.
    config_path = join(
        dirname(dirname(__file__)),
        "configs",
        f"netbox_device_{device['name']}.json",
    )
    # The file is only written, never read back, so plain "w" is sufficient.
    with open(config_path, "w", encoding="utf-8") as f:
        dump(
            result,
            f,
            indent=4,
            sort_keys=True,
        )