Franziska Kunsmann
6b90d568cf
All checks were successful
bundlewrap/pipeline/head This commit looks good
The library isn't available as a debian package, so we would have to manually install that every time the python package updates its minor version number.
98 lines
2.1 KiB
Python
98 lines
2.1 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from ipaddress import ip_address, IPv6Address
|
|
from subprocess import check_output
|
|
from sys import argv, exit
|
|
|
|
|
|
|
|
BLOCKLISTS = [
|
|
'0spam.fusionzero.com',
|
|
'bl.mailspike.org',
|
|
'bl.spamcop.net',
|
|
'blackholes.brainerd.net',
|
|
'dnsbl-1.uceprotect.net',
|
|
'dnsbl-2.uceprotect.net',
|
|
'dnsbl-3.uceprotect.net',
|
|
'l2.spews.dnsbl.sorbs.net',
|
|
'list.dsbl.org',
|
|
'map.spam-rbl.com',
|
|
'multihop.dsbl.org',
|
|
'ns1.unsubscore.com',
|
|
'opm.blitzed.org',
|
|
'psbl.surriel.com',
|
|
'rbl.efnet.org',
|
|
'rbl.schulte.org',
|
|
'spamguard.leadmon.net',
|
|
'ubl.unsubscore.com',
|
|
'unconfirmed.dsbl.org',
|
|
'virbl.dnsbl.bit.nl',
|
|
'zen.spamhaus.org',
|
|
]
|
|
|
|
def check_list(ip_list, blocklist):
|
|
dns_name = '{}.{}'.format(
|
|
'.'.join(ip_list),
|
|
blocklist,
|
|
)
|
|
|
|
returncode = 0
|
|
msgs = []
|
|
|
|
try:
|
|
result = check_output([
|
|
'dig',
|
|
'+tries=2',
|
|
'+time=5',
|
|
'+short',
|
|
dns_name
|
|
]).decode().splitlines()
|
|
for item in result:
|
|
msgs.append('{} listed in {} as {}'.format(
|
|
ip,
|
|
blocklist,
|
|
item,
|
|
))
|
|
returncode = 2
|
|
except Exception as e:
|
|
if e.returncode == 9:
|
|
# no reply from server
|
|
pass
|
|
|
|
return [repr(e)], 3
|
|
|
|
return msgs, returncode
|
|
|
|
try:
|
|
ip = ip_address(argv[1])
|
|
except Exception:
|
|
print('usage: {} <ip>'.format(argv[0]))
|
|
exit(3)
|
|
|
|
if isinstance(ip, IPv6Address):
|
|
ip_list = list(ip.exploded.replace(':', ''))
|
|
else:
|
|
ip_list = ip.exploded.split('.')
|
|
|
|
ip_list.reverse()
|
|
exitcode = 0
|
|
|
|
with ThreadPoolExecutor(max_workers=len(BLOCKLISTS)) as executor:
|
|
futures = set()
|
|
|
|
for blocklist in BLOCKLISTS:
|
|
futures.add(executor.submit(check_list, ip_list, blocklist))
|
|
|
|
for future in as_completed(futures):
|
|
msgs, this_exitcode = future.result()
|
|
|
|
for msg in msgs:
|
|
print(msg)
|
|
|
|
exitcode = max(exitcode, this_exitcode)
|
|
|
|
if exitcode == 0:
|
|
print('OK')
|
|
|
|
exit(exitcode)
|