haldis/app/views/order.py

331 lines
12 KiB
Python
Raw Normal View History

2019-09-10 02:50:22 +02:00
"Script to generate the order related views of Haldis"
2015-03-31 20:15:22 +02:00
import random
2019-09-08 01:34:16 +02:00
import typing
2019-09-08 02:02:16 +02:00
from datetime import datetime
2019-09-10 02:50:22 +02:00
from werkzeug.wrappers import Response
2019-09-08 02:02:16 +02:00
# from flask import current_app as app
from flask import (Blueprint, abort, flash, redirect, render_template, request,
session, url_for, wrappers)
from flask_login import current_user, login_required
from forms import AnonOrderItemForm, OrderForm, OrderItemForm
from models import Order, OrderItem, User, db
2020-02-25 17:51:53 +01:00
from hlds.definitions import location_definitions, location_definition_version
from notification import post_order_to_webhook
from utils import ignore_none
2015-03-31 20:15:22 +02:00
2019-09-05 03:33:29 +02:00
# Blueprint that groups the order views below; they are addressed through
# url_for() as "order_bp.<view function name>".
order_bp = Blueprint("order_bp", "order")
2015-03-31 20:15:22 +02:00
2019-09-05 03:33:29 +02:00
@order_bp.route("/")
def orders(form: OrderForm = None) -> str:
    """Render the overview page listing the orders returned by get_orders().

    If no form is passed in and the visitor is logged in, a fresh OrderForm
    is created whose location defaults to the ``location_id`` query
    parameter. Anonymous visitors without a supplied form get ``form=None``
    handed to the template.
    """
    needs_fresh_form = form is None and not current_user.is_anonymous()
    if needs_fresh_form:
        form = OrderForm()
        form.location_id.default = request.args.get("location_id")
        # Re-process the form after changing the default and before
        # populate() is called on it.
        form.process()
        form.populate()

    visible_orders = get_orders()
    return render_template("orders.html", orders=visible_orders, form=form)
2015-03-31 20:15:22 +02:00
2019-09-05 03:33:29 +02:00
@order_bp.route("/create", methods=["POST"])
@login_required
def order_create() -> typing.Union[str, Response]:
    """Create a new order from the submitted OrderForm.

    On a valid submission the order is stored, posted to the webhook and the
    user is redirected to the new order's page. Otherwise the overview page
    is rendered again with the submitted form.
    """
    order_form = OrderForm()
    order_form.populate()

    if not order_form.validate_on_submit():
        # Not a valid submission: fall back to the overview with this form.
        return orders(form=order_form)

    new_order = Order()
    order_form.populate_obj(new_order)
    new_order.update_from_hlds()

    db.session.add(new_order)
    db.session.commit()

    # Notify only after the commit, so the order has its id.
    post_order_to_webhook(new_order)
    return redirect(url_for("order_bp.order_from_id", order_id=new_order.id))
2019-09-10 02:50:22 +02:00
@order_bp.route("/<order_id>")
def order_from_id(order_id: int, form: OrderForm = None, dish_id=None) -> str:
    """Render the detail page of a single order.

    Aborts with 404 when the order does not exist and with 401 when an
    anonymous visitor requests a non-public order. ``form`` lets a caller
    supply an already prepared item form; when omitted a new one is created.
    For a closed order no form is passed to the template at all.
    ``dish_id`` selects the dish handed to the template (None when the order
    has no location).
    """
    order = Order.query.filter(Order.id == order_id).first()
    if order is None:
        abort(404)

    if current_user.is_anonymous() and not order.public:
        flash("Please login to see this order.", "info")
        abort(401)

    if form is None:
        if current_user.is_anonymous():
            form = AnonOrderItemForm()
        else:
            form = OrderItemForm()
        # Only a freshly created form is populated here; a form passed in by
        # the caller is used as-is.
        if order.location:
            form.populate(order.location, None)

    # Closed orders never show an item form, even one passed in.
    if order.is_closed():
        form = None

    total_price = sum(entry.price for entry in order.items)
    debts = sum(entry.price for entry in order.items if not entry.paid)

    if order.location:
        dish = order.location.dish_by_id(dish_id)
    else:
        dish = None

    return render_template(
        "order.html",
        order=order,
        form=form,
        total_price=total_price,
        debts=debts,
        dish=dish,
    )
