import logging
from typing import Optional
from urllib.parse import urlencode

import requests
from django.conf import settings
from django.contrib.auth import login
from django.http.request import HttpRequest
from django.shortcuts import redirect, render

import users
from users.models import CustomUser

# NOTE(review): loggers are conventionally named with __name__; __file__ is
# kept here so the logger name seen by any existing logging configuration
# does not change.
logger = logging.getLogger(__file__)

# Seconds to wait for the OAuth provider before giving up. Without a timeout
# `requests` waits indefinitely, which would tie up the worker handling the
# login request.
_REQUEST_TIMEOUT = 10


class OAuthException(Exception):
    """Raised when any step of the OAuth exchange with the provider fails."""


def register(_):
    """Redirect the client to the provider's authorization endpoint.

    The query string is built with ``urlencode`` so that the client id and
    the redirect URI are percent-encoded instead of being pasted in raw.
    The request argument is unused.
    """
    query = urlencode([
        ('response_type', 'code'),
        ('client_id', settings.OAUTH["CLIENT_ID"]),
        ('redirect_uri', settings.OAUTH["REDIRECT_URI"]),
    ])
    return redirect(f'{settings.OAUTH["AUTHORIZE_URI"]}?{query}')


def register_callback(req: HttpRequest):
    """Handle the redirect back from the provider and log the user in.

    Renders the failure page when the query string has no ``code``, contains
    an ``error``, or when any step of the exchange raises ``OAuthException``.
    On success the user is logged in and redirected to ``/``.
    """
    if 'code' not in req.GET or 'error' in req.GET:
        return login_fail(req, req.GET.get('error'))

    try:
        access_token = get_access_token(req.GET['code'])
        user_info = get_user_info(access_token)
        logger.debug('Succesfully authenticated user: %s with id: %s',
                     user_info["username"], user_info["id"])
        validated_user = validate_user(user_info['id'], user_info['username'])
        login(req, validated_user)
        return redirect('/')
    except OAuthException as e:
        logger.error(e)
        return login_fail(req, str(e))


def login_fail(request, error: Optional[str] = None):
    """Render ``oauth/failed.html`` with ``error`` in the template context."""
    return render(request, "oauth/failed.html", {'error': error})


def validate_user(zeus_id, username) -> CustomUser:
    """Return the user with the given ``zeus_id``, creating it if missing.

    An existing user has its ``username`` overwritten with the supplied one
    and is saved; otherwise a new user is created through
    ``CustomUser.objects.create_user``.
    """
    try:
        user = CustomUser.objects.get(zeus_id=zeus_id)
        user.username = username
        user.save()
        return user
    except users.models.CustomUser.DoesNotExist:
        return CustomUser.objects.create_user(zeus_id, username)


def _json_body(response, what):
    """Decode the JSON body of ``response`` or raise ``OAuthException``.

    ``what`` names the request in the error message (e.g. ``'access token'``).
    """
    try:
        return response.json()
    except ValueError as exc:
        # requests raises a ValueError subclass when the body is not JSON.
        raise OAuthException(
            f'Invalid JSON when requesting {what}.\nresponse: {response.text}') from exc


def get_access_token(code):
    """Exchange an authorization ``code`` for an access token.

    Raises:
        OAuthException: if the request fails or times out, the provider
            answers with a non-200 status, the body is not valid JSON, or
            the body has no ``access_token`` entry.
    """
    try:
        response = requests.post(
            settings.OAUTH["ACCESS_TOKEN_URI"],
            data={'code': code,
                  'grant_type': 'authorization_code',
                  'client_id': settings.OAUTH["CLIENT_ID"],
                  'client_secret': settings.OAUTH["CLIENT_SECRET"],
                  'redirect_uri': settings.OAUTH["REDIRECT_URI"]},
            headers={'Referer': f'{settings.SERVER_URL}/login/zeus/register'},
            timeout=_REQUEST_TIMEOUT,
        )
    except requests.RequestException as exc:
        # Convert transport errors so register_callback renders the failure
        # page instead of letting the exception escape the view.
        raise OAuthException(f'Request for access token failed: {exc}') from exc

    if response.status_code != 200:
        raise OAuthException(
            f'Status code {response.status_code} when requesting access token.\nresponse: {response.text}')

    # Parse the body once; the dict check guards the subscript below against
    # a JSON body that is not an object.
    payload = _json_body(response, 'access token')
    if not isinstance(payload, dict) or 'access_token' not in payload:
        raise OAuthException('Got status code 200 but no access_token')

    return payload['access_token']


def get_user_info(access_token):
    """Fetch the authenticated user's info using ``access_token``.

    Returns:
        The decoded JSON object, which contains at least ``username`` and
        ``id``.

    Raises:
        OAuthException: if the request fails or times out, the provider
            answers with a non-200 status, the body is not valid JSON, or
            ``username`` / ``id`` are missing.
    """
    try:
        response = requests.get(
            settings.OAUTH["USER_API_URI"],
            headers={'Authorization': f'Bearer {access_token}'},
            timeout=_REQUEST_TIMEOUT,
        )
    except requests.RequestException as exc:
        raise OAuthException(f'Request for user info failed: {exc}') from exc

    if response.status_code != 200:
        raise OAuthException(
            f'Status code {response.status_code} when requesting user info.\nresponse: {response.text}')

    payload = _json_body(response, 'user info')
    if not isinstance(payload, dict) or 'username' not in payload or 'id' not in payload:
        raise OAuthException(f'username and id are expected values: {payload}')

    return payload