mattermore/app/app.py

316 lines
9.5 KiB
Python
Raw Normal View History

2018-09-24 20:57:36 +02:00
import json
from functools import wraps
2019-02-14 20:41:07 +01:00
from flask import Flask, request, Response, abort, render_template, send_file, jsonify
2018-09-24 20:57:36 +02:00
from flask_sqlalchemy import SQLAlchemy
2019-02-13 20:31:21 +01:00
from flask_migrate import Migrate
2019-02-19 12:15:19 +01:00
from datetime import datetime
from mattermostdriver import Driver
2018-09-24 20:57:36 +02:00
import requests
import config
2019-02-14 01:04:26 +01:00
import random
2019-06-12 14:24:36 +02:00
2018-09-24 20:57:36 +02:00
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = config.DATABASE_URL
# Supress Flask warning
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
2019-02-13 20:31:21 +01:00
migrate = Migrate(app, db)
2018-09-24 20:57:36 +02:00
2018-09-24 23:09:14 +02:00
response_setting = "in_channel"
2018-09-24 20:57:36 +02:00
from app import models
# Login driver: used to send messages to Mattermost
mm_driver = Driver({
'port': 443,
'url': config.server_url,
'token': config.mm_driver_token
})
mm_driver.login()
2018-09-24 20:57:36 +02:00
def check_regular(username):
'''Check if a user has the permissions of a regular user.'''
return models.User.query.filter_by(username=username, authorized=True).first() is not None
def check_admin(username):
'''Check if a user is an admin'''
return models.User.query.filter_by(username=username, authorized=True, admin=True).first() is not None
def requires_regular(f):
'''Decorator to require a regular user'''
@wraps(f)
def decorated(*args, **kwargs):
username = request.values.get('user_name')
if not username or not check_regular(username):
return abort(401)
return f(username, *args, **kwargs)
return decorated
def requires_admin(f):
'''Decorator to require an admin user'''
@wraps(f)
def decorated(*args, **kwargs):
username = request.values.get('user_name')
if not username or not check_admin(username):
return abort(401)
return f(username, *args, **kwargs)
return decorated
def requires_token(token_name):
'''Decorator to require a correct Mattermost token'''
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
expected_token = config.tokens[token_name]
token = request.values.get('token')
if expected_token != token:
return abort(401)
return f(*args, **kwargs)
return decorated_function
return decorator
2018-10-02 01:01:09 +02:00
def mattermost_response(message, ephemeral=False):
response_dict = {"response_type": "ephemeral" if ephemeral else "in_channel",
2018-09-24 20:57:36 +02:00
"text": message}
return Response(json.dumps(response_dict), mimetype="application/json")
2019-06-05 13:28:05 +02:00
# Removes @ from username if @ was prepended
def get_actual_username(username):
return username.lstrip('@')
2018-09-24 20:57:36 +02:00
@app.route('/authorize', methods=['POST'])
@requires_token('authorize')
@requires_admin
def authorize(admin_username):
'''Slash-command to authorize a new user or modify an existing user'''
tokens = request.values.get('text').strip().split()
2019-06-05 13:28:05 +02:00
to_authorize = get_actual_username(tokens[0])
2018-09-24 20:57:36 +02:00
as_admin = len(tokens) == 2 and tokens[1] == 'admin'
user = models.User.query.filter_by(username=to_authorize).first()
if not user:
user = models.User(to_authorize)
user.authorized = True
2018-09-24 22:59:30 +02:00
user.admin = as_admin or user.admin
2018-09-24 20:57:36 +02:00
db.session.add(user)
db.session.commit()
2018-09-24 22:59:30 +02:00
if user.admin:
return mattermost_response("'{}' is now an admin".format(to_authorize))
else:
return mattermost_response("'{}' is now a regular user".format(to_authorize))
@app.route('/revoke', methods=['POST'])
@requires_token('revoke')
@requires_admin
def revoke(admin_username):
'''Slash-command to revoke a user'''
tokens = request.values.get('text').strip().split()
2019-06-05 13:28:05 +02:00
to_revoke = get_actual_username(tokens[0])
2018-09-24 23:07:16 +02:00
user = models.User.query.filter_by(username=to_revoke).first()
2018-09-24 22:59:30 +02:00
if not user:
return mattermost_response("Could not find '{}'".format(to_revoke))
2018-09-24 23:07:16 +02:00
if user.admin:
return mattermost_response("Can't revoke admin user")
2018-09-24 22:59:30 +02:00
user.authorized = False
db.session.add(user)
db.session.commit()
return mattermost_response("'{}' revoked".format(to_revoke))
2018-09-24 20:57:36 +02:00
def slotmachien_request(username, command):
r = requests.post(config.slotmachien_url, json={
2018-09-24 20:57:36 +02:00
'username': username, 'token': config.slotmachien_token, 'text': command})
return r.text
@app.route('/door', methods=['POST'])
@requires_token('door')
@requires_regular
def door(username):
tokens = request.values.get('text').strip().split()
command = tokens[0].lower()
2018-10-02 01:01:09 +02:00
return mattermost_response(slotmachien_request(username, command), ephemeral=True)
2018-10-03 16:21:02 +02:00
@app.route('/doorkeeper', methods=['POST'])
@requires_token('doorkeeper')
def doorkeeper():
username = request.values.get('user').strip()
command = request.values.get('command').strip()
if command == 'open':
msg = '%s has opened the door' % (username)
elif command == 'close':
msg = '%s has closed the door' % (username)
elif command == 'delay':
msg = 'Door is closing in 10 seconds by %s' % (username)
else:
msg = 'I\'m sorry Dave, I\'m afraid I can\'t do that'
resp = mm_driver.posts.create_post(options={
'channel_id': config.doorkeeper_channel_id,
'message': msg
})
if resp is not None:
return Response(status=200)
else:
return Response(status=500)
2019-02-14 19:09:08 +01:00
2018-10-03 16:21:02 +02:00
@app.route('/cammiechat', methods=['POST'])
@requires_token('cammiechat')
@requires_regular
def cammiechat(username):
headers = {
2019-02-14 19:09:08 +01:00
"X-Username": username
2018-10-03 16:21:02 +02:00
}
requests.post("https://kelder.zeus.ugent.be/messages/", data=request.values.get('text').strip(), headers=headers)
return mattermost_response("Message sent", ephemeral=True)
2018-10-03 01:18:00 +02:00
@app.route('/addquote', methods=['POST'])
@requires_token('quote')
def add_quote():
user = request.values['user_name']
channel = request.values['channel_name']
quote_text = request.values['text']
quote = models.Quote(user, quote_text, channel)
db.session.add(quote)
db.session.commit()
2019-02-13 23:52:39 +01:00
return mattermost_response("{} added the quote \"{}\"".format(user, quote_text))
2018-10-03 01:18:00 +02:00
2019-02-14 19:09:08 +01:00
@app.route('/quote', methods=['POST'])
2019-02-14 01:04:26 +01:00
def random_quote():
text_contains = request.values['text']
2019-02-14 19:09:08 +01:00
matches = models.Quote.query.filter(models.Quote.quote.contains(text_contains)).all()
if matches:
selected_quote = random.choice(matches)
2019-02-14 19:55:55 +01:00
response = selected_quote.quote
2019-02-14 19:09:08 +01:00
return mattermost_response(response)
2019-02-14 19:55:55 +01:00
return mattermost_response('No quotes found matching "{}"'.format(text_contains), ephemeral=True)
2019-02-14 01:04:26 +01:00
2019-02-14 20:07:24 +01:00
@app.route('/robots.txt', methods=['GET'])
def get_robots():
2019-03-27 22:05:43 +01:00
return send_file('static/robots.txt')
2019-02-14 01:04:26 +01:00
2019-02-14 20:41:07 +01:00
2019-03-28 01:33:23 +01:00
@app.route('/fonts/<filename>', methods=['GET'])
def get_font(filename):
if not re.fullmatch(r'[a-zA-Z0-9][a-zA-Z0-9._-]*\.(?:otf|svg|woff2?)', filename):
return abort(404)
return send_file('static/fonts/' + filename)
2019-03-27 23:40:21 +01:00
@app.route('/quotes.css', methods=['GET'])
def get_quote_css():
return send_file('static/quotes.css')
2019-02-14 01:04:26 +01:00
2019-02-14 20:41:07 +01:00
@app.route('/quotes.html', methods=['GET'])
2018-10-03 01:18:00 +02:00
def list_quotes():
2019-02-14 19:09:08 +01:00
return render_template('quotes.html', quotes=reversed(models.Quote.query.all()))
2019-02-14 20:41:07 +01:00
@app.route('/quotes.json', methods=['GET'])
def json_quotes():
all_quotes = models.Quote.query.all()
2019-03-21 21:35:00 +01:00
response = jsonify(list({
2019-02-14 20:41:07 +01:00
'quoter': q.quoter,
'quotee': q.quotee,
'channel': q.channel,
'quote': q.quote,
'created_at': q.created_at.isoformat()
} for q in all_quotes))
2019-03-21 21:35:00 +01:00
response.headers.add('Access-Control-Allow-Origin', '*')
return response
2019-02-19 12:15:19 +01:00
2019-02-19 12:36:28 +01:00
RESTO_TEMPLATE = """
# Restomenu
2019-02-19 12:36:28 +01:00
2019-03-18 11:46:09 +01:00
## Woater me e smaksje
2019-02-19 12:36:28 +01:00
{soup_table}
2019-03-18 11:46:09 +01:00
## Vleesch
2019-02-19 12:36:28 +01:00
{meat_table}
2019-03-18 11:46:09 +01:00
## Visch
2019-02-19 12:36:28 +01:00
{fish_table}
## Vegetoarisch
2019-02-19 12:36:28 +01:00
{vegi_table}
{uncategorized}
2019-03-18 11:46:09 +01:00
## Groensels
2019-02-19 12:36:28 +01:00
{vegetable_table}
"""
UNCATEGORIZED_TEMPLATE = """
## Nog etwad anders...
{}
"""
RESTO_TABLES = {
"soup_table": {"soup"},
"meat_table": {"meat"},
"fish_table": {"fish"},
"vegi_table": {"vegetarian", "vegan"},
}
2019-02-19 12:15:19 +01:00
@app.route('/resto', methods=['GET'])
def resto_menu():
today = datetime.today()
url = "https://zeus.ugent.be/hydra/api/2.0/resto/menu/nl/{}/{}/{}.json"\
.format(today.year, today.month, today.day)
resto = requests.get(url).json()
if not resto["open"]:
2019-02-19 12:31:00 +01:00
return 'De resto is vandaag gesloten.'
2019-02-19 12:15:19 +01:00
else:
def table_for(kinds):
items = [meal for meal in resto["meals"] if meal["kind"] in kinds]
return format_items(items)
def format_items(items):
if not items:
2019-03-18 11:56:32 +01:00
return "None :("
maxwidth = max(len(item["name"]) for item in items)
2019-02-19 12:15:19 +01:00
return "\n".join("{name: <{width}}{price}".format(
name=item["name"],
width=maxwidth + 2,
price=item["price"])
for item in items
)
recognized_kinds = set.union(*RESTO_TABLES.values())
uncategorized_meals = [
meal for meal in resto["meals"]
if meal["kind"] not in recognized_kinds
]
uncategorized = "" if not uncategorized_meals else \
UNCATEGORIZED_TEMPLATE.format(
format_items([
# Small hack: change "name" into "name (kind)"
{**meal, "name": "{} ({})".format(meal["name"], meal["kind"])}
for meal in uncategorized_meals
])
)
2019-02-19 12:36:28 +01:00
return RESTO_TEMPLATE.format(
**{k: table_for(v) for k,v in RESTO_TABLES.items()},
uncategorized=uncategorized,
2019-02-19 12:15:19 +01:00
vegetable_table="\n".join(resto["vegetables"])
2019-02-19 12:31:00 +01:00
)
2019-02-19 12:15:19 +01:00
2019-02-19 12:31:00 +01:00
@app.route('/resto.json', methods=['GET'])
def resto_menu_json():
return mattermost_response(resto_menu(), ephemeral=True)