2015-03-31 20:15:22 +02:00
2019-09-10 02:50:22 +02:00
@order_bp.route("/<order_id>/items")
def items_showcase(order_id: int) -> str:
    """Render the items-only page of an order.

    Aborts with 404 when the order does not exist and with 401 when an
    anonymous visitor requests a non-public order.
    """
    order = Order.query.filter(Order.id == order_id).first()
    if order is None:
        abort(404)

    hidden_from_visitor = current_user.is_anonymous() and not order.public
    if hidden_from_visitor:
        flash("Please login to see this order.", "info")
        abort(401)

    total_price = sum(entry.price for entry in order.items)
    return render_template(
        "order_items.html", order=order, total_price=total_price
    )
2015-03-31 20:15:22 +02:00
2019-09-10 02:50:22 +02:00
@order_bp.route("/<order_id>/edit", methods=["GET", "POST"])
@login_required
def order_edit(order_id: int) -> typing.Union[str, Response]:
    """Show and process the edit form of an order.

    Only the order's courier or an admin may edit. Aborts with 404 when the
    order does not exist and with 401 when the current user is not allowed
    to edit it. On a valid submission the order is updated and the user is
    redirected to the order page; otherwise the edit form is rendered.
    """
    order = Order.query.filter(Order.id == order_id).first()
    # The existence check must come before any attribute access on `order`;
    # otherwise an unknown id raises AttributeError instead of giving a 404.
    if order is None:
        abort(404)
    # Compare ids by value: identity (`is not`) is not a reliable way to
    # compare integers.
    if current_user.id != order.courier_id and not current_user.is_admin():
        abort(401)

    order_form = OrderForm(obj=order)
    order_form.populate()
    if order_form.validate_on_submit():
        order_form.populate_obj(order)
        order.update_from_hlds()
        db.session.commit()
        return redirect(url_for("order_bp.order_from_id", order_id=order.id))
    return render_template("order_edit.html", form=order_form,
                           order_id=order_id)
2015-03-31 20:15:22 +02:00
2020-02-21 18:38:30 +01:00
@order_bp.route("/<order_id>/create", methods=["GET", "POST"])
def order_item_create(order_id: int) -> typing.Any:
    """Add an item to the order with the given id.

    Aborts with 404 when the order is missing, closed, has no location, or
    when the requested dish is unknown at that location; aborts with 401
    when an anonymous visitor targets a non-public order. Re-renders the
    order page with the form when it was not submitted, is invalid, or the
    dish changed. Redirects back to this view when a choice is missing, and
    to the order page once the item is stored.
    """
    # Return type is really 'typing.Union[str, Response]', but that errors
    # due to https://github.com/python/mypy/issues/7187
    current_order = Order.query.filter(Order.id == order_id).first()
    if current_order is None:
        abort(404)
    if current_order.is_closed():
        abort(404)
    if current_user.is_anonymous() and not current_order.public:
        flash("Please login to see this order.", "info")
        abort(401)

    location = current_order.location
    # If location doesn't exist any more, adding items is nonsensical
    if not location:
        abort(404)

    if current_user.is_anonymous():
        form = AnonOrderItemForm()
    else:
        form = OrderItemForm()

    # A submitted form carries the dish itself; otherwise use ?dish=...
    if form.is_submitted():
        dish_id = form.dish_id.data
    else:
        dish_id = request.args.get("dish")
    if dish_id and not location.dish_by_id(dish_id):
        abort(404)
    form.dish_id.data = dish_id
    form.populate(current_order.location, dish_id)

    # If the form was not submitted (GET request), the form had errors,
    # or the dish was changed: show form again
    dish_was_changed = (
        request.form.get("form_for_dish_id")
        and request.form["form_for_dish_id"] != dish_id
    )
    if not form.validate_on_submit() or dish_was_changed:
        return order_from_id(order_id, form=form, dish_id=dish_id)

    # Form was submitted and is valid.
    # The form's validation tests that dish_id is valid and gives a friendly
    # error if it's not.
    choices = location.dish_by_id(form.dish_id.data).choices
    chosen = []
    for choice_type, choice in choices:
        if choice_type == "single_choice":
            picked = choice.option_by_id(
                request.form.get("choice_" + choice.id)
            )
        else:
            # Any other choice type: every submitted value is resolved to an
            # option and the result is passed through ignore_none().
            picked = list(ignore_none(
                request.form.getlist("choice_" + choice.id,
                                     type=choice.option_by_id)
            ))
        chosen.append(picked)

    some_choice_missing = any(picked is None for picked in chosen)
    if some_choice_missing:
        return redirect(url_for("order_bp.order_item_create",
                                order_id=order_id, dish=form.dish_id.data))

    item = OrderItem()
    form.populate_obj(item)
    item.hlds_data_version = location_definition_version
    item.order_id = order_id
    if current_user.is_anonymous():
        # Remember the anonymous name in the session.
        session["anon_name"] = item.name
    else:
        item.user_id = current_user.id

    # XXX Temporary until OrderItemChoice is used
    def _name(selection):
        # A single option has .name; otherwise treat it as a list of options.
        try:
            label = selection.name
        except AttributeError:
            label = ", ".join(opt.name for opt in selection)
        return label

    comments = [_name(selection) for selection in chosen if selection]
    if item.comment:
        comments.append("Comment: " + item.comment)
    item.comment = "; ".join(comments)

    item.update_from_hlds()

    # XXX Temporary until OrderItemChoice is used. Move this price
    # calculation to update_from_hlds when OrderItemChoice is in place.
    def _price(selection):
        # A single option has .price; otherwise sum over a list of options.
        try:
            amount = selection.price or 0
        except AttributeError:
            amount = sum(opt.price or 0 for opt in selection)
        return amount

    item.price += sum(_price(selection) for selection in chosen)

    db.session.add(item)
    db.session.commit()
    flash("Ordered %s" % (item.dish_name), "success")
    return redirect(url_for("order_bp.order_from_id", order_id=order_id))
