kers/oauth/views.py

87 lines
2.9 KiB
Python
Raw Normal View History

2020-07-22 01:48:22 +00:00
import logging
import requests
2020-07-22 02:32:25 +00:00
from django.conf import settings
2020-07-22 02:05:34 +00:00
from django.contrib.auth import login
from django.http.request import HttpRequest
2020-07-22 19:34:19 +00:00
from django.shortcuts import redirect, render
2020-07-22 02:05:34 +00:00
2020-07-22 03:48:19 +00:00
import users
2020-07-22 02:05:34 +00:00
from users.models import CustomUser
2020-07-22 01:48:22 +00:00
logger = logging.getLogger(__file__)
2020-07-22 02:05:34 +00:00
2020-07-22 01:48:22 +00:00
class OAuthException(Exception):
    """Signals that a step of the OAuth login flow failed.

    Raised by ``get_access_token`` and ``get_user_info`` and handled in
    ``register_callback``, which logs it and renders the failure page.
    """
def register(_):
    """Redirect the browser to the OAuth provider's authorization endpoint.

    The target URL is ``settings.OAUTH["AUTHORIZE_URI"]`` followed by a query
    string carrying ``response_type=code``, the configured ``CLIENT_ID`` and
    the configured ``REDIRECT_URI``.

    The query string is percent-encoded so that reserved characters in the
    configured values (for example ``://``, ``?`` or ``&`` inside the
    redirect URI) cannot corrupt or truncate the authorization URL.

    Args:
        _: The incoming request; not used.

    Returns:
        The response produced by ``django.shortcuts.redirect``.
    """
    # Local import keeps this fix self-contained within the function.
    from urllib.parse import urlencode

    # Dict order is preserved, so the parameter order matches the original URL.
    query = urlencode({
        'response_type': 'code',
        'client_id': settings.OAUTH["CLIENT_ID"],
        'redirect_uri': settings.OAUTH["REDIRECT_URI"],
    })
    return redirect(f'{settings.OAUTH["AUTHORIZE_URI"]}?{query}')
2020-07-22 01:48:22 +00:00
def register_callback(req: HttpRequest):
    """Handle the provider's redirect back to this site.

    If the query string has no ``code`` or contains ``error``, the failure
    page is rendered with the ``error`` value (``None`` when absent).
    Otherwise the code is exchanged for an access token, the user info is
    fetched, the matching local user is obtained via ``validate_user`` and
    logged in, and the browser is redirected to ``/``.

    An ``OAuthException`` raised along the way is logged and its message is
    shown on the failure page.
    """
    params = req.GET
    authorized = 'code' in params and 'error' not in params
    if not authorized:
        return login_fail(req, params.get('error'))

    try:
        token = get_access_token(params['code'])
        profile = get_user_info(token)
        logger.debug(f'Succesfully authenticated user: {profile["username"]} with id: {profile["id"]}')
        account = validate_user(profile['id'], profile['username'])
        login(req, account)
        return redirect('/')
    except OAuthException as exc:
        logger.error(exc)
        return login_fail(req, str(exc))
2020-07-22 01:48:22 +00:00
2020-07-22 19:19:31 +00:00
2020-07-22 19:34:19 +00:00
def login_fail(request, error: str = None):
    """Render the ``oauth/failed.html`` template for a failed login.

    Args:
        request: The request being answered.
        error: Optional error text exposed to the template as ``error``.

    Returns:
        The response produced by ``django.shortcuts.render``.
    """
    context = {'error': error}
    return render(request, "oauth/failed.html", context)
2020-07-22 01:48:22 +00:00
2020-07-22 02:05:34 +00:00
def validate_user(zeus_id, username) -> CustomUser:
    """Return the local user for ``zeus_id``, creating it when missing.

    An existing user has its ``username`` overwritten with the supplied value
    and is saved. When no user with that ``zeus_id`` exists, a new one is
    created through ``CustomUser.objects.create_user(zeus_id, username)``.

    Args:
        zeus_id: Identifier used to look the user up (``zeus_id`` field).
        username: Username to store on the user.

    Returns:
        The updated or newly created ``CustomUser``.
    """
    # Guard only the lookup: a DoesNotExist raised while assigning or saving
    # must not be mistaken for "no such user" and trigger a second create.
    try:
        user = CustomUser.objects.get(zeus_id=zeus_id)
    except CustomUser.DoesNotExist:
        return CustomUser.objects.create_user(zeus_id, username)

    user.username = username
    user.save()
    return user
2020-07-22 19:19:31 +00:00
def get_access_token(code, timeout=10):
    """Exchange an authorization code for an access token.

    Posts the code together with the client credentials and redirect URI from
    ``settings.OAUTH`` to ``settings.OAUTH["ACCESS_TOKEN_URI"]``.

    Args:
        code: Authorization code received on the callback.
        timeout: Seconds to wait for the token endpoint. Without a timeout
            ``requests`` may block indefinitely on an unresponsive server.

    Returns:
        The ``access_token`` value from the endpoint's JSON response.

    Raises:
        OAuthException: If the request fails or times out, the status code is
            not 200, the body is not valid JSON, or it has no
            ``access_token``.
    """
    try:
        response = requests.post(
            settings.OAUTH["ACCESS_TOKEN_URI"],
            data={
                'code': code,
                'grant_type': 'authorization_code',
                'client_id': settings.OAUTH["CLIENT_ID"],
                'client_secret': settings.OAUTH["CLIENT_SECRET"],
                'redirect_uri': settings.OAUTH["REDIRECT_URI"],
            },
            timeout=timeout,
        )
    except requests.RequestException as e:
        # Surface network failures as OAuthException so the caller's existing
        # handler renders the failure page instead of a server error.
        raise OAuthException(f'Request for access token failed: {e}') from e

    if response.status_code != 200:
        raise OAuthException(
            f'Status code {response.status_code} when requesting access token.\nresponse: {response.text}')

    # Parse the body once; a non-JSON body raises a ValueError subclass.
    try:
        payload = response.json()
    except ValueError as e:
        raise OAuthException('Got status code 200 but the response was not valid JSON') from e

    if not isinstance(payload, dict) or 'access_token' not in payload:
        raise OAuthException('Got status code 200 but no access_token')
    return payload['access_token']
def get_user_info(access_token, timeout=10):
    """Fetch the authenticated user's info from the provider.

    Sends a GET to ``settings.OAUTH["USER_API_URI"]`` with the access token as
    a ``Bearer`` authorization header.

    Args:
        access_token: Token obtained from the token endpoint.
        timeout: Seconds to wait for the user API. Without a timeout
            ``requests`` may block indefinitely on an unresponsive server.

    Returns:
        The decoded JSON object, which contains at least ``username`` and
        ``id``.

    Raises:
        OAuthException: If the request fails or times out, the status code is
            not 200, the body is not valid JSON, or ``username``/``id`` are
            missing.
    """
    try:
        response = requests.get(
            settings.OAUTH["USER_API_URI"],
            headers={'Authorization': f'Bearer {access_token}'},
            timeout=timeout,
        )
    except requests.RequestException as e:
        # Surface network failures as OAuthException so the caller's existing
        # handler renders the failure page instead of a server error.
        raise OAuthException(f'Request for user info failed: {e}') from e

    if response.status_code != 200:
        raise OAuthException(
            f'Status code {response.status_code} when requesting user info.\nresponse: {response.text}')

    # Parse the body once; a non-JSON body raises a ValueError subclass.
    try:
        payload = response.json()
    except ValueError as e:
        raise OAuthException('Got status code 200 but the response was not valid JSON') from e

    if not isinstance(payload, dict) or 'username' not in payload or 'id' not in payload:
        raise OAuthException(f'username and id are expected values: {payload}')
    return payload