initial commit
This commit is contained in:
commit
35a1d52840
20 changed files with 238 additions and 0 deletions
9
libs/defaults.py
Normal file
9
libs/defaults.py
Normal file
|
@ -0,0 +1,9 @@
|
|||
hostmaster_email = 'hostmaster@kunbox.net'
|
||||
|
||||
influxdb_bucket = 'encrypt$gAAAAABgg9iMnq0nKpODMiMN4NtUw231iqpbyDXV-O8epOAGDSL4jcf3CaSa2bLZzH2fJFaKWjW-dpVd384x6KqSQU19XpfsWA=='
|
||||
influxdb_org = 'encrypt$gAAAAABgg9hyjz4XtvG8NBw9uYxiumS3v7YKIrtc9tTTABg1f9R22gzn55q8ULP9X3wlsPMUQs_DH7CgGv9neYmvVAriRoyd8g=='
|
||||
influxdb_token = 'encrypt$gAAAAABgg9Ag632Xyuc6SWXaR1uH2tLOChmVKAoBIikhjntSSD2qJFL_eouVQGXCLH2HEuSbSdEXcTPn2qmhOiA9jmFdoDSbVbQUsp0EID1wLsWYG_Um2KOxZSF-tn9eDZlgShQYySjzO3nQRmdlJpVLUnGHsiwv_sHD2FstXGpfzTPZq5_egUqEc0K2X-aN2J6BTYc2fZAN'
|
||||
influxdb_url = 'https://influxdb.kunsmann.eu/'
|
||||
|
||||
security_email = f'mailto:{hostmaster_email}'
|
||||
security_lang = {'en', 'de'}
|
88
libs/tools.py
Normal file
88
libs/tools.py
Normal file
|
@ -0,0 +1,88 @@
|
|||
from ipaddress import ip_address, ip_network, IPv4Address, IPv4Network
|
||||
|
||||
from bundlewrap.exceptions import NoSuchGroup, NoSuchNode, BundleError
|
||||
from bundlewrap.utils.text import bold, red
|
||||
from bundlewrap.utils.ui import io
|
||||
|
||||
def resolve_identifier(repo, identifier):
|
||||
"""
|
||||
Try to resolve an identifier (group or node). Return a set of ip
|
||||
addresses valid for this identifier.
|
||||
"""
|
||||
try:
|
||||
nodes = {repo.get_node(identifier)}
|
||||
except NoSuchNode:
|
||||
try:
|
||||
nodes = repo.nodes_in_group(identifier)
|
||||
except NoSuchGroup:
|
||||
try:
|
||||
ip = ip_network(identifier)
|
||||
|
||||
if isinstance(ip, IPv4Network):
|
||||
return {'ipv4': {ip}, 'ipv6': set()}
|
||||
else:
|
||||
return {'ipv4': set(), 'ipv6': {ip}}
|
||||
except Exception as e:
|
||||
io.stderr('{x} {t} Exception while resolving "{i}": {e}'.format(
|
||||
x=red('✘'),
|
||||
t=bold('libs.tools.resolve_identifier'),
|
||||
i=identifier,
|
||||
e=str(e),
|
||||
))
|
||||
raise
|
||||
|
||||
found_ips = set()
|
||||
for node in nodes:
|
||||
for interface, config in node.metadata.get('interfaces', {}).items():
|
||||
for ip in config.get('ips', set()):
|
||||
if '/' in ip:
|
||||
found_ips.add(ip_address(ip.split('/')[0]))
|
||||
else:
|
||||
found_ips.add(ip_address(ip))
|
||||
|
||||
if node.metadata.get('external_ipv4', None):
|
||||
found_ips.add(ip_address(node.metadata.get('external_ipv4')))
|
||||
|
||||
ip_dict = {
|
||||
'ipv4': set(),
|
||||
'ipv6': set(),
|
||||
}
|
||||
|
||||
for ip in found_ips:
|
||||
if isinstance(ip, IPv4Address):
|
||||
ip_dict['ipv4'].add(ip)
|
||||
else:
|
||||
ip_dict['ipv6'].add(ip)
|
||||
|
||||
return ip_dict
|
||||
|
||||
|
||||
def remove_more_specific_subnets(input_subnets) -> list:
|
||||
final_subnets = []
|
||||
|
||||
for subnet in sorted(input_subnets):
|
||||
source = ip_network(subnet)
|
||||
|
||||
if not source in final_subnets:
|
||||
subnet_found = False
|
||||
|
||||
for dest_subnet in final_subnets:
|
||||
if source.subnet_of(dest_subnet):
|
||||
subnet_found = True
|
||||
|
||||
if not subnet_found:
|
||||
final_subnets.append(source)
|
||||
|
||||
out = []
|
||||
for net in final_subnets:
|
||||
out.append(str(net))
|
||||
|
||||
return out
|
||||
|
||||
|
||||
def require_bundle(node, bundle, hint=''):
|
||||
# It's considered bad style to use assert statements outside of tests.
|
||||
# That's why this little helper function exists, so we have an easy
|
||||
# way of defining bundle requirements in other bundles.
|
||||
if not node.has_bundle(bundle):
|
||||
raise BundleError(f'{node.name} requires bundle {bundle}, but wasn\'t found! {hint}')
|
Loading…
Add table
Add a link
Reference in a new issue