2015-03-31 20:15:22 +02:00
2015-06-04 21:20:38 +02:00
@order_bp.route("/<order_id>/<item_id>/paid", methods=["POST"])
@login_required
# pylint: disable=R1710
def item_paid(order_id: int, item_id: int) -> typing.Optional[Response]:
    """Mark a single order item as paid.

    Only the courier of the item's order or an admin may do this; on success
    the user is redirected to the order page. Aborts with 404 when the item
    does not exist or the current user is not allowed to mark it.
    """
    item = OrderItem.query.filter(OrderItem.id == item_id).first()
    # An unknown item id is a 404, not an AttributeError on `item.order`.
    if item is None:
        abort(404)

    user_id = current_user.id
    if item.order.courier_id == user_id or current_user.admin:
        item.paid = True
        db.session.commit()
        flash("Paid %s by %s" % (item.dish_name, item.get_name()),
              "success")
        return redirect(url_for("order_bp.order_from_id", order_id=order_id))
    abort(404)
@order_bp.route("/<order_id>/<user_name>/user_paid", methods=["POST"])
@login_required
# pylint: disable=R1710
def items_user_paid(order_id: int, user_name: str) -> typing.Optional[Response]:
    """Mark all items of one person in an order as paid.

    ``user_name`` is first looked up as a registered username; when no such
    user exists, items are matched on their ``name`` field instead. Only the
    order's courier or an admin may do this. Aborts with 404 when the order
    does not exist or the current user is not allowed.
    """
    user = User.query.filter(User.username == user_name).first()
    items: typing.List[OrderItem] = []
    if user:
        items = OrderItem.query.filter(
            (OrderItem.user_id == user.id) & (OrderItem.order_id == order_id)
        ).all()
    else:
        items = OrderItem.query.filter(
            (OrderItem.name == user_name) & (OrderItem.order_id == order_id)
        ).all()

    current_order = Order.query.filter(Order.id == order_id).first()
    # A missing order is a 404 rather than an AttributeError below.
    if current_order is None:
        abort(404)

    if current_order.courier_id == current_user.id or current_user.admin:
        for item in items:
            item.paid = True
        db.session.commit()
        # Do not rely on the leftover loop variable: it is unbound when no
        # items matched. Fall back to the requested name in that case.
        payer_name = items[-1].get_name() if items else user_name
        flash("Paid %d items for %s" %
              (len(items), payer_name), "success")
        return redirect(url_for("order_bp.order_from_id", order_id=order_id))
    abort(404)
@order_bp.route("/<order_id>/<item_id>/delete", methods=["POST"])
# pylint: disable=R1710
def delete_item(order_id: int, item_id: int) -> typing.Any:
    """Delete an item from an order.

    Whether deletion is allowed is decided by ``OrderItem.can_delete``, which
    receives the order id, the current user's id (None for anonymous
    visitors) and the anonymous name stored in the session. Aborts with 404
    when the item does not exist or may not be deleted.
    """
    # Return type is really 'typing.Optional[Response]', but that errors
    # due to https://github.com/python/mypy/issues/7187
    item = OrderItem.query.filter(OrderItem.id == item_id).first()
    # An unknown item id is a 404, not an AttributeError on `item.can_delete`.
    if item is None:
        abort(404)

    user_id = None
    if not current_user.is_anonymous():
        print("%s tries to delete orders" % (current_user.username))
        user_id = current_user.id

    if item.can_delete(order_id, user_id, session.get("anon_name", "")):
        # Read the name before the delete; it is needed for the message.
        dish_name = item.dish_name
        db.session.delete(item)
        db.session.commit()
        flash("Deleted %s" % (dish_name), "success")
        return redirect(url_for("order_bp.order_from_id", order_id=order_id))
    abort(404)
