from ipaddress import IPv4Address, IPv4Network, ip_address, ip_network

from bundlewrap.exceptions import BundleError, NoSuchGroup, NoSuchNode
from bundlewrap.utils.text import bold, red
from bundlewrap.utils.ui import io


def _resolve_network(repo, identifier):
    """
    Resolve an identifier that is neither a node nor a group.

    The identifier is first parsed with ``ipaddress.ip_network``. If
    that fails, it is looked up in ``repo.libs.firewall.named_networks``.

    Raises BundleError if neither attempt succeeds.
    """
    try:
        network = ip_network(identifier)
    except ValueError:
        try:
            # NOTE(review): the stored value is returned unchanged; it is
            # presumably already a dict with 'ipv4'/'ipv6' sets - confirm
            # against libs.firewall.
            return repo.libs.firewall.named_networks[identifier]
        except KeyError:
            raise BundleError(
                f'libs.tools.resolve_identifier(): Could not resolve {identifier}'
            )

    if isinstance(network, IPv4Network):
        return {'ipv4': {network}, 'ipv6': set()}
    return {'ipv4': set(), 'ipv6': {network}}


def resolve_identifier(repo, identifier, linklocal=False, only_physical=False, allow_private=True):
    """
    Try to resolve an identifier to the addresses or networks it stands for.

    The identifier is tried, in this order, as a node name, a group
    name, a literal IP network and finally as a key of
    ``repo.libs.firewall.named_networks``.

    Parameters:
        repo: repository object providing ``get_node()``,
            ``nodes_in_group()`` and ``libs.firewall.named_networks``.
        identifier: the name or network string to resolve.
        linklocal: if false, link-local addresses are dropped.
        only_physical: if true, only interfaces whose name starts with
            ``bond``, ``br``, ``en`` or ``et`` (or is exactly
            ``default``) contribute addresses. ``external_ipv4`` and an
            IP-literal hostname are added regardless.
        allow_private: if false, private addresses are dropped, but only
            when at least one non-private address was found across all
            resolved nodes.

    Returns a dict with the keys ``'ipv4'`` and ``'ipv6'``, each holding
    a set. For nodes and groups the sets contain address objects, for a
    literal network they contain the network object.

    Raises BundleError if the identifier cannot be resolved at all.
    """
    try:
        nodes = {repo.get_node(identifier)}
    except NoSuchNode:
        try:
            nodes = repo.nodes_in_group(identifier)
        except NoSuchGroup:
            return _resolve_network(repo, identifier)

    found_ips = set()
    for node in nodes:
        for interface, config in node.metadata.get('interfaces', {}).items():
            if only_physical and not (
                interface.startswith(('bond', 'br', 'en', 'et'))
                or interface == 'default'  # dummy nodes use these
            ):
                continue

            for ip in config.get('ips', set()):
                # Entries may be written as "address/prefixlen"; only the
                # address part is of interest here.
                found_ips.add(ip_address(ip.split('/')[0]))

        external_ipv4 = node.metadata.get('external_ipv4', None)
        if external_ipv4:
            found_ips.add(ip_address(external_ipv4))

        try:
            found_ips.add(ip_address(node.hostname))
        except ValueError:
            # The hostname is not an IP literal, so there is nothing to add.
            pass

    has_public_ips = any(not ip.is_private for ip in found_ips)

    ip_dict = {
        'ipv4': set(),
        'ipv6': set(),
    }
    for ip in found_ips:
        if ip.is_link_local and not linklocal:
            continue

        if ip.is_private and not allow_private and has_public_ips:
            continue

        family = 'ipv4' if isinstance(ip, IPv4Address) else 'ipv6'
        ip_dict[family].add(ip)

    return ip_dict


def remove_more_specific_subnets(input_subnets) -> list:
    """
    Drop every subnet that is already covered by another given subnet.

    ``input_subnets`` is an iterable of networks in any notation
    accepted by ``ipaddress.ip_network``. IPv4 and IPv6 networks may be
    mixed; networks of different families never cover each other.

    Returns the remaining networks as a list of strings, in the sorted
    order of the input and without duplicates.
    """
    # dict.fromkeys() removes duplicates while keeping the sorted order.
    networks = list(dict.fromkeys(
        ip_network(subnet) for subnet in sorted(input_subnets)
    ))

    def is_covered(candidate):
        # Every network is compared against all others instead of only
        # the ones accepted so far. The sorted order of the input strings
        # is lexicographic ('10.0.0.0/24' sorts before '10.0.0.0/8'), so
        # a broader network may show up after its more specific subnets.
        # subnet_of() raises TypeError across address families, hence the
        # explicit version check.
        return any(
            candidate != other
            and candidate.version == other.version
            and candidate.subnet_of(other)
            for other in networks
        )

    return [str(net) for net in networks if not is_covered(net)]


def require_bundle(node, bundle, hint=''):
    """
    Raise BundleError unless ``node`` has ``bundle`` assigned.

    ``hint`` is appended to the error message.
    """
    # It's considered bad style to use assert statements outside of tests.
    # That's why this little helper function exists, so we have an easy
    # way of defining bundle requirements in other bundles.
    if not node.has_bundle(bundle):
        raise BundleError(f'{node.name} requires bundle {bundle}, but wasn\'t found! {hint}')