ldap-frontend/ldap_frontend/helpers/ldap.py

134 lines
3.1 KiB
Python
Raw Permalink Normal View History

2021-12-21 07:30:36 +00:00
from functools import wraps
from json import load
from os import environ
2021-12-21 08:27:25 +00:00
from flask import redirect, session, url_for
2021-12-21 08:58:57 +00:00
from ldap3 import (
ALL_ATTRIBUTES,
HASHED_SALTED_SHA512,
MODIFY_REPLACE,
Connection,
Server,
)
2021-12-21 07:30:36 +00:00
from ldap3.core.exceptions import LDAPException
2021-12-21 08:58:57 +00:00
from ldap3.utils.hashed import hashed
2021-12-21 07:30:36 +00:00
# Load the application configuration once, at import time. The path of the
# JSON file comes from the APP_CONFIG environment variable (KeyError if it is
# not set). JSON text is UTF-8, so decode it explicitly rather than relying on
# the platform's locale encoding.
with open(environ["APP_CONFIG"], encoding="utf-8") as f:
    APP_CONFIG = load(f)
def login_required(func):
    """Decorator: only run ``func`` for a session with valid LDAP credentials.

    On every call the wrapper checks the ``is_logged_in`` session flag and
    re-validates the ``username``/``password`` stored in the session with
    ``try_auth``. If any of that fails the client is redirected to the
    ``login`` endpoint. Otherwise ``func`` is called with a connection from
    ``connect()`` (called without arguments) as its first positional
    argument, followed by the arguments the wrapper itself received.

    NOTE(review): the password is read from the session in clear text; with
    Flask's default cookie-based session that value is signed but not
    encrypted -- confirm a server-side session store is in use.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        # .get() so that a partially populated session leads to the login
        # page instead of a KeyError.
        username = session.get("username")
        password = session.get("password")
        authenticated = (
            session.get("is_logged_in")
            and username
            and password
            and try_auth(username, password)
        )
        if not authenticated:
            return redirect(url_for("login"))
        # NOTE(review): the connection handed to the view is never unbound
        # here; the view (or garbage collection) has to release it.
        return func(connect(), *args, **kwargs)

    return wrapper
def admin_required(func):
    """Decorator: only run ``func`` for logged-in users matching the admin filter.

    The wrapper is itself protected by ``login_required``, which validates the
    session and passes an LDAP connection as the first positional argument.
    That connection is used to search ``APP_CONFIG["ldap"]["user_base"]`` with
    the ``group_admin`` filter template filled in with the session's username.
    Exactly one matching entry means the user may proceed; anything else
    redirects to the ``selfservice`` endpoint. Failed logins are redirected to
    ``login`` by ``login_required``.
    """
    # Function-scope import: escapes characters that are special inside an
    # LDAP search filter so the username cannot alter the filter's structure.
    from ldap3.utils.conv import escape_filter_chars

    @wraps(func)
    @login_required
    def wrapper(ldap, *args, **kwargs):
        # login_required has already checked the session and credentials and
        # supplied ``ldap``; re-authenticating and opening a second connection
        # here would only add LDAP round trips and an abandoned connection.
        admin_filter = APP_CONFIG["template"]["group_admin"].format(
            escape_filter_chars(session["username"])
        )
        ldap.search(
            APP_CONFIG["ldap"]["user_base"],
            admin_filter,
            attributes=["uid"],
        )
        if len(ldap.entries) != 1:
            return redirect(url_for("selfservice"))
        return func(ldap, *args, **kwargs)

    return wrapper
def try_auth(user, password):
    """Return True if ``user``/``password`` can bind to the LDAP server.

    ``user`` is inserted into the ``user_dn`` template from ``APP_CONFIG`` to
    build the bind DN. Any ``LDAPException`` raised while connecting or
    binding is treated as an authentication failure. The connection is only
    used to verify the credentials and is released before returning.
    """
    # A simple bind with a DN but an empty password is an "unauthenticated
    # bind" (RFC 4513, section 5.1.2) that a server may accept as success, so
    # never let empty credentials reach the server.
    if not user or not password:
        return False
    try:
        conn = connect(
            user=APP_CONFIG["template"]["user_dn"].format(user),
            password=password,
        )
    except LDAPException:
        return False
    try:
        # Release the connection instead of leaking one per check.
        conn.unbind()
    except LDAPException:
        # The credentials were already verified; a failed cleanup must not
        # turn a successful authentication into a failure.
        pass
    return True
def connect(user=None, password=None):
    """Open a connection to the configured LDAP server and bind it.

    When neither ``user`` nor ``password`` is given, the ``username`` and
    ``password`` from ``APP_CONFIG["ldap"]`` are used instead.

    Returns the bound ``Connection``.

    Raises ``LDAPException`` carrying the server's result description when
    the bind result code is not 0.
    """
    server = Server(APP_CONFIG["ldap"]["server"])
    if not user and not password:
        user = APP_CONFIG["ldap"]["username"]
        password = APP_CONFIG["ldap"]["password"]
    conn = Connection(
        server,
        user=user,
        password=password,
    )
    conn.bind()
    if conn.result["result"] != 0:  # 0 is the LDAP "success" result code
        # Read the description first: it must not depend on the connection
        # state after cleanup.
        description = conn.result["description"]
        try:
            # Do not abandon the connection opened for the failed bind.
            conn.unbind()
        except LDAPException:
            # The bind failure is the error worth reporting, not the cleanup.
            pass
        raise LDAPException(description)
    return conn
def get_user(ldap, username):
    """Return the single LDAP entry matching ``username``.

    Searches ``APP_CONFIG["ldap"]["user_base"]`` using the ``user_search``
    filter template and requests all attributes.

    Raises ``UserNotFoundException`` (with the username as its argument) when
    the search does not yield exactly one entry.
    """
    # Function-scope import: escape filter metacharacters so the username is
    # always matched literally and cannot change the filter's structure.
    from ldap3.utils.conv import escape_filter_chars

    ldap.search(
        APP_CONFIG["ldap"]["user_base"],
        APP_CONFIG["template"]["user_search"].format(
            escape_filter_chars(username)
        ),
        attributes=ALL_ATTRIBUTES,
    )
    if len(ldap.entries) != 1:
        raise UserNotFoundException(username)
    return ldap.entries[0]
2021-12-21 08:27:25 +00:00
2021-12-21 07:30:36 +00:00
def update_user(ldap, username, settings):
    """Replace attribute values on a user's LDAP entry.

    Every ``attribute: value`` pair in ``settings`` becomes a single
    ``MODIFY_REPLACE`` change. The target DN is built from the ``user_dn``
    template in ``APP_CONFIG``.

    Returns whatever ``ldap.modify`` returns.
    """
    changes = {
        name: [(MODIFY_REPLACE, new_value)]
        for name, new_value in settings.items()
    }
    target_dn = APP_CONFIG["template"]["user_dn"].format(username)
    return ldap.modify(target_dn, changes)
2021-12-21 08:58:57 +00:00
def update_user_password(ldap, username, password):
    """Set a user's ``userPassword`` attribute to a hash of ``password``.

    The value is produced by ldap3's ``hashed`` helper with the
    ``HASHED_SALTED_SHA512`` algorithm and written through ``update_user``,
    whose result is returned.
    """
    password_hash = hashed(HASHED_SALTED_SHA512, password)
    return update_user(ldap, username, {"userPassword": password_hash})
2021-12-21 07:30:36 +00:00
class UserNotFoundException(Exception):
    """Raised when a user lookup does not yield exactly one LDAP entry."""