#!/usr/bin/env python3
"""Check whether an IP address is listed on a set of DNS blocklists.

Usage: <script> <ip>

The address is turned into its reversed DNSBL query form, every blocklist
in BLOCKLISTS is queried in parallel through the external ``dig`` command,
and one line is printed per listing found.

Exit codes (these look like the Nagios plugin convention -- confirm with
whatever consumes this script):
    0  no listing found ("OK" is printed)
    2  the address is listed on at least one blocklist
    3  usage error, or a lookup failed in an unexpected way
"""

from concurrent.futures import ThreadPoolExecutor, as_completed
from ipaddress import ip_address, IPv6Address
from subprocess import CalledProcessError, check_output
from sys import argv, exit

BLOCKLISTS = [
    '0spam.fusionzero.com',
    'bl.mailspike.org',
    'bl.spamcop.net',
    'blackholes.brainerd.net',
    'dnsbl-1.uceprotect.net',
    'dnsbl-2.uceprotect.net',
    'dnsbl-3.uceprotect.net',
    'l2.spews.dnsbl.sorbs.net',
    'list.dsbl.org',
    'map.spam-rbl.com',
    'multihop.dsbl.org',
    'ns1.unsubscore.com',
    'opm.blitzed.org',
    'psbl.surriel.com',
    'rbl.efnet.org',
    'rbl.schulte.org',
    'spamguard.leadmon.net',
    'ubl.unsubscore.com',
    'unconfirmed.dsbl.org',
    'virbl.dnsbl.bit.nl',
    'zen.spamhaus.org',
]

EXIT_OK = 0
EXIT_LISTED = 2
EXIT_UNKNOWN = 3

# Exit status dig uses for "no reply from server"; an unreachable blocklist
# is deliberately treated as "not listed" rather than as an error.
DIG_NO_REPLY = 9


def reverse_labels(ip):
    """Return the reversed DNS labels used to query a blocklist for *ip*.

    IPv4 addresses yield their four octets in reverse order; IPv6 addresses
    yield the 32 hex digits of the exploded form in reverse order.
    """
    if isinstance(ip, IPv6Address):
        labels = list(ip.exploded.replace(':', ''))
    else:
        labels = ip.exploded.split('.')
    labels.reverse()
    return labels


def address_from_labels(ip_list):
    """Rebuild the printable address from labels made by reverse_labels().

    Four labels are read as IPv4 octets; anything else is read as reversed
    IPv6 hex digits.
    """
    labels = list(reversed(ip_list))
    if len(labels) == 4:
        return '.'.join(labels)
    # IPv6Address(int) is used rather than ip_address(int), because the
    # latter would turn small values such as ::1 into an IPv4 address.
    return str(IPv6Address(int(''.join(labels), 16)))


def parse_dig_answers(output):
    """Return the answer lines from ``dig +short`` output.

    Blank lines and ';'-prefixed lines are dropped: dig writes its
    diagnostics as ';;' comments, and those are not blocklist answers.
    """
    answers = []
    for line in output.splitlines():
        line = line.strip()
        if line and not line.startswith(';'):
            answers.append(line)
    return answers


def check_list(ip_list, blocklist, ip=None):
    """Query one blocklist and report any listing.

    Args:
        ip_list: reversed address labels, as returned by reverse_labels().
        blocklist: DNS zone of the blocklist to query.
        ip: address to show in messages. When omitted it is rebuilt from
            ip_list, so the function does not depend on a module global.

    Returns:
        A ``(messages, exit_code)`` tuple: ``([], 0)`` when the address is
        not listed or the server did not reply, ``(listings, 2)`` when it is
        listed, and ``([repr(error)], 3)`` when dig could not be run or
        failed for any other reason.
    """
    if ip is None:
        ip = address_from_labels(ip_list)

    dns_name = '{}.{}'.format('.'.join(ip_list), blocklist)

    try:
        output = check_output([
            'dig',
            '+tries=2',
            '+time=5',
            '+short',
            dns_name,
        ]).decode()
    except CalledProcessError as e:
        if e.returncode == DIG_NO_REPLY:
            return [], EXIT_OK
        return [repr(e)], EXIT_UNKNOWN
    except OSError as e:
        # For example dig is not installed. The original code accessed
        # e.returncode here and crashed with AttributeError.
        return [repr(e)], EXIT_UNKNOWN

    msgs = [
        '{} listed in {} as {}'.format(ip, blocklist, item)
        for item in parse_dig_answers(output)
    ]
    return msgs, (EXIT_LISTED if msgs else EXIT_OK)


def main(args=None):
    """Run the check for the address in ``args[1]`` and return the exit code.

    Args:
        args: argument vector including the program name; defaults to
            ``sys.argv``.
    """
    if args is None:
        args = argv

    try:
        ip = ip_address(args[1])
    except (IndexError, ValueError):
        print('usage: {} <ip>'.format(args[0]))
        return EXIT_UNKNOWN

    ip_list = reverse_labels(ip)
    exitcode = EXIT_OK

    # One worker per blocklist: the lookups only wait on DNS, so they can
    # all run at the same time.
    with ThreadPoolExecutor(max_workers=len(BLOCKLISTS)) as executor:
        futures = {
            executor.submit(check_list, ip_list, blocklist, ip)
            for blocklist in BLOCKLISTS
        }
        for future in as_completed(futures):
            msgs, this_exitcode = future.result()
            for msg in msgs:
                print(msg)
            # The most severe result across all blocklists wins.
            exitcode = max(exitcode, this_exitcode)

    if exitcode == EXIT_OK:
        print('OK')
    return exitcode


if __name__ == '__main__':
    exit(main())