bundlewrap/scripts/netbox-dump

240 lines
7 KiB
Python
Executable file

#!/usr/bin/env python3
from argparse import ArgumentParser
from json import dump
from os import environ, makedirs, remove, scandir
from os.path import abspath, dirname, join
from sys import exit
import bwpass
from requests import post
from bundlewrap.utils.text import bold, red, validate_name
from bundlewrap.utils.ui import io
TOKEN = environ.get("NETBOX_AUTH_TOKEN")
if not TOKEN:
try:
TOKEN = bwpass.attr("netbox.franzi.business/kunsi", "token")
except Exception:
print("NETBOX_AUTH_TOKEN missing")
exit(1)
TARGET_PATH = join(dirname(dirname(abspath(__file__))), "configs", "netbox")
QUERY_SITES = """{
site_list {
name
id
vlans {
name
vid
}
}
}"""
QUERY_DEVICES = """{
device_list(filters: {tag: "bundlewrap", site_id: "SITE_ID"}) {
name
id
}
}"""
QUERY_DEVICE_DETAILS = """{
device(id: DEVICE_ID) {
name
interfaces {
id
name
enabled
description
mode
type
ip_addresses {
address
}
untagged_vlan {
name
}
tagged_vlans {
name
}
link_peers {
... on InterfaceType {
name
device {
name
}
}
... on FrontPortType {
name
device {
name
}
}
}
connected_endpoints {
... on InterfaceType {
name
device {
name
}
}
}
}
}
}"""
def graphql(query):
r = post(
"https://netbox.franzi.business/graphql/",
headers={
"Accept": "application/json",
"Authorization": f"Token {TOKEN}",
},
json={
"query": query,
},
)
r.raise_for_status()
return r.json()["data"]
def filter_results(results, filter_by):
if filter_by is None:
return results
out = []
for result in results:
if str(result["id"]) in filter_by or result["name"] in filter_by:
out.append(result)
return out
parser = ArgumentParser()
parser.add_argument("--only-site", nargs="+", type=str)
parser.add_argument("--only-device", nargs="+", type=str)
args = parser.parse_args()
try:
io.activate()
filenames_used = set()
with io.job("getting sites"):
sites = filter_results(
graphql(QUERY_SITES).get("site_list", []), args.only_site
)
io.stdout(f"Processing {len(sites)} sites in total")
for site in sites:
with io.job(f"{bold(site['name'])} getting devices"):
devices = filter_results(
graphql(QUERY_DEVICES.replace("SITE_ID", site["id"])).get(
"device_list", []
),
args.only_device,
)
io.stdout(f"Site {bold(site['name'])} has {len(devices)} devices to process")
for device in devices:
if not device["name"] or not validate_name(device["name"]):
# invalid node name, ignore
continue
with io.job(
f"{bold(site['name'])} {bold(device['name'])} getting interfaces"
):
details = graphql(
QUERY_DEVICE_DETAILS.replace("DEVICE_ID", device["id"])
)["device"]
result = {
"interfaces": {},
"vlans": site["vlans"],
}
for interface in details["interfaces"]:
peers = None
if interface["connected_endpoints"]:
peers = interface["connected_endpoints"]
elif interface["link_peers"]:
peers = interface["link_peers"]
if interface["description"]:
description = interface["description"]
elif peers:
peer_list = set()
for i in peers:
peer_list.add(
"{} ({})".format(
i["device"]["name"],
i["name"],
)
)
description = "; ".join(sorted(peer_list))
else:
description = ""
assert description.isascii()
result["interfaces"][interface["name"]] = {
"description": description,
"enabled": interface["enabled"],
"mode": interface["mode"],
"type": interface["type"],
"ips": sorted(
{i["address"] for i in interface["ip_addresses"]}
),
"untagged_vlan": (
interface["untagged_vlan"]["name"]
if interface["untagged_vlan"]
else None
),
"tagged_vlans": sorted(
{v["name"] for v in interface["tagged_vlans"]}
),
}
if result["interfaces"]:
filename = f"{device['name']}.json"
filenames_used.add(filename)
file_with_path = join(TARGET_PATH, filename)
with io.job(
f"{bold(site['name'])} {bold(device['name'])} writing to {file_with_path}"
):
with open(
file_with_path,
"w+",
) as f:
dump(
result,
f,
indent=4,
sort_keys=True,
)
else:
io.stdout(
f"device {bold(device['name'])} has no interfaces, {red('not')} dumping!"
)
if not args.only_site and not args.only_device and filenames_used:
with io.job(f"cleaning leftover files from {TARGET_PATH}"):
for direntry in scandir(TARGET_PATH):
filename = direntry.name
if filename.startswith("."):
continue
if not direntry.is_file():
io.stderr(
f"found non-file {filename} in {TARGET_PATH}, please check what's going on!"
)
continue
if filename not in filenames_used:
remove(join(TARGET_PATH, filename))
finally:
io.deactivate()