from functools import wraps
from json import load
from os import environ
from flask import redirect, session, url_for, render_template
from ldap3 import ALL, Connection, Server
from ldap3 import ALL_ATTRIBUTES, MODIFY_REPLACE
from ldap3.core.exceptions import LDAPException
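# Load the application configuration once, at import time, from the JSON file
# named by the APP_CONFIG environment variable.  A missing variable or an
# unreadable/invalid file fails the import, so the app never starts half
# configured.
#
# The file is security sensitive: other code in this module reads the LDAP
# service-account password from it (APP_CONFIG["ldap"]["password"]), so it
# should be readable only by the account the application runs as.
#
# JSON is UTF-8 by specification; the encoding is pinned so parsing does not
# depend on the platform's locale default.
with open(environ["APP_CONFIG"], encoding="utf-8") as config_file:
    APP_CONFIG = load(config_file)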
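def login_required(func):
    """Decorate a view so it only runs for a session that still authenticates.

    On every call the wrapper re-checks the username and password stored in
    the session against the directory with ``try_auth``.  When that succeeds
    it opens a connection with ``connect()`` (no arguments, i.e. the
    configured service account) and passes it to ``func`` as the first
    positional argument.  In every other case the client is redirected to the
    ``login`` endpoint.

    Args:
        func: View callable, invoked as ``func(ldap, *args, **kwargs)``.

    Returns:
        The wrapped view.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        # .get() instead of indexing: a session without these keys (first
        # visit, expired or cleared cookie) must be redirected to the login
        # page, not fail with KeyError and a 500 response.
        if not session.get("is_logged_in"):
            return redirect(url_for("login"))

        # SECURITY NOTE(review): the plaintext password lives in the session.
        # Flask's default session is a signed but unencrypted client-side
        # cookie, so anyone who obtains the cookie can read the password
        # unless a server-side session store is configured (not visible
        # here).  Prefer storing only the authenticated identity.
        username = session.get("username")
        password = session.get("password")
        if not username or not password or not try_auth(username, password):
            return redirect(url_for("login"))

        ldap = connect()
        try:
            return func(ldap, *args, **kwargs)
        finally:
            # One connection is opened per request; release it so sockets do
            # not pile up until garbage collection.
            ldap.unbind()

    return wrapper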
def admin_required(func):
    """Decorate a view with the login check applied twice.

    The inner wrapper is itself wrapped by ``login_required`` and then repeats
    the same session/``try_auth`` check before calling ``func``.

    NOTE(review): despite the name, no role, group or other privilege check is
    performed here -- every user who passes ``login_required`` reaches
    ``func``.  An explicit authorization check is needed; where admin
    membership is recorded is not visible in this file.

    NOTE(review): ``login_required`` already passes a connection as the first
    positional argument, and this wrapper prepends a second one, so ``func``
    is invoked as ``func(new_ldap, login_ldap, *args, **kwargs)``.  Confirm
    the decorated views expect that.

    Args:
        func: View callable to protect.

    Returns:
        The wrapped view.
    """

    @wraps(func)
    @login_required
    def wrapper(*args, **kwargs):
        # Same two conditions as login_required, evaluated left to right so
        # the stored credentials are only read for a logged-in session.
        authenticated = session["is_logged_in"] and try_auth(
            session["username"],
            session["password"],
        )
        if not authenticated:
            return redirect(url_for("login"))

        ldap = connect()
        return func(ldap, *args, **kwargs)

    return wrapper
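def try_auth(user, password):
    """Return True only if the directory accepts a bind with these credentials.

    The bind DN is built from ``APP_CONFIG["template"]["user_dn"]`` with the
    escaped username substituted in.

    Args:
        user: Username as supplied by the client.
        password: Password as supplied by the client.

    Returns:
        True when the bind succeeded; False for empty credentials, a rejected
        bind or any ``LDAPException``.
    """
    # Empty credentials never authenticate.  Rejecting them here also keeps
    # them away from connect(), which treats "no user and no password" as a
    # request for the service account.
    if not user or not password:
        return False

    # Function-scope import keeps this block self-contained.
    from ldap3.utils.dn import escape_rdn

    try:
        conn = connect(
            # Escape DN metacharacters so the username cannot change which
            # entry the bind targets.  Assumes the placeholder sits in an
            # attribute value of the DN template -- confirm against the
            # config.
            user=APP_CONFIG["template"]["user_dn"].format(escape_rdn(user)),
            password=password,
        )
        try:
            # ldap3 reports rejected credentials through the bind result, not
            # by raising (unless raise_exceptions is enabled), so "no
            # exception" alone does not prove the password was correct.
            return bool(conn.bound)
        finally:
            # This connection exists only to test the credentials.
            conn.unbind()
    except LDAPException:
        return False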
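def connect(user=None, password=None):
    """Open and bind a connection to the configured LDAP server.

    Args:
        user: Bind DN.  When both ``user`` and ``password`` are falsy, the
            service account from ``APP_CONFIG["ldap"]`` is used instead.
        password: Password for ``user``.

    Returns:
        A bound ``Connection``.

    Raises:
        LDAPException: If the bind fails.  ``try_auth`` treats "no exception"
            as success, so a rejected bind must not return silently.
    """
    # NOTE(review): passwords are sent in a simple bind; confirm the
    # configured server URI uses ldaps:// (or that StartTLS is added), since
    # no TLS setup is visible here.
    server = Server(APP_CONFIG["ldap"]["server"])

    if not user and not password:
        user = APP_CONFIG["ldap"]["username"]
        password = APP_CONFIG["ldap"]["password"]

    conn = Connection(
        server,
        user=user,
        password=password,
    )
    # bind() returns False when the server rejects the credentials; it does
    # not raise for that case by default.
    if not conn.bind():
        conn.unbind()
        raise LDAPException("LDAP bind was rejected by the server")

    return conn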
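def get_user(ldap, username):
    """Return the single directory entry that matches ``username``.

    Searches ``APP_CONFIG["ldap"]["user_base"]`` with the filter template
    ``APP_CONFIG["template"]["user_search"]`` and requests all attributes.

    Args:
        ldap: Bound connection to search with.
        username: Username to look up.

    Returns:
        The matching entry.

    Raises:
        UserNotFoundException: If the search yields zero entries or more than
            one.
    """
    # Function-scope import keeps this block self-contained.
    from ldap3.utils.conv import escape_filter_chars

    # Escape filter metacharacters (*, parentheses, backslash, NUL) so the
    # username cannot widen or rewrite the search filter.  Assumes the
    # placeholder sits in an assertion value of the filter template --
    # confirm against the config.
    search_filter = APP_CONFIG["template"]["user_search"].format(
        escape_filter_chars(username)
    )
    ldap.search(
        APP_CONFIG["ldap"]["user_base"],
        search_filter,
        attributes=ALL_ATTRIBUTES,
    )
    if len(ldap.entries) != 1:
        raise UserNotFoundException(username)
    return ldap.entries[0]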
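def update_user(ldap, username, settings):
    """Replace attributes on the entry of ``username``.

    Args:
        ldap: Bound connection to modify with.
        username: Username substituted into the ``user_dn`` template.
        settings: Mapping of attribute name to new value; each attribute is
            sent as a MODIFY_REPLACE change.

    Returns:
        Whatever ``ldap.modify`` returns.
    """
    # Function-scope import keeps this block self-contained.
    from ldap3.utils.dn import escape_rdn

    # NOTE(review): the keys of settings become attribute names verbatim.  If
    # they can come from a request, restrict them to an allowlist so a caller
    # cannot rewrite attributes such as group membership or the password --
    # callers are not visible here.
    changes = {
        attr: [(MODIFY_REPLACE, value)] for attr, value in settings.items()
    }

    # Escape DN metacharacters so the username cannot redirect the modify to
    # a different entry.  An empty username is passed through unchanged
    # because escape_rdn indexes its argument.
    dn_value = escape_rdn(username) if username else username

    return ldap.modify(
        APP_CONFIG["template"]["user_dn"].format(dn_value),
        changes,
    )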
def template(ldap, name, **kwargs):
    """Render template ``name`` with the shared context values added.

    Args:
        ldap: Connection used to look up the session's user; when falsy no
            lookup is made and ``CURRENT_USER`` is None.
        name: Template name handed to ``render_template``.
        **kwargs: Additional context values for the template.

    Returns:
        The result of ``render_template``.
    """
    current_user = get_user(ldap, session["username"]) if ldap else None

    # NOTE(review): the whole APP_CONFIG mapping is exposed to the template,
    # and it includes the LDAP service-account password.  Passing only the
    # keys the templates need would keep that secret out of the rendering
    # context.
    context = dict(APP_CONFIG=APP_CONFIG, CURRENT_USER=current_user)
    return render_template(name, **context, **kwargs)
class UserNotFoundException(Exception):
    """Raised when a user lookup does not yield exactly one entry."""