2019-09-10 02:50:22 +02:00
@order_bp.route("/<order_id>/volunteer", methods=["POST"])
@login_required
def volunteer(order_id: int) -> Response:
    """Register the current user as courier of an order.

    Only possible while the order has no courier yet (``courier_id`` is None
    or 0). Aborts with 404 when the order does not exist; in every other
    case the user is redirected to the order page with a flash message.
    """
    order = Order.query.filter(Order.id == order_id).first()
    if order is None:
        abort(404)

    has_courier = not (order.courier_id is None or order.courier_id == 0)
    if has_courier:
        flash("Volunteering not possible!")
    else:
        order.courier_id = current_user.id
        db.session.commit()
        flash("Thank you for volunteering!")

    target = url_for("order_bp.order_from_id", order_id=order_id)
    return redirect(target)
2015-03-31 20:15:22 +02:00
2019-09-10 02:50:22 +02:00
@order_bp.route("/<order_id>/close", methods=["POST"])
@login_required
def close_order(order_id: int) -> typing.Optional[Response]:
    """Close an order by setting its stop time to now.

    Only the order's courier or an admin may close it. If the order has no
    courier yet, one is drawn with select_user() from the people who ordered.
    Aborts with 404 when the order does not exist and with 401 when the
    current user is not allowed to close it. Always answers with a redirect
    to the order page otherwise, including when the order was already closed.
    """
    order = Order.query.filter(Order.id == order_id).first()
    if order is None:
        abort(404)

    # Previously the view fell through and returned None here, which Flask
    # rejects as an invalid response. Refuse explicitly instead, using the
    # same status as order_edit.
    if current_user.id != order.courier_id and not current_user.is_admin():
        abort(401)

    if not order.is_closed():
        order.stoptime = datetime.now()
        if order.courier_id == 0 or order.courier_id is None:
            courier = select_user(order.items)
            if courier is not None:
                order.courier_id = courier.id
        db.session.commit()

    # Closing an already closed order changes nothing; just show the order.
    return redirect(url_for("order_bp.order_from_id", order_id=order_id))
2015-03-31 20:15:22 +02:00
2019-09-08 01:34:16 +02:00
def select_user(items) -> typing.Optional[User]:
    """Select a random user from those who are signed up for the order.

    Items without a ``user_id`` are ignored. An item is drawn uniformly at
    random and its user is accepted when ``random.randint(user.bias, 100)``
    is at least 80; otherwise another draw is made.

    Returns None when no item belongs to a user, including the case where
    every ``user_id`` points at a user that can no longer be loaded.
    """
    # remove non users
    candidates = [i for i in items if i.user_id]
    while candidates:
        item = random.choice(candidates)
        user = item.user
        if not user:
            # user_id is set but no user is attached: drop the item so the
            # loop is guaranteed to terminate instead of drawing it forever.
            candidates.remove(item)
            continue
        if random.randint(user.bias, 100) >= 80:
            return user
    return None
2019-09-08 01:34:16 +02:00
def get_orders(expression=None) -> typing.List[Order]:
    """Return the orders matching ``expression`` that the visitor may see.

    Without an explicit ``expression`` the default filter selects orders
    that have started and not yet stopped, or that have no stop time.
    Anonymous visitors additionally only get public orders.
    """
    if expression is None:
        has_started = datetime.now() > Order.starttime
        not_yet_stopped = Order.stoptime > datetime.now()
        # pylint: disable=C0121
        without_stoptime = Order.stoptime == None
        # `&` binds tighter than `|`, so this is the grouping the original
        # unparenthesised expression had.
        # NOTE(review): orders without a stop time match even if their start
        # time is still in the future - confirm this is intended.
        expression = (has_started & not_yet_stopped) | without_stoptime

    if current_user.is_anonymous():
        # pylint: disable=C0121
        expression = expression & (Order.public == True)

    return Order.query.filter(expression).all()