diff --git a/bundles/icinga2/files/check_omm.py b/bundles/icinga2/files/check_omm.py new file mode 100644 index 0000000..25bf9b0 --- /dev/null +++ b/bundles/icinga2/files/check_omm.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 + +import re +from hashlib import md5 +from sys import argv, exit + +# Supress SSL certificate warnings for ssl_verify=False +import urllib3 +from lxml import html +from requests import Session + +USERNAME_FIELD = "g2" +PASSWORD_FIELD = "g3" +CRSF_FIELD = "password" + +STATUS_OK = 0 +STATUS_WARNING = 1 +STATUS_CRITICAL = 2 +STATUS_UNKNOWN = 3 + + +class OMMCrawler: + def __init__(self, hostname, username, password): + self.session = Session() + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + self.session.verify = False + + self.url = f"https://{hostname}" + self.login_data = { + USERNAME_FIELD: username, + PASSWORD_FIELD: password, + CRSF_FIELD: md5(password.encode()).hexdigest(), + } + self.logged_in = False + + def login(self): + # if we have multiple dect masters, find out which one is the current master + current_master_url = self.session.get(self.url, verify=False).url + self.hostname = re.search(r"^(.*[\\\/])", current_master_url).group(0)[:-1] + + response = self.session.post(f"{self.url}/login_set.html", data=self.login_data) + response.raise_for_status() + + # set cookie + pass_value = re.search(r"(?<=pass=)\d+(?=;)", response.text).group(0) + self.session.cookies.set("pass", pass_value) + self.logged_in = True + + def get_station_status(self): + if not self.logged_in: + self.login() + + data = {} + response = self.session.get(f"{self.url}/fp_pnp_status.html") + response.raise_for_status() + tree = html.fromstring(response.text) + xpath_results = tree.xpath('//tr[@class="l0" or @class="l1"]') + + for result in xpath_results: + bubble_is_in_inactive_cluster = False + bubble_is_connected = False + bubble_is_active = False + + bubble_name = result.xpath("td[4]/text()")[0] + try: + bubble_is_connected = result.xpath("td[11]/img/@alt")[0] == "yes" + + if bubble_is_connected: + try: + bubble_is_active = result.xpath("td[12]/img/@alt")[0] == "yes" + except IndexError: + # If an IndexError occurs, there is no image in the + # 12th td. This means this bubble is in the not inside + # an active DECT cluster, but is a backup bubble. + # This is probably fine. + bubble_is_active = False + bubble_is_in_inactive_cluster = True + else: + bubble_is_active = False + except: + # There is no Image in the 11th td. This usually means there + # is a warning message in the 10th td. We do not care about + # that, currently. + pass + + data[bubble_name] = { + "is_connected": bubble_is_connected, + "is_active": bubble_is_active, + "is_in_inactive_cluster": bubble_is_in_inactive_cluster, + } + return data + + def handle_station_data(self): + try: + data = self.get_station_status() + except Exception as e: + print(f"Something went wrong. You should take a look at {self.url}") + print(repr(e)) + exit(STATUS_UNKNOWN) + + critical = False + for name, status in data.items(): + if not status["is_active"] and not status["is_connected"]: + print( + f"Base station {name} is not active or connected! Check manually!" + ) + critical = True + elif not status["is_active"] and not status["is_in_inactive_cluster"]: + # Bubble is part of an active DECT cluster, but not active. + # This shouldn't happen. + print( + f"Base station {name} is not active but connected! Check manually!" + ) + critical = True + elif not status["is_connected"]: + # This should never happen. Seeing this state means OMM + # itself is broken. + print( + f"Base station {name} is not connected but active! Check manually!" + ) + critical = True + + if critical: + exit(STATUS_CRITICAL) + else: + print(f"OK - {len(data)} base stations connected") + exit(STATUS_OK) + + +if __name__ == "__main__": + omm = OMMCrawler(argv[1], argv[2], argv[3]) + omm.handle_station_data()