107 lines
2.8 KiB
Python
107 lines
2.8 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from ipaddress import IPv6Address, ip_address
|
|
from subprocess import check_output
|
|
from sys import argv, exit
|
|
|
|
BLOCKLISTS = {
|
|
'0spam.fusionzero.com': set(),
|
|
'bl.mailspike.org': set(),
|
|
'bl.spamcop.net': set(),
|
|
'blackholes.brainerd.net': set(),
|
|
'dnsbl-1.uceprotect.net': set(),
|
|
'l2.spews.dnsbl.sorbs.net': set(),
|
|
'list.dsbl.org': set(),
|
|
'multihop.dsbl.org': set(),
|
|
'ns1.unsubscore.com': set(),
|
|
'opm.blitzed.org': set(),
|
|
'psbl.surriel.com': set(),
|
|
'rbl.efnet.org': set(),
|
|
'rbl.schulte.org': set(),
|
|
'spamguard.leadmon.net': set(),
|
|
'ubl.unsubscore.com': set(),
|
|
'unconfirmed.dsbl.org': set(),
|
|
'virbl.dnsbl.bit.nl': set(),
|
|
'zen.spamhaus.org': {
|
|
# https://www.spamhaus.org/news/article/807/using-our-public-mirrors-check-your-return-codes-now.
|
|
'127.255.255.252', # Typing Error
|
|
'127.255.255.254', # public resolver / generic rdns
|
|
'127.255.255.255', # rate limited
|
|
},
|
|
}
|
|
|
|
def check_list(ip_list, blocklist, warn_ips):
|
|
dns_name = '{}.{}'.format(
|
|
'.'.join(ip_list),
|
|
blocklist,
|
|
)
|
|
|
|
returncode = 0
|
|
msgs = []
|
|
|
|
try:
|
|
result = check_output([
|
|
'dig',
|
|
'+tries=2',
|
|
'+time=10',
|
|
'+short',
|
|
dns_name
|
|
]).decode().splitlines()
|
|
for item in result:
|
|
if item.startswith(';;'):
|
|
msgs.append('{} - {}'.format(
|
|
blocklist,
|
|
item,
|
|
))
|
|
else:
|
|
msgs.append('{} listed in {} as {}'.format(
|
|
ip,
|
|
blocklist,
|
|
item,
|
|
))
|
|
if (item in warn_ips or item.startswith(';;')) and returncode < 2:
|
|
returncode = 1
|
|
else:
|
|
returncode = 2
|
|
except Exception as e:
|
|
if e.returncode == 9:
|
|
# no reply from server
|
|
return [], 0
|
|
|
|
return [repr(e)], 3
|
|
|
|
return msgs, returncode
|
|
|
|
try:
|
|
ip = ip_address(argv[1])
|
|
except Exception:
|
|
print('usage: {} <ip>'.format(argv[0]))
|
|
exit(3)
|
|
|
|
if isinstance(ip, IPv6Address):
|
|
ip_list = list(ip.exploded.replace(':', ''))
|
|
else:
|
|
ip_list = ip.exploded.split('.')
|
|
|
|
ip_list.reverse()
|
|
exitcode = 0
|
|
|
|
with ThreadPoolExecutor(max_workers=len(BLOCKLISTS)) as executor:
|
|
futures = set()
|
|
|
|
for blocklist, warn_ips in BLOCKLISTS.items():
|
|
futures.add(executor.submit(check_list, ip_list, blocklist, warn_ips))
|
|
|
|
for future in as_completed(futures):
|
|
msgs, this_exitcode = future.result()
|
|
|
|
for msg in msgs:
|
|
print(msg)
|
|
|
|
exitcode = max(exitcode, this_exitcode)
|
|
|
|
if exitcode == 0:
|
|
print('OK')
|
|
|
|
exit(exitcode)
|