bundlewrap/bundles/icinga2/files/check_spam_blocklist

94 lines
2.1 KiB
Python

#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor, as_completed
from ipaddress import IPv6Address, ip_address
from subprocess import check_output
from sys import argv, exit
# DNSBL zones that every checked address is looked up in, one zone per line.
# The order here is the order in which the lookups are submitted.
BLOCKLISTS = '''
0spam.fusionzero.com
bl.mailspike.org
bl.spamcop.net
blackholes.brainerd.net
dnsbl-1.uceprotect.net
l2.spews.dnsbl.sorbs.net
list.dsbl.org
map.spam-rbl.com
multihop.dsbl.org
ns1.unsubscore.com
opm.blitzed.org
psbl.surriel.com
rbl.efnet.org
rbl.schulte.org
spamguard.leadmon.net
ubl.unsubscore.com
unconfirmed.dsbl.org
virbl.dnsbl.bit.nl
zen.spamhaus.org
'''.split()
def check_list(ip_list, blocklist):
    """Look the address up in a single DNS blocklist via ``dig``.

    The query name is the items of ``ip_list`` joined with dots, followed by
    the blocklist zone. Every line that ``dig +short`` prints is reported as
    one listing.

    Note: the listing message is formatted with the module-level ``ip``
    object, not with ``ip_list``.

    Args:
        ip_list: Address components as strings, already in the order they
            should appear in the query name.
        blocklist: DNSBL zone to append to the query name.

    Returns:
        A ``(messages, returncode)`` tuple:
        ``([], 0)`` if dig printed nothing or exited with status 9,
        ``([...], 2)`` with one message per output line if listed,
        ``([repr(error)], 3)`` for any other failure.
    """
    dns_name = '{}.{}'.format(
        '.'.join(ip_list),
        blocklist,
    )

    returncode = 0
    msgs = []

    try:
        result = check_output([
            'dig',
            '+tries=2',
            '+time=5',
            '+short',
            dns_name
        ]).decode().splitlines()
        for item in result:
            msgs.append('{} listed in {} as {}'.format(
                ip,
                blocklist,
                item,
            ))
            returncode = 2
    except Exception as e:
        # Only CalledProcessError carries ``returncode``. Other failures
        # (e.g. dig not installed -> FileNotFoundError) have no such
        # attribute, so read it defensively instead of raising
        # AttributeError from inside this handler.
        if getattr(e, 'returncode', None) == 9:
            # no reply from server
            return [], 0

        return [repr(e)], 3

    return msgs, returncode
# Parse the address given on the command line; anything unparsable
# (including a missing argument) prints the usage line and exits with 3.
try:
    ip = ip_address(argv[1])
except Exception:
    print('usage: {} <ip>'.format(argv[0]))
    exit(3)

# Split the address into its query-name components: individual hex digits
# of the exploded form for IPv6, dotted octets otherwise.
ip_list = (
    list(ip.exploded.replace(':', ''))
    if isinstance(ip, IPv6Address)
    else ip.exploded.split('.')
)
# The components are used in reverse order when building the query name.
ip_list.reverse()

exitcode = 0

# One worker per blocklist so all lookups run concurrently.
with ThreadPoolExecutor(max_workers=len(BLOCKLISTS)) as executor:
    futures = {
        executor.submit(check_list, ip_list, blocklist)
        for blocklist in BLOCKLISTS
    }

    # Print results as lookups finish and keep the highest exit code seen.
    for future in as_completed(futures):
        msgs, this_exitcode = future.result()
        for msg in msgs:
            print(msg)
        if this_exitcode > exitcode:
            exitcode = this_exitcode

if not exitcode:
    print('OK')

exit(exitcode)