kers/oauth/views.py
2020-07-25 01:04:58 +02:00

87 lines
2.9 KiB
Python

import logging
import requests
from django.conf import settings
from django.contrib.auth import login
from django.http.request import HttpRequest
from django.shortcuts import redirect, render
import users
from users.models import CustomUser
# Name the logger after the module's import path rather than its file path,
# so it takes part in the dotted logger hierarchy used by logging config.
logger = logging.getLogger(__name__)
class OAuthException(Exception):
    """Raised when a step of the OAuth exchange with the provider fails."""
def register(_):
    """Redirect the browser to the OAuth provider's authorization endpoint.

    The request argument is unused. The query string carries
    ``response_type=code`` together with the client id and redirect URI taken
    from ``settings.OAUTH``.

    The parameters are percent-encoded with ``urlencode`` so that reserved
    characters in the client id or redirect URI (``&``, ``?``, ``#``, ...)
    cannot corrupt the authorization request.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlencode

    # NOTE(review): no ``state`` parameter is sent, and the callback does not
    # check one, so forged callbacks cannot be detected - consider adding it.
    query = urlencode({
        'response_type': 'code',
        'client_id': settings.OAUTH["CLIENT_ID"],
        'redirect_uri': settings.OAUTH["REDIRECT_URI"],
    })
    return redirect(f'{settings.OAUTH["AUTHORIZE_URI"]}?{query}')
def register_callback(req: HttpRequest):
    """Handle the redirect back from the OAuth provider.

    When the query string lacks ``code`` or carries ``error``, the failure
    page is rendered with the provider's error (``None`` if absent).
    Otherwise the code is exchanged for an access token, the user info is
    fetched, the matching local user is updated or created and logged in,
    and the browser is redirected to ``/``. An ``OAuthException`` raised
    along the way is logged and shown on the failure page.
    """
    query = req.GET

    # Guard clause: the provider reported an error or sent no code at all.
    if 'error' in query or 'code' not in query:
        if 'error' in query:
            reason = query['error']
        else:
            reason = None
        return login_fail(req, reason)

    try:
        token = get_access_token(query['code'])
        profile = get_user_info(token)
        logger.debug(f'Succesfully authenticated user: {profile["username"]} with id: {profile["id"]}')
        account = validate_user(profile['id'], profile['username'])
        login(req, account)
        return redirect('/')
    except OAuthException as failure:
        logger.error(failure)
        return login_fail(req, str(failure))
def login_fail(request, error: str = None):
    """Render the ``oauth/failed.html`` template with ``error`` in its context."""
    context = {'error': error}
    return render(request, "oauth/failed.html", context)
def validate_user(zeus_id, username) -> CustomUser:
    """Return the local user for ``zeus_id``, creating it when missing.

    An existing user gets its ``username`` overwritten with the given value
    and is saved; when no user with that ``zeus_id`` exists, one is created
    through ``CustomUser.objects.create_user(zeus_id, username)``.
    """
    accounts = CustomUser.objects
    try:
        account = accounts.get(zeus_id=zeus_id)
        # Keep the stored username in sync with the one passed in.
        account.username = username
        account.save()
    except CustomUser.DoesNotExist:
        account = accounts.create_user(zeus_id, username)
    return account
def get_access_token(code, timeout=10):
    """Exchange an authorization code for an access token.

    Posts the code and the client credentials from ``settings.OAUTH`` to the
    configured ``ACCESS_TOKEN_URI``.

    Args:
        code: The authorization code received on the callback.
        timeout: Seconds to wait for the provider before giving up. Without
            a timeout ``requests`` may block indefinitely.

    Returns:
        The value of ``access_token`` from the JSON response.

    Raises:
        OAuthException: If the request fails at the network level, the
            response status is not 200, the body is not valid JSON, or the
            body has no ``access_token`` key.
    """
    try:
        response = requests.post(
            settings.OAUTH["ACCESS_TOKEN_URI"],
            data={
                'code': code,
                'grant_type': 'authorization_code',
                'client_id': settings.OAUTH["CLIENT_ID"],
                'client_secret': settings.OAUTH["CLIENT_SECRET"],
                'redirect_uri': settings.OAUTH["REDIRECT_URI"],
            },
            headers={'Referer': f'{settings.SERVER_URL}/login/zeus/register'},
            timeout=timeout,
        )
    except requests.RequestException as exc:
        # Surface connection errors and timeouts as OAuthException so the
        # callback view can render its failure page.
        raise OAuthException(f'Could not request access token: {exc}') from exc

    if response.status_code != 200:
        raise OAuthException(
            f'Status code {response.status_code} when requesting access token.\nresponse: {response.text}')

    # Parse the body once; a non-JSON body raises a ValueError subclass.
    try:
        payload = response.json()
    except ValueError as exc:
        raise OAuthException('Got status code 200 but the response was not valid JSON') from exc

    if not isinstance(payload, dict) or 'access_token' not in payload:
        raise OAuthException('Got status code 200 but no access_token')
    return payload['access_token']
def get_user_info(access_token, timeout=10):
    """Fetch the authenticated user's info from the provider's user API.

    Sends a GET request to ``settings.OAUTH["USER_API_URI"]`` with the access
    token as a bearer token.

    Args:
        access_token: The token to send in the ``Authorization`` header.
        timeout: Seconds to wait for the provider before giving up. Without
            a timeout ``requests`` may block indefinitely.

    Returns:
        The decoded JSON object, which contains at least ``username`` and
        ``id``.

    Raises:
        OAuthException: If the request fails at the network level, the
            response status is not 200, the body is not valid JSON, or
            ``username`` / ``id`` are missing from it.
    """
    try:
        response = requests.get(
            settings.OAUTH["USER_API_URI"],
            headers={'Authorization': f'Bearer {access_token}'},
            timeout=timeout,
        )
    except requests.RequestException as exc:
        # Surface connection errors and timeouts as OAuthException so the
        # callback view can render its failure page.
        raise OAuthException(f'Could not request user info: {exc}') from exc

    if response.status_code != 200:
        raise OAuthException(
            f'Status code {response.status_code} when requesting user info.\nresponse: {response.text}')

    # Parse the body once; a non-JSON body raises a ValueError subclass.
    try:
        payload = response.json()
    except ValueError as exc:
        raise OAuthException('Got status code 200 but the response was not valid JSON') from exc

    if not isinstance(payload, dict) or 'username' not in payload or 'id' not in payload:
        raise OAuthException(f'username and id are expected values: {payload}')
    return payload