bundlewrap/scripts/netbox-dump

241 lines
7 KiB
Plaintext
Raw Normal View History

2023-03-22 20:19:41 +00:00
#!/usr/bin/env python3
from argparse import ArgumentParser
2023-03-22 20:19:41 +00:00
from json import dump
from os import environ, makedirs, remove, scandir
from os.path import abspath, dirname, join
2023-03-22 20:19:41 +00:00
from sys import exit
import bwpass
from requests import post
from bundlewrap.utils.text import bold, red, validate_name
from bundlewrap.utils.ui import io
2023-03-22 20:19:41 +00:00
TOKEN = environ.get("NETBOX_AUTH_TOKEN")
if not TOKEN:
try:
TOKEN = bwpass.attr("netbox.franzi.business/kunsi", "token")
except Exception:
print("NETBOX_AUTH_TOKEN missing")
exit(1)
2023-03-22 20:19:41 +00:00
TARGET_PATH = join(dirname(dirname(abspath(__file__))), "configs", "netbox")
QUERY_SITES = """{
site_list {
2023-03-22 20:19:41 +00:00
name
id
vlans {
2023-03-22 20:19:41 +00:00
name
vid
2023-03-22 20:19:41 +00:00
}
}
}"""
QUERY_DEVICES = """{
device_list(filters: {tag: "bundlewrap", site_id: "SITE_ID"}) {
name
id
}
}"""
QUERY_DEVICE_DETAILS = """{
device(id: DEVICE_ID) {
name
interfaces {
id
2023-03-22 20:19:41 +00:00
name
enabled
description
mode
type
ip_addresses {
address
}
untagged_vlan {
name
}
tagged_vlans {
name
}
link_peers {
... on InterfaceType {
name
device {
name
}
}
... on FrontPortType {
name
device {
name
}
}
}
connected_endpoints {
... on InterfaceType {
name
device {
name
}
}
}
2023-03-22 20:19:41 +00:00
}
}
}"""
def graphql(query):
r = post(
"https://netbox.franzi.business/graphql/",
headers={
"Accept": "application/json",
"Authorization": f"Token {TOKEN}",
},
json={
"query": query,
},
)
r.raise_for_status()
return r.json()["data"]
def filter_results(results, filter_by):
if filter_by is None:
return results
out = []
for result in results:
if str(result["id"]) in filter_by or result["name"] in filter_by:
out.append(result)
return out
parser = ArgumentParser()
parser.add_argument("--only-site", nargs="+", type=str)
parser.add_argument("--only-device", nargs="+", type=str)
args = parser.parse_args()
try:
io.activate()
filenames_used = set()
with io.job("getting sites"):
sites = filter_results(
graphql(QUERY_SITES).get("site_list", []), args.only_site
)
2023-03-22 20:19:41 +00:00
io.stdout(f"Processing {len(sites)} sites in total")
for site in sites:
with io.job(f"{bold(site['name'])} getting devices"):
devices = filter_results(
graphql(QUERY_DEVICES.replace("SITE_ID", site["id"])).get(
"device_list", []
),
args.only_device,
)
io.stdout(f"Site {bold(site['name'])} has {len(devices)} devices to process")
for device in devices:
if not device["name"] or not validate_name(device["name"]):
# invalid node name, ignore
continue
with io.job(
f"{bold(site['name'])} {bold(device['name'])} getting interfaces"
):
details = graphql(
QUERY_DEVICE_DETAILS.replace("DEVICE_ID", device["id"])
)["device"]
result = {
"interfaces": {},
"vlans": site["vlans"],
}
for interface in details["interfaces"]:
peers = None
if interface["connected_endpoints"]:
peers = interface["connected_endpoints"]
elif interface["link_peers"]:
peers = interface["link_peers"]
if interface["description"]:
description = interface["description"]
elif peers:
peer_list = set()
for i in peers:
peer_list.add(
"{} ({})".format(
i["device"]["name"],
i["name"],
)
)
description = "; ".join(sorted(peer_list))
else:
description = ""
assert description.isascii()
result["interfaces"][interface["name"]] = {
"description": description,
"enabled": interface["enabled"],
"mode": interface["mode"],
"type": interface["type"],
"ips": sorted(
{i["address"] for i in interface["ip_addresses"]}
),
"untagged_vlan": (
interface["untagged_vlan"]["name"]
if interface["untagged_vlan"]
else None
),
"tagged_vlans": sorted(
{v["name"] for v in interface["tagged_vlans"]}
),
}
if result["interfaces"]:
filename = f"{device['name']}.json"
filenames_used.add(filename)
file_with_path = join(TARGET_PATH, filename)
with io.job(
f"{bold(site['name'])} {bold(device['name'])} writing to {file_with_path}"
):
with open(
file_with_path,
"w+",
) as f:
dump(
result,
f,
indent=4,
sort_keys=True,
)
else:
io.stdout(
f"device {bold(device['name'])} has no interfaces, {red('not')} dumping!"
2023-03-22 20:19:41 +00:00
)
if not args.only_site and not args.only_device and filenames_used:
with io.job(f"cleaning leftover files from {TARGET_PATH}"):
for direntry in scandir(TARGET_PATH):
filename = direntry.name
if filename.startswith("."):
continue
if not direntry.is_file():
io.stderr(
f"found non-file {filename} in {TARGET_PATH}, please check what's going on!"
)
continue
if filename not in filenames_used:
remove(join(TARGET_PATH, filename))
finally:
io.deactivate()