"""LDAP-backed authentication helpers and Flask view decorators."""

from functools import wraps
from json import load
from os import environ

from flask import redirect, session, url_for
from ldap3 import (
    ALL_ATTRIBUTES,
    HASHED_SALTED_SHA512,
    MODIFY_REPLACE,
    Connection,
    Server,
)
from ldap3.core.exceptions import LDAPException
from ldap3.utils.conv import escape_filter_chars
from ldap3.utils.hashed import hashed

# Settings are loaded once, at import time, from the JSON file named by the
# APP_CONFIG environment variable.
with open(environ["APP_CONFIG"]) as f:
    APP_CONFIG = load(f)


def _release(conn):
    """Unbind *conn*, ignoring LDAP errors raised while closing it."""
    try:
        conn.unbind()
    except LDAPException:
        # Best-effort cleanup: a failure to close must not mask the result
        # (or the exception) the caller is already propagating.
        pass


def login_required(func):
    """Decorate a view so it only runs for an authenticated session.

    The session's stored username/password are re-validated against LDAP on
    every call. On success the view is invoked as ``func(ldap, **kwargs)``
    where ``ldap`` is a connection obtained from ``connect()`` with no
    arguments; the connection is unbound once the view returns. Otherwise the
    client is redirected to the ``login`` endpoint.

    Positional arguments given to the wrapper are not forwarded to ``func``.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        if not session.get("is_logged_in"):
            return redirect(url_for("login"))
        # .get() so that a session missing its credentials is sent back to
        # the login page instead of failing with a KeyError.
        if not try_auth(session.get("username"), session.get("password")):
            return redirect(url_for("login"))

        ldap = connect()
        try:
            return func(ldap, **kwargs)
        finally:
            # Do not leak one LDAP connection per request.
            _release(ldap)

    return wrapper


def admin_required(func):
    """Decorate a view so it only runs for a logged-in administrator.

    Login is enforced by ``login_required``, which also supplies the LDAP
    connection. The user counts as an administrator when the
    ``group_admin`` filter template, filled with the session username, matches
    exactly one entry under the configured user base. Non-administrators are
    redirected to the ``selfservice`` endpoint.
    """

    @wraps(func)
    @login_required
    def wrapper(ldap, *args, **kwargs):
        # login_required has already checked the session and credentials and
        # opened the connection, so reuse it rather than binding again.
        ldap.search(
            APP_CONFIG["ldap"]["user_base"],
            APP_CONFIG["template"]["group_admin"].format(
                # Escape so the username cannot alter the filter structure.
                escape_filter_chars(session["username"])
            ),
            attributes=["uid"],
        )
        if len(ldap.entries) == 1:
            return func(ldap, **kwargs)
        return redirect(url_for("selfservice"))

    return wrapper


def try_auth(user, password):
    """Return True if *user* can bind to LDAP with *password*, else False.

    *user* is inserted into the ``user_dn`` template to build the bind DN.
    """
    # An empty password must never authenticate: a bind carrying a DN but no
    # password is treated as an anonymous/unauthenticated bind, which many
    # servers accept.
    if not user or not password:
        return False
    try:
        conn = connect(
            user=APP_CONFIG["template"]["user_dn"].format(user),
            password=password,
        )
    except LDAPException:
        return False
    # The connection was only needed to verify the credentials.
    _release(conn)
    return True


def connect(user=None, password=None):
    """Open and bind a connection to the configured LDAP server.

    When neither *user* nor *password* is given, the account configured under
    ``APP_CONFIG["ldap"]`` is used.

    Returns:
        The bound ``ldap3.Connection``. The caller is responsible for
        unbinding it.

    Raises:
        LDAPException: if the server rejects the bind (non-zero result code),
            carrying the server's result description.
    """
    server = Server(APP_CONFIG["ldap"]["server"])
    if not user and not password:
        user = APP_CONFIG["ldap"]["username"]
        password = APP_CONFIG["ldap"]["password"]
    conn = Connection(
        server,
        user=user,
        password=password,
    )
    conn.bind()
    if conn.result["result"] != 0:
        description = conn.result["description"]
        # Close the socket of the rejected connection before reporting.
        _release(conn)
        raise LDAPException(description)
    return conn


def get_user(ldap, username):
    """Return the single directory entry matching *username*.

    The ``user_search`` filter template is filled with *username* and run
    against the configured user base, requesting all attributes.

    Raises:
        UserNotFoundException: if the search does not yield exactly one entry.
    """
    ldap.search(
        APP_CONFIG["ldap"]["user_base"],
        APP_CONFIG["template"]["user_search"].format(
            # Escape so the username cannot alter the filter structure.
            escape_filter_chars(username)
        ),
        attributes=ALL_ATTRIBUTES,
    )
    if len(ldap.entries) == 1:
        return ldap.entries[0]
    raise UserNotFoundException(username)


def update_user(ldap, username, settings):
    """Replace attributes on the entry of *username*.

    *settings* maps attribute names to their new values; each attribute is
    written with a MODIFY_REPLACE change. Returns whatever ``ldap.modify``
    returns.
    """
    changes = {
        attr: [(MODIFY_REPLACE, value)] for attr, value in settings.items()
    }
    return ldap.modify(
        APP_CONFIG["template"]["user_dn"].format(username),
        changes,
    )


def update_user_password(ldap, username, password):
    """Set the ``userPassword`` of *username* to a salted SHA-512 hash.

    Returns the result of ``update_user``.
    """
    return update_user(
        ldap,
        username,
        {
            "userPassword": hashed(HASHED_SALTED_SHA512, password),
        },
    )


class UserNotFoundException(Exception):
    """Raised by ``get_user`` when a search does not match exactly one entry."""