245 lines
7 KiB
Python
245 lines
7 KiB
Python
from json import load
|
|
from os import environ
|
|
|
|
from flask import Flask, flash, redirect, request, session, url_for
|
|
from flask_babel import Babel, gettext
|
|
from flask_wtf.csrf import CSRFError, CSRFProtect
|
|
from ldap3 import ALL_ATTRIBUTES, MODIFY_ADD, MODIFY_DELETE
|
|
from ldap3.core.exceptions import LDAPException
|
|
from ldap3.utils.dn import escape_rdn
|
|
|
|
from .helpers.flask import template
|
|
from .helpers.ldap import (
|
|
admin_required,
|
|
get_user,
|
|
login_required,
|
|
try_auth,
|
|
update_user,
|
|
update_user_password,
|
|
)
|
|
|
|
app = Flask(__name__)
|
|
app.secret_key = environ.get("FLASK_SECRET_KEY", default="test")
|
|
app.config["LANGUAGES"] = {"en": "English", "de": "Deutsch"}
|
|
|
|
babel = Babel(app)
|
|
csrf = CSRFProtect(app)
|
|
|
|
with open(environ["APP_CONFIG"]) as f:
|
|
APP_CONFIG = load(f)
|
|
|
|
|
|
@babel.localeselector
|
|
def get_locale():
|
|
return request.accept_languages.best_match(app.config["LANGUAGES"].keys())
|
|
|
|
|
|
@app.errorhandler(CSRFError)
|
|
def handle_csrf_error(e):
|
|
flash(
|
|
gettext("CRSF validation error. For your own safety, you have been logged out.")
|
|
)
|
|
|
|
session["is_logged_in"] = False
|
|
session["username"] = ""
|
|
session["password"] = ""
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
@app.route("/")
|
|
def slash():
|
|
if session.get("is_logged_in"):
|
|
return redirect(url_for("selfservice"))
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
|
|
def login():
|
|
session["is_logged_in"] = False
|
|
|
|
if request.method == "POST":
|
|
if try_auth(
|
|
escape_rdn(request.form["username"]),
|
|
request.form["password"],
|
|
):
|
|
session["is_logged_in"] = True
|
|
session["username"] = escape_rdn(request.form["username"])
|
|
session["password"] = request.form["password"]
|
|
|
|
flash(gettext("logged in"))
|
|
|
|
return redirect(url_for("selfservice"))
|
|
else:
|
|
flash(gettext("username or password is wrong"))
|
|
|
|
return template(None, "login.html")
|
|
|
|
|
|
@app.route("/logout")
|
|
def logout():
|
|
session["is_logged_in"] = False
|
|
session["username"] = ""
|
|
session["password"] = ""
|
|
|
|
flash(gettext("you have been logged out"))
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
@app.route("/selfservice", methods=["GET", "POST"])
|
|
@login_required
|
|
def selfservice(ldap):
|
|
if request.method == "POST":
|
|
if request.form.get("userdata"):
|
|
try:
|
|
update_user(
|
|
ldap,
|
|
session["username"],
|
|
{
|
|
"givenName": request.form["givenName"],
|
|
"sn": request.form["sn"],
|
|
"cn": "{} {}".format(
|
|
request.form["givenName"],
|
|
request.form["sn"],
|
|
),
|
|
"externalMail": request.form["externalMail"],
|
|
},
|
|
)
|
|
flash(gettext("user data was updated"))
|
|
except LDAPException as e:
|
|
app.logger.error(
|
|
"Updating {} failed: {}\n{}".format(
|
|
APP_CONFIG["template"]["user_dn"].format(session["username"]),
|
|
repr(e),
|
|
repr(request.form),
|
|
),
|
|
)
|
|
flash(e)
|
|
elif request.form.get("passwordchange"):
|
|
validated = (True,)
|
|
if not try_auth(
|
|
session["username"],
|
|
request.form["current"],
|
|
):
|
|
validated = False
|
|
flash(gettext("current password does not match the one stored"))
|
|
|
|
if request.form["new"] != request.form["repeat"]:
|
|
validated = False
|
|
flash(gettext("new passwords do not match"))
|
|
|
|
if len(request.form["new"]) < 12:
|
|
validated = False
|
|
flash(gettext("new password must be atleast 12 characters long"))
|
|
|
|
if validated:
|
|
try:
|
|
update_user_password(
|
|
ldap,
|
|
session["username"],
|
|
request.form["new"],
|
|
)
|
|
session["password"] = request.form["new"]
|
|
flash(gettext("your password was changed"))
|
|
except LDAPException as e:
|
|
app.logger.error(
|
|
"Updating {} failed: {}".format(
|
|
APP_CONFIG["template"]["user_dn"].format(
|
|
session["username"]
|
|
),
|
|
repr(e),
|
|
),
|
|
)
|
|
flash(e)
|
|
|
|
return redirect(url_for("selfservice"))
|
|
|
|
return template(ldap, "selfservice.html")
|
|
|
|
|
|
@app.route("/groups", methods=["GET"])
|
|
@login_required
|
|
def groups(ldap):
|
|
ldap.search(
|
|
APP_CONFIG["ldap"]["group_base"],
|
|
"(objectclass=groupOfNames)",
|
|
attributes=ALL_ATTRIBUTES,
|
|
)
|
|
|
|
return template(
|
|
ldap,
|
|
"groups/list.html",
|
|
groups=ldap.entries,
|
|
)
|
|
|
|
|
|
@app.route("/groups/<ou>", methods=["GET", "POST"])
|
|
@admin_required
|
|
def group_edit(ldap, ou):
|
|
ou = escape_rdn(ou)
|
|
|
|
if request.method == "POST":
|
|
if request.form.get("remove"):
|
|
ldap.modify(
|
|
APP_CONFIG["template"]["group_dn"].format(ou),
|
|
{
|
|
"member": [
|
|
(
|
|
MODIFY_DELETE,
|
|
APP_CONFIG["template"]["user_dn"].format(
|
|
escape_rdn(request.form["remove"])
|
|
),
|
|
)
|
|
]
|
|
},
|
|
)
|
|
flash(
|
|
gettext(
|
|
"%(user)s was removed from %(ou)s",
|
|
user=request.form["remove"],
|
|
ou=ou,
|
|
)
|
|
)
|
|
elif request.form.get("add"):
|
|
ldap.modify(
|
|
APP_CONFIG["template"]["group_dn"].format(ou),
|
|
{
|
|
"member": [
|
|
(
|
|
MODIFY_ADD,
|
|
APP_CONFIG["template"]["user_dn"].format(
|
|
escape_rdn(request.form["add"])
|
|
),
|
|
)
|
|
]
|
|
},
|
|
)
|
|
flash(
|
|
gettext(
|
|
"%(user)s was added to %(ou)s", user=request.form["add"], ou=ou
|
|
)
|
|
)
|
|
|
|
return redirect(url_for("group_edit", ou=ou))
|
|
|
|
ldap.search(
|
|
APP_CONFIG["ldap"]["user_base"],
|
|
APP_CONFIG["template"]["group_nonmembers"].format(ou),
|
|
attributes=["cn", "uid"],
|
|
)
|
|
users = ldap.entries
|
|
|
|
ldap.search(
|
|
APP_CONFIG["ldap"]["user_base"],
|
|
APP_CONFIG["template"]["group_members"].format(ou),
|
|
attributes=ALL_ATTRIBUTES,
|
|
)
|
|
|
|
return template(
|
|
ldap,
|
|
"groups/members.html",
|
|
members=ldap.entries,
|
|
ou=ou,
|
|
other_users=users,
|
|
)
|