2021-12-21 07:30:36 +00:00
|
|
|
from functools import wraps
|
|
|
|
from json import load
|
|
|
|
from os import environ
|
|
|
|
|
2021-12-21 08:27:25 +00:00
|
|
|
from flask import redirect, session, url_for
|
2021-12-21 08:58:57 +00:00
|
|
|
from ldap3 import (
|
|
|
|
ALL_ATTRIBUTES,
|
|
|
|
HASHED_SALTED_SHA512,
|
|
|
|
MODIFY_REPLACE,
|
|
|
|
Connection,
|
|
|
|
Server,
|
|
|
|
)
|
2021-12-21 07:30:36 +00:00
|
|
|
from ldap3.core.exceptions import LDAPException
|
2021-12-21 08:58:57 +00:00
|
|
|
from ldap3.utils.hashed import hashed
|
2021-12-21 07:30:36 +00:00
|
|
|
|
|
|
|
with open(environ["APP_CONFIG"]) as f:
|
|
|
|
APP_CONFIG = load(f)
|
|
|
|
|
|
|
|
|
|
|
|
def login_required(func):
|
|
|
|
@wraps(func)
|
|
|
|
def wrapper(*args, **kwargs):
|
2021-12-21 09:47:25 +00:00
|
|
|
if session.get("is_logged_in"):
|
2021-12-21 07:30:36 +00:00
|
|
|
if try_auth(
|
|
|
|
session["username"],
|
|
|
|
session["password"],
|
|
|
|
):
|
|
|
|
ldap = connect()
|
|
|
|
|
2021-12-21 08:27:25 +00:00
|
|
|
return func(ldap, **kwargs)
|
2021-12-21 07:30:36 +00:00
|
|
|
else:
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
else:
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
|
|
|
|
def admin_required(func):
|
|
|
|
@wraps(func)
|
|
|
|
@login_required
|
|
|
|
def wrapper(*args, **kwargs):
|
2021-12-21 09:47:25 +00:00
|
|
|
if session.get("is_logged_in"):
|
2021-12-21 07:30:36 +00:00
|
|
|
if try_auth(
|
|
|
|
session["username"],
|
|
|
|
session["password"],
|
|
|
|
):
|
|
|
|
ldap = connect()
|
|
|
|
|
2021-12-21 08:27:25 +00:00
|
|
|
ldap.search(
|
|
|
|
APP_CONFIG["ldap"]["user_base"],
|
|
|
|
APP_CONFIG["template"]["group_admin"].format(session["username"]),
|
|
|
|
attributes=["uid"],
|
|
|
|
)
|
|
|
|
|
|
|
|
if len(ldap.entries) == 1:
|
|
|
|
return func(ldap, **kwargs)
|
|
|
|
else:
|
|
|
|
return redirect(url_for("selfservice"))
|
2021-12-21 07:30:36 +00:00
|
|
|
else:
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
else:
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
|
|
|
|
def try_auth(user, password):
|
|
|
|
try:
|
|
|
|
connect(
|
|
|
|
user=APP_CONFIG["template"]["user_dn"].format(user),
|
|
|
|
password=password,
|
|
|
|
)
|
|
|
|
return True
|
|
|
|
except LDAPException:
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
def connect(user=None, password=None):
|
|
|
|
server = Server(APP_CONFIG["ldap"]["server"])
|
|
|
|
|
|
|
|
if not user and not password:
|
|
|
|
user = APP_CONFIG["ldap"]["username"]
|
|
|
|
password = APP_CONFIG["ldap"]["password"]
|
|
|
|
|
|
|
|
conn = Connection(
|
|
|
|
server,
|
|
|
|
user=user,
|
|
|
|
password=password,
|
|
|
|
)
|
|
|
|
conn.bind()
|
|
|
|
|
2021-12-21 08:58:05 +00:00
|
|
|
if conn.result["result"] != 0:
|
|
|
|
raise LDAPException(conn.result["description"])
|
|
|
|
|
2021-12-21 07:30:36 +00:00
|
|
|
return conn
|
|
|
|
|
|
|
|
|
|
|
|
def get_user(ldap, username):
|
|
|
|
ldap.search(
|
|
|
|
APP_CONFIG["ldap"]["user_base"],
|
|
|
|
APP_CONFIG["template"]["user_search"].format(username),
|
|
|
|
attributes=ALL_ATTRIBUTES,
|
|
|
|
)
|
|
|
|
if len(ldap.entries) == 1:
|
|
|
|
return ldap.entries[0]
|
|
|
|
else:
|
|
|
|
raise UserNotFoundException(username)
|
|
|
|
|
2021-12-21 08:27:25 +00:00
|
|
|
|
2021-12-21 07:30:36 +00:00
|
|
|
def update_user(ldap, username, settings):
|
|
|
|
attrs = {}
|
|
|
|
for attr, value in settings.items():
|
|
|
|
attrs[attr] = [(MODIFY_REPLACE, value)]
|
|
|
|
|
|
|
|
return ldap.modify(
|
|
|
|
APP_CONFIG["template"]["user_dn"].format(username),
|
|
|
|
attrs,
|
|
|
|
)
|
|
|
|
|
|
|
|
|
2021-12-21 08:58:57 +00:00
|
|
|
def update_user_password(ldap, username, password):
|
|
|
|
return update_user(
|
|
|
|
ldap,
|
|
|
|
username,
|
|
|
|
{
|
|
|
|
"userPassword": hashed(HASHED_SALTED_SHA512, password),
|
|
|
|
},
|
|
|
|
)
|
|
|
|
|
|
|
|
|
2021-12-21 07:30:36 +00:00
|
|
|
class UserNotFoundException(Exception):
|
|
|
|
pass
|