#!/usr/bin/env python3
"""Check whether an IP address is listed on a set of DNS blocklists.

Usage: <script> <ip>

Every blocklist in ``BLOCKLISTS`` is queried in parallel with ``dig``.
The exit code is the highest severity seen across all blocklists:

    0  no answers from any blocklist (prints ``OK``)
    1  only warning-level answers (per-list warning codes or ``;;``
       diagnostic lines printed by dig)
    2  at least one real listing
    3  usage error, or a lookup failed unexpectedly
"""

from concurrent.futures import ThreadPoolExecutor, as_completed
from ipaddress import IPv6Address, ip_address
from subprocess import CalledProcessError, check_output
from sys import argv, exit

# Maps each blocklist zone to the set of answer values that should only
# raise a warning (exit code 1) instead of counting as a listing.
BLOCKLISTS = {
    '0spam.fusionzero.com': set(),
    'bl.mailspike.org': set(),
    'bl.spamcop.net': set(),
    'blackholes.brainerd.net': set(),
    'dnsbl-1.uceprotect.net': set(),
    'l2.spews.dnsbl.sorbs.net': set(),
    'list.dsbl.org': set(),
    'map.spam-rbl.com': set(),
    'multihop.dsbl.org': set(),
    'ns1.unsubscore.com': set(),
    'opm.blitzed.org': set(),
    'psbl.surriel.com': set(),
    'rbl.efnet.org': set(),
    'rbl.schulte.org': set(),
    'spamguard.leadmon.net': set(),
    'ubl.unsubscore.com': set(),
    'unconfirmed.dsbl.org': set(),
    'virbl.dnsbl.bit.nl': set(),
    'zen.spamhaus.org': {
        # https://www.spamhaus.org/news/article/807/using-our-public-mirrors-check-your-return-codes-now.
        '127.255.255.252',  # Typing Error
        '127.255.255.254',  # public resolver / generic rdns
        '127.255.255.255',  # rate limited
    },
}


def reversed_labels(ip):
    """Return the DNS labels of *ip* in reversed order.

    IPv4 addresses yield their four octets reversed; IPv6 addresses
    yield the 32 hex digits of the exploded form reversed.

    :param ip: an ``IPv4Address`` or ``IPv6Address`` object.
    :return: list of label strings, ready to be joined with ``.``.
    """
    if isinstance(ip, IPv6Address):
        labels = list(ip.exploded.replace(':', ''))
    else:
        labels = ip.exploded.split('.')
    labels.reverse()
    return labels


def ip_from_labels(ip_list):
    """Rebuild the address that :func:`reversed_labels` was given.

    :param ip_list: reversed labels as produced by ``reversed_labels``.
    :return: the matching address object, or the re-joined label text
        when the labels do not form a valid address (so that message
        formatting never fails).
    """
    labels = list(reversed(ip_list))
    if len(labels) == 32:
        # 32 labels means IPv6 hex digits: regroup into eight hextets.
        digits = ''.join(labels)
        text = ':'.join(digits[i:i + 4] for i in range(0, 32, 4))
    else:
        text = '.'.join(labels)
    try:
        return ip_address(text)
    except ValueError:
        return text


def summarize_answers(answers, ip, blocklist, warn_ips):
    """Turn the answer lines of one blocklist query into messages.

    :param answers: lines printed by ``dig +short`` for the query.
    :param ip: the address being checked (used in messages only).
    :param blocklist: the blocklist zone that was queried.
    :param warn_ips: answer values that only count as a warning.
    :return: ``(messages, returncode)``; returncode is 0 for no answers,
        1 if every answer is a warning value or a ``;;`` diagnostic line,
        2 as soon as any other answer is present.
    """
    msgs = []
    returncode = 0
    for item in answers:
        is_diagnostic = item.startswith(';;')
        if is_diagnostic:
            msgs.append('{} - {}'.format(
                blocklist,
                item,
            ))
        else:
            msgs.append('{} listed in {} as {}'.format(
                ip,
                blocklist,
                item,
            ))
        # Severity never decreases once a real listing has been seen.
        severity = 1 if (is_diagnostic or item in warn_ips) else 2
        returncode = max(returncode, severity)
    return msgs, returncode


def check_list(ip_list, blocklist, warn_ips, ip=None):
    """Query one blocklist for an address via ``dig``.

    :param ip_list: reversed address labels (see ``reversed_labels``).
    :param blocklist: the blocklist zone to query.
    :param warn_ips: answer values that only count as a warning.
    :param ip: address shown in messages; rebuilt from *ip_list* when
        omitted.
    :return: ``(messages, returncode)``.  Returncode 3 together with the
        ``repr`` of the exception is returned when the lookup fails;
        this function does not raise for lookup failures.
    """
    if ip is None:
        ip = ip_from_labels(ip_list)

    dns_name = '{}.{}'.format(
        '.'.join(ip_list),
        blocklist,
    )

    try:
        output = check_output([
            'dig',
            '+tries=2',
            '+time=5',
            '+short',
            dns_name,
        ])
        answers = output.decode().splitlines()
        return summarize_answers(answers, ip, blocklist, warn_ips)
    except CalledProcessError as e:
        if e.returncode == 9:
            # no reply from server
            return [], 0
        return [repr(e)], 3
    except Exception as e:
        # E.g. dig is not installed or the output could not be decoded.
        # Only CalledProcessError has a returncode, so it is not read here.
        return [repr(e)], 3


def main(args=None):
    """Check the address in ``args[1]`` against all blocklists.

    :param args: argument vector including the program name; defaults
        to ``sys.argv``.
    :return: the exit code described in the module docstring.
    """
    if args is None:
        args = argv

    try:
        ip = ip_address(args[1])
    except (IndexError, ValueError):
        print('usage: {} <ip>'.format(args[0]))
        return 3

    ip_list = reversed_labels(ip)

    exitcode = 0
    with ThreadPoolExecutor(max_workers=len(BLOCKLISTS)) as executor:
        futures = {
            executor.submit(check_list, ip_list, blocklist, warn_ips, ip)
            for blocklist, warn_ips in BLOCKLISTS.items()
        }

        for future in as_completed(futures):
            msgs, this_exitcode = future.result()
            for msg in msgs:
                print(msg)
            exitcode = max(exitcode, this_exitcode)

    if exitcode == 0:
        print('OK')

    return exitcode


if __name__ == '__main__':
    exit(main(argv))