haldis/app/auth/zeus.py

"Script containing everything specific to ZeusWPI"
import typing
from flask import (Blueprint, current_app, flash, redirect, request, session,
url_for)
from flask_login import login_user
from flask_oauthlib.client import OAuth, OAuthException, OAuthRemoteApp
from models import User, db
from werkzeug.wrappers import Response
# Blueprint that carries the ZeusWPI OAuth routes registered in this module
# ("/login" and "/authorized").
auth_zeus_bp = Blueprint("auth_zeus_bp", __name__)
def zeus_login():
    """Start the ZeusWPI OAuth flow.

    Builds the absolute URL of the ``auth_zeus_bp.authorized`` endpoint and
    hands it to ``current_app.zeus.authorize`` as the callback, returning
    whatever that call returns.
    """
    # NOTE(review): no `state` value is passed to authorize(), so nothing in
    # this module binds the callback to the browser session that started the
    # login. Confirm whether that binding happens elsewhere; if not, generate
    # a per-session state here and verify it in the callback handler.
    # NOTE(review): `_external=True` derives the host from the incoming
    # request unless the app pins it in configuration - confirm the deployed
    # config so the callback URL cannot follow an unexpected Host header.
    callback_url = url_for("auth_zeus_bp.authorized", _external=True)
    return current_app.zeus.authorize(callback=callback_url)
@auth_zeus_bp.route("/login")
def login():
    """Route handler for ``/login``: delegate to :func:`zeus_login`."""
    response = zeus_login()
    return response
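@auth_zeus_bp.route("/authorized")
def authorized() -> typing.Any:
    # type is 'typing.Union[str, Response]', but this errors due to
    # https://github.com/python/mypy/issues/7187
    """Handle the OAuth callback from ZeusWPI.

    Reads the provider's answer via ``current_app.zeus.authorized_response()``.
    On failure an "Access denied" page is returned as a string. On success the
    access token is stored in the session, the ``current_user/`` resource is
    fetched, and the user with the returned username is logged in (and created
    first if no such user exists). A response without a username is rejected
    with a flash message and a redirect to ``general_bp.home``.

    Returns:
        Either a ``str`` (access-denied page) or a redirect ``Response``.
    """
    # Local import keeps this block self-contained; `escape` is stdlib.
    from html import escape

    resp = current_app.zeus.authorized_response()
    if resp is None:
        # These values come straight from the query string and the returned
        # string is served as a page, so they must be HTML-escaped before
        # interpolation. `.get` also avoids an unhandled lookup error when the
        # endpoint is requested without the provider's error parameters.
        reason = escape(request.args.get("error", "unknown"))
        description = escape(request.args.get("error_description", ""))
        return f"Access denied: reason={reason} error={description}"
    if isinstance(resp, OAuthException):
        # The exception payload originates outside this app; escape it for the
        # same reason. The literal <br> is the only markup intended here.
        message = escape(str(resp.message))
        details = escape(str(resp.data))
        return f"Access denied: {message}<br>{details}"

    # NOTE(review): if the app uses cookie-backed sessions, the access token
    # is signed but readable by the browser - confirm the session backend.
    session["zeus_token"] = (resp["access_token"], "")
    me = current_app.zeus.get("current_user/")

    # The identity decision below rests entirely on this field, so do not
    # assume the payload is a dict or that the value is a non-null string.
    profile = me.data if isinstance(me.data, dict) else {}
    username = str(profile.get("username") or "").lower()

    if not username:
        flash("You're not allowed to enter, please contact a system administrator")
        return redirect(url_for("general_bp.home"))

    user = User.query.filter_by(username=username).first()
    if not user:
        # First login for this username: an account is provisioned on the
        # strength of the provider's answer alone.
        user = create_user(username)
    return login_and_redirect_user(user)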
def init_oauth(app) -> OAuthRemoteApp:
    """Register the ZeusWPI OAuth remote app on *app* and return it.

    The consumer key and secret are read from ``app.config`` ("ZEUS_KEY" and
    "ZEUS_SECRET"); they are not hard-coded here. All provider endpoints are
    HTTPS URLs. A token getter is installed that reads the token from the
    session under "zeus_token".
    """
    oauth = OAuth(app)
    provider_settings = {
        "consumer_key": app.config["ZEUS_KEY"],
        "consumer_secret": app.config["ZEUS_SECRET"],
        # No extra request-token parameters (e.g. scope) are sent.
        "request_token_params": {},
        "base_url": "https://adams.ugent.be/oauth/api/",
        "access_token_method": "POST",
        "access_token_url": "https://adams.ugent.be/oauth/oauth2/token/",
        "authorize_url": "https://adams.ugent.be/oauth/oauth2/authorize/",
    }
    zeus = oauth.remote_app("zeus", **provider_settings)

    # pylint: disable=W0612
    @zeus.tokengetter
    def get_zeus_oauth_token():
        # Returns None when no token has been stored for this session.
        stored_token = session.get("zeus_token")
        return stored_token

    return zeus
def login_and_redirect_user(user) -> Response:
    """Log *user* in via ``login_user`` and redirect to ``general_bp.home``."""
    login_user(user)
    # The target is a fixed endpoint, not a value taken from the request.
    home_url = url_for("general_bp.home")
    return redirect(home_url)
def create_user(username) -> User:
    """Create, persist and return a new ``User`` for *username*.

    The user is configured with the "zeus" association, added to the database
    session and committed before being returned.
    """
    new_user = User()
    # NOTE(review): the meaning of the positional arguments `False` and `1`
    # is defined by User.configure, which is not visible here - confirm they
    # grant no elevated rights, since this runs for any username the OAuth
    # provider returns.
    new_user.configure(username, False, 1, associations=["zeus"])
    db.session.add(new_user)
    db.session.commit()
    return new_user