2019-09-10 15:17:35 +02:00
|
|
|
"Script containing everything specific to ZeusWPI"
|
2019-09-08 01:58:21 +02:00
|
|
|
import html
import typing

from flask import (Blueprint, current_app, flash, redirect, request, session,
                   url_for)
from flask_login import login_user
from flask_oauthlib.client import OAuth, OAuthException
from models import User, db
from werkzeug.wrappers import Response
|
2015-03-31 20:15:22 +02:00
|
|
|
|
2019-08-28 03:46:04 +02:00
|
|
|
# Blueprint on which the OAuth callback route ("/login/zeus/authorized") is
# registered; url_for() refers to it by the "oauth_bp" name.
oauth_bp = Blueprint("oauth_bp", __name__)
|
2015-03-31 20:15:22 +02:00
|
|
|
|
|
|
|
|
|
|
|
def zeus_login():
    """Start a login through ZeusWPI.

    Calls ``authorize`` on the ``zeus`` client stored on the current
    application and returns its result. The callback passed along is the
    absolute URL of the ``oauth_bp.authorized`` endpoint.
    """
    authorize = current_app.zeus.authorize
    callback_url = url_for("oauth_bp.authorized", _external=True)
    return authorize(callback=callback_url)
|
2015-03-31 20:15:22 +02:00
|
|
|
|
|
|
|
|
2019-09-05 03:33:29 +02:00
|
|
|
@oauth_bp.route("/login/zeus/authorized")
def authorized() -> typing.Any:
    """Handle the OAuth callback from ZeusWPI and log the user in.

    Reads the provider response through the ``zeus`` client on the current
    application. On success the access token is stored in the session under
    ``"zeus_token"``, the ``current_user/`` resource is fetched, and the
    user with the (lower-cased) username is logged in; a user that does not
    exist yet is created first.

    Returns:
        An "Access denied" message string when the provider gave no
        response or an ``OAuthException``; otherwise a redirect response.
        If no username could be determined, a message is flashed and the
        visitor is redirected to ``general_bp.home``.
    """
    # type is 'typing.Union[str, Response]', but this errors due to
    # https://github.com/python/mypy/issues/7187
    resp = current_app.zeus.authorized_response()
    if resp is None:
        # Use .get(): indexing request.args aborts the request with a 400
        # when the provider omits one of these parameters. The values come
        # straight from the query string and are returned as HTML, so they
        # must be escaped.
        reason = html.escape(request.args.get("error", "unknown"))
        description = html.escape(
            request.args.get("error_description", "unknown"))
        return f"Access denied: reason={reason} error={description}"
    if isinstance(resp, OAuthException):
        # Escape the provider-supplied parts; the <br> is intentional markup.
        message = html.escape(str(resp.message))
        details = html.escape(str(resp.data))
        return f"Access denied: {message}<br>{details}"

    session["zeus_token"] = (resp["access_token"], "")
    me = current_app.zeus.get("current_user/")

    # The profile payload may not be a mapping (e.g. a non-JSON error body)
    # and the username may be missing or null; treat all of those as
    # "no username" instead of crashing.
    profile = me.data
    if not isinstance(profile, dict):
        profile = {}
    username = (profile.get("username") or "").lower()

    if not username:
        flash("You're not allowed to enter, please contact a system administrator")
        return redirect(url_for("general_bp.home"))

    # Only hit the database once we know there is a username to look up.
    user = User.query.filter_by(username=username).first()
    if not user:
        user = create_user(username)
    return login_and_redirect_user(user)
|
|
|
|
|
2019-09-05 03:33:29 +02:00
|
|
|
|
2019-08-28 03:46:04 +02:00
|
|
|
def init_oauth(app):
    """Create and return the ZeusWPI OAuth remote application for ``app``.

    The consumer key and secret are read from ``app.config["ZEUS_KEY"]``
    and ``app.config["ZEUS_SECRET"]``. A token getter is registered on the
    remote application that returns the ``"zeus_token"`` entry of the
    session (``None`` when it is absent).
    """
    client = OAuth(app)

    settings = {
        "consumer_key": app.config["ZEUS_KEY"],
        "consumer_secret": app.config["ZEUS_SECRET"],
        "request_token_params": {},
        "base_url": "https://adams.ugent.be/oauth/api/",
        "access_token_method": "POST",
        "access_token_url": "https://adams.ugent.be/oauth/oauth2/token/",
        "authorize_url": "https://adams.ugent.be/oauth/oauth2/authorize/",
    }
    zeus = client.remote_app("zeus", **settings)

    def get_zeus_oauth_token():
        # Look the token up at call time so each request sees its own session.
        return session.get("zeus_token")

    # Explicit registration; equivalent to decorating with @zeus.tokengetter.
    zeus.tokengetter(get_zeus_oauth_token)

    return zeus
|
2015-03-31 20:15:22 +02:00
|
|
|
|
|
|
|
|
2019-09-08 01:58:21 +02:00
|
|
|
def login_and_redirect_user(user) -> Response:
    """Log ``user`` in and return a redirect to ``general_bp.home``."""
    login_user(user)
    home_url = url_for("general_bp.home")
    return redirect(home_url)
|
2015-03-31 20:15:22 +02:00
|
|
|
|
|
|
|
|
2019-09-08 01:58:21 +02:00
|
|
|
def create_user(username) -> User:
    """Create, persist and return a new user named ``username``.

    The user is configured with the ``"zeus"`` association, added to the
    database session and committed before being returned.
    """
    new_user = User()
    # NOTE(review): the positional ``False`` and ``1`` are passed through to
    # User.configure unchanged; their meaning is defined there, not here.
    new_user.configure(username, False, 1, associations=["zeus"])

    db.session.add(new_user)
    db.session.commit()
    return new_user
|