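"""OAuth2 authorization-code login against the adams.ugent.be provider."""
import logging
import secrets
from urllib.parse import urlencode

import requests
from django.contrib.auth import login
from django.http.request import HttpRequest
from django.shortcuts import redirect

from users.models import CustomUser

USER_API_URI = 'https://adams.ugent.be/oauth/api/current_user/'
ACCESS_TOKEN_URI = 'https://adams.ugent.be/oauth/oauth2/token/'
AUTHORIZE_URI = 'https://adams.ugent.be/oauth/oauth2/authorize/'
# NOTE(review): the client id and secret are committed in source. Anyone with
# read access to the repository can act as this OAuth client; load them from
# deployment configuration instead and rotate the secret that was committed.
CLIENT_ID = 'tomtest'
CLIENT_SECRET = 'blargh'
# NOTE(review): plain-http localhost callback, i.e. a development value. A
# deployed instance needs an https callback registered with the provider.
REDIRECT_URI = 'http://localhost:8000/login/zeus/authorized'
# Session key under which the per-login anti-CSRF `state` value is kept.
OAUTH_STATE_SESSION_KEY = 'zeus_oauth_state'
# requests has no default timeout; without one a stalled provider would hang
# the worker that is serving the login.
REQUEST_TIMEOUT_SECONDS = 10

logger = logging.getLogger(__file__)


class OAuthException(Exception):
    """Raised when any step of the OAuth login flow fails."""


def register(req):
    """Start the login flow by redirecting the browser to the provider.

    A fresh random ``state`` value is stored in the session and sent along;
    ``register_callback`` only accepts a callback that echoes it back. This
    binds the callback to the browser session that started the flow.

    Args:
        req: the incoming Django request (session middleware required, as it
            already is for ``login``).

    Returns:
        A redirect response to the provider's authorize endpoint.
    """
    state = secrets.token_urlsafe(32)
    req.session[OAUTH_STATE_SESSION_KEY] = state
    # urlencode percent-escapes the redirect URI and the other values instead
    # of splicing them raw into the query string.
    query = urlencode({
        'response_type': 'code',
        'client_id': CLIENT_ID,
        'redirect_uri': REDIRECT_URI,
        'state': state,
    })
    return redirect(f'{AUTHORIZE_URI}?{query}')


def register_callback(req: HttpRequest):
    """Handle the provider's callback: exchange the code and log the user in.

    On success the user is logged in and redirected to ``/``. On any failure
    the error is logged and the login flow is restarted via ``register``.

    Args:
        req: the callback request carrying ``code`` and ``state`` in its query.

    Returns:
        A redirect response (to ``/`` on success, to the provider otherwise).
    """
    try:
        # The state is single-use: pop it so a replayed callback is rejected.
        expected_state = req.session.pop(OAUTH_STATE_SESSION_KEY, None)
        returned_state = req.GET.get('state', '')
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        if not expected_state or not secrets.compare_digest(
                expected_state.encode(), returned_state.encode()):
            raise OAuthException('OAuth state missing or mismatched')

        # The provider may call back without a code (e.g. the user denied
        # access); treat that as a failed login instead of a server error.
        code = req.GET.get('code')
        if not code:
            raise OAuthException('No authorization code in callback')

        response = requests.post(
            ACCESS_TOKEN_URI,
            data={'code': code,
                  'grant_type': 'authorization_code',
                  'client_id': CLIENT_ID,
                  'client_secret': CLIENT_SECRET,
                  'redirect_uri': REDIRECT_URI},
            timeout=REQUEST_TIMEOUT_SECONDS)
        if response.status_code != 200:
            # Use the raw text: an error body is not guaranteed to be JSON.
            raise OAuthException(f'Status code not 200, response: {response.text}')

        token_payload = response.json()
        if not isinstance(token_payload, dict) or 'access_token' not in token_payload:
            raise OAuthException('Token response carries no access_token')

        # TODO: maybe later do something with the refresh token.
        user = user_info(token_payload['access_token'])
        if not isinstance(user, dict) or 'username' not in user or 'id' not in user:
            raise OAuthException(f'username and id are expected values: {user}')

        logger.debug(f'Succesfully authenticated user: {user["username"]} with id: {user["id"]}')
        # Use the same 'id' key that was just validated; the previous lookup
        # of user['zeus_id'] was never checked for presence.
        # NOTE(review): assumes the provider's 'id' is the Zeus id - confirm.
        validated_user = validate_user(user['id'], user['username'])
        login(req, validated_user)
        return redirect('/')
    except (OAuthException, requests.RequestException, ValueError) as e:
        # ValueError covers a response body that is not valid JSON.
        logger.error(e)
    # NOTE(review): a persistent provider-side failure makes this restart loop
    # between the provider and this view; consider an error page instead.
    return register(req)


def validate_user(zeus_id, username) -> CustomUser:
    """Return the local user for ``zeus_id``, creating it on first login.

    An existing user has its username refreshed from the provider.

    Args:
        zeus_id: identifier of the user at the OAuth provider.
        username: username reported by the provider.

    Returns:
        The matching (possibly newly created) ``CustomUser``.
    """
    try:
        # objects.get() raises DoesNotExist rather than returning None.
        user = CustomUser.objects.get(zeus_id=zeus_id)
    except CustomUser.DoesNotExist:
        return CustomUser.objects.create_user(zeus_id, username)
    user.username = username
    user.save()
    return user


def user_info(access_token):
    """Fetch the current user's profile from the provider.

    Args:
        access_token: bearer token obtained from the token endpoint.

    Returns:
        The decoded JSON body of the user-info response.
    """
    r = requests.get(USER_API_URI,
                     headers={'Authorization': f'Bearer {access_token}'},
                     timeout=REQUEST_TIMEOUT_SECONDS)
    return r.json()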