from json import load from os import environ from flask import Flask, flash, redirect, request, session, url_for from flask_babel import Babel, gettext from flask_wtf.csrf import CSRFError, CSRFProtect from ldap3 import ALL_ATTRIBUTES, MODIFY_ADD, MODIFY_DELETE from ldap3.core.exceptions import LDAPException from ldap3.utils.dn import escape_rdn from .helpers.flask import template from .helpers.ldap import ( admin_required, get_user, login_required, try_auth, update_user, update_user_password, ) app = Flask(__name__) app.secret_key = environ.get("FLASK_SECRET_KEY", default="test") app.config["LANGUAGES"] = {"en": "English", "de": "Deutsch"} babel = Babel(app) csrf = CSRFProtect(app) with open(environ["APP_CONFIG"]) as f: APP_CONFIG = load(f) @babel.localeselector def get_locale(): return request.accept_languages.best_match(app.config["LANGUAGES"].keys()) @app.errorhandler(CSRFError) def handle_csrf_error(e): flash( gettext("CRSF validation error. For your own safety, you have been logged out.") ) session["is_logged_in"] = False session["username"] = "" session["password"] = "" return redirect(url_for("login")) @app.route("/") def slash(): if session.get("is_logged_in"): return redirect(url_for("selfservice")) return redirect(url_for("login")) @app.route("/login", methods=["GET", "POST"]) def login(): session["is_logged_in"] = False if request.method == "POST": if try_auth( escape_rdn(request.form["username"]), request.form["password"], ): session["is_logged_in"] = True session["username"] = escape_rdn(request.form["username"]) session["password"] = request.form["password"] flash(gettext("logged in")) return redirect(url_for("selfservice")) else: flash(gettext("username or password is wrong")) return template(None, "login.html") @app.route("/logout") def logout(): session["is_logged_in"] = False session["username"] = "" session["password"] = "" flash(gettext("you have been logged out")) return redirect(url_for("login")) @app.route("/selfservice", methods=["GET", "POST"]) @login_required def selfservice(ldap): if request.method == "POST": if request.form.get("userdata"): try: update_user( ldap, session["username"], { "givenName": request.form["givenName"], "sn": request.form["sn"], "cn": "{} {}".format( request.form["givenName"], request.form["sn"], ), "mail": request.form["mail"], }, ) flash(gettext("user data was updated")) except LDAPException as e: app.logger.error( "Updating {} failed: {}\n{}".format( APP_CONFIG["template"]["user_dn"].format(session["username"]), repr(e), repr(request.form), ), ) flash(e) elif request.form.get("passwordchange"): validated = (True,) if not try_auth( session["username"], request.form["current"], ): validated = False flash(gettext("current password does not match the one stored")) if request.form["new"] != request.form["repeat"]: validated = False flash(gettext("new passwords do not match")) if len(request.form["new"]) < 12: validated = False flash(gettext("new password must be atleast 12 characters long")) if validated: try: update_user_password( ldap, session["username"], request.form["new"], ) session["password"] = request.form["new"] flash(gettext("your password was changed")) except LDAPException as e: app.logger.error( "Updating {} failed: {}".format( APP_CONFIG["template"]["user_dn"].format( session["username"] ), repr(e), ), ) flash(e) return redirect(url_for("selfservice")) return template(ldap, "selfservice.html") @app.route("/groups", methods=["GET"]) @login_required def groups(ldap): ldap.search( APP_CONFIG["ldap"]["group_base"], "(objectclass=groupOfNames)", attributes=ALL_ATTRIBUTES, ) return template( ldap, "groups/list.html", groups=ldap.entries, ) @app.route("/groups/", methods=["GET", "POST"]) @admin_required def group_edit(ldap, ou): ou = escape_rdn(ou) if request.method == "POST": if request.form.get("remove"): ldap.modify( APP_CONFIG["template"]["group_dn"].format(ou), { "member": [ ( MODIFY_DELETE, APP_CONFIG["template"]["user_dn"].format( escape_rdn(request.form["remove"]) ), ) ] }, ) flash( gettext( "%(user)s was removed from %(ou)s", user=request.form["remove"], ou=ou, ) ) elif request.form.get("add"): ldap.modify( APP_CONFIG["template"]["group_dn"].format(ou), { "member": [ ( MODIFY_ADD, APP_CONFIG["template"]["user_dn"].format( escape_rdn(request.form["add"]) ), ) ] }, ) flash( gettext( "%(user)s was added to %(ou)s", user=request.form["add"], ou=ou ) ) return redirect(url_for("group_edit", ou=ou)) ldap.search( APP_CONFIG["ldap"]["user_base"], APP_CONFIG["template"]["group_nonmembers"].format(ou), attributes=["cn", "uid"], ) users = ldap.entries ldap.search( APP_CONFIG["ldap"]["user_base"], APP_CONFIG["template"]["group_members"].format(ou), attributes=ALL_ATTRIBUTES, ) return template( ldap, "groups/members.html", members=ldap.entries, ou=ou, other_users=users, )