Franziska Kunsmann
d44c87e8a7
All checks were successful
kunsi/bundlewrap/pipeline/head This commit looks good
somehow, we tend to get false positives if we run that check on the node itself.
98 lines
2.1 KiB
Python
98 lines
2.1 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from ipaddress import ip_address, IPv6Address
|
|
from subprocess import check_output
|
|
from sys import argv, exit
|
|
|
|
|
|
|
|
# DNS blocklist zones to query. The script submits one check_list() lookup
# per entry and sizes its thread pool to the length of this list.
BLOCKLISTS = [
    '0spam.fusionzero.com',
    'bl.mailspike.org',
    'bl.spamcop.net',
    'blackholes.brainerd.net',
    'dnsbl-1.uceprotect.net',
    'dnsbl-2.uceprotect.net',
    'dnsbl-3.uceprotect.net',
    'l2.spews.dnsbl.sorbs.net',
    'list.dsbl.org',
    'map.spam-rbl.com',
    'multihop.dsbl.org',
    'ns1.unsubscore.com',
    'opm.blitzed.org',
    'psbl.surriel.com',
    'rbl.efnet.org',
    'rbl.schulte.org',
    'spamguard.leadmon.net',
    'ubl.unsubscore.com',
    'unconfirmed.dsbl.org',
    'virbl.dnsbl.bit.nl',
    'zen.spamhaus.org',
]
|
|
|
|
def check_list(ip_list, blocklist):
    """Look up one address in one DNS blocklist by running ``dig``.

    The query name is the entries of ``ip_list`` joined with dots, followed
    by ``blocklist``. Every record ``dig +short`` prints is reported as a
    listing. The messages name the module-level ``ip`` that the script
    parsed from its command line.

    Args:
        ip_list: Strings forming the leading labels of the query name (the
            script passes the reversed IPv4 octets or IPv6 hex digits).
        blocklist: DNS zone of the blocklist to query.

    Returns:
        A ``(messages, code)`` tuple. ``code`` is 0 when no record was
        returned or ``dig`` exited with status 9 (no reply from server),
        2 when at least one record was returned, and 3 when the lookup
        failed in any other way; in that case ``messages`` holds the
        ``repr`` of the exception.
    """
    dns_name = '{}.{}'.format('.'.join(ip_list), blocklist)

    try:
        output = check_output([
            'dig',
            '+tries=2',
            '+time=5',
            '+short',
            dns_name,
        ]).decode()
    except Exception as e:
        # Only CalledProcessError carries ``returncode``. Other failures
        # (dig not installed, undecodable output, invalid argument) must
        # not raise AttributeError here, or the worker would crash the
        # whole script instead of reporting code 3.
        if getattr(e, 'returncode', None) == 9:
            # no reply from server
            return [], 0

        return [repr(e)], 3

    msgs = []
    for line in output.splitlines():
        item = line.strip()
        # dig writes diagnostics as ';;'-prefixed lines, which can appear
        # even on a successful exit. They are not records and must not be
        # reported as listings.
        if not item or item.startswith(';'):
            continue
        msgs.append('{} listed in {} as {}'.format(
            ip,
            blocklist,
            item,
        ))

    return msgs, (2 if msgs else 0)
|
|
|
|
# The address to check is the first command-line argument. Anything that
# cannot be parsed as an IP address prints the usage text and exits with 3.
try:
    ip = ip_address(argv[1])
except Exception:
    print('usage: {} <ip>'.format(argv[0]))
    exit(3)

# Build the labels that precede the blocklist zone in the query name:
# one label per hex digit of the fully expanded IPv6 address, or one label
# per octet for IPv4, in reversed order.
labels = (
    list(ip.exploded.replace(':', ''))
    if isinstance(ip, IPv6Address)
    else ip.exploded.split('.')
)
ip_list = labels[::-1]

exitcode = 0

# Query every blocklist concurrently, one worker thread per list, and print
# each result as soon as its lookup finishes. The highest code wins.
with ThreadPoolExecutor(max_workers=len(BLOCKLISTS)) as executor:
    pending = {
        executor.submit(check_list, ip_list, zone)
        for zone in BLOCKLISTS
    }

    for finished in as_completed(pending):
        lines, code = finished.result()

        for line in lines:
            print(line)

        if code > exitcode:
            exitcode = code

if exitcode == 0:
    print('OK')

exit(exitcode)
|