wolkje-gedeelde-mappen/wolkje_files/img_store.py

152 lines
3.7 KiB
Python
Raw Normal View History

2023-05-02 17:38:05 +02:00
#!/usr/bin/env python3
import os
import time
import mimetypes
import random
import json
import re
from slugify import slugify
from wolkje_files import file_name as fn
class UnknownMimeTypeError(Exception):
pass
class NoSuchFolderError(Exception):
pass
class NoSuchFileError(Exception):
pass
class IllegalFileNameError(Exception):
pass
_SECRETS_CHARACTERS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
_FOLDER_ID_LENGTH = 22
_PASSWORD_LENGTH = 32
FOLDER_ID_PATTERN = re.compile(r"^[A-Za-z0-9]{%d}$" % _FOLDER_ID_LENGTH)
def generate_secret(length):
return "".join(
_SECRETS_CHARACTERS[random.randint(0, len(_SECRETS_CHARACTERS) - 1)]
for _ in range(length)
)
def generate_new_folder_id():
folder_id = generate_secret(_FOLDER_ID_LENGTH)
while os.path.exists(_get_folder_path(folder_id)):
folder_id = generate_secret(_FOLDER_ID_LENGTH)
return folder_id
def generate_password():
return generate_secret(_PASSWORD_LENGTH)
_BASE_STORAGE_DIR_ENVNAME = "WOLKJE_STORAGE_DIR"
_BASE_STORAGE_DIR = os.getenv(_BASE_STORAGE_DIR_ENVNAME)
if not _BASE_STORAGE_DIR:
raise Exception(f"{_BASE_STORAGE_DIR_ENVNAME} not configured")
if not os.path.isdir(_BASE_STORAGE_DIR):
raise Exception(f"{_BASE_STORAGE_DIR_ENVNAME} ({_BASE_STORAGE_DIR}) is not an existing directory")
_STORAGE_DIR = os.path.join(_BASE_STORAGE_DIR, "files")
os.makedirs(_STORAGE_DIR, exist_ok=True)
_INFO_FILE_NAME = ".folder.json"
_DATA_FILE_ENCODING = "utf-8"
def _get_folder_path(folder_id):
return os.path.join(_STORAGE_DIR, folder_id)
def get_folder(folder_id):
if not FOLDER_ID_PATTERN.fullmatch(folder_id):
return None
folder_path = _get_folder_path(folder_id)
if not os.path.exists(folder_path):
return None
folder_info_path = os.path.join(folder_path, _INFO_FILE_NAME)
if not os.path.exists(folder_info_path):
return None
with open(folder_info_path, "r", encoding=_DATA_FILE_ENCODING) as fh:
data = {
**json.load(fh),
"folder_id": folder_id,
"folder_path": folder_path,
"folder_info_path": folder_info_path,
}
return data
def new_folder(folder_name):
folder_id = generate_new_folder_id()
folder_path = _get_folder_path(folder_id)
folder_info_path = os.path.join(folder_path, _INFO_FILE_NAME)
os.makedirs(folder_path)
with open(folder_info_path, "x", encoding=_DATA_FILE_ENCODING) as fh:
json.dump({
"name": folder_name,
"created": int(time.time()),
"editing_password": generate_password(),
}, fh, ensure_ascii=False)
return get_folder(folder_id)
def ls(folder_id):
folder = get_folder(folder_id)
if not folder:
return None
return [
filename
for filename in os.listdir(folder["folder_path"])
if not filename.startswith(".")
]
_CHUNK_SIZE_BYTES = 4096
def save_file(folder_id, desired_file_name, file_stream, mime_type):
folder = get_folder(folder_id)
if not folder:
raise NoSuchFolderError()
ext = mimetypes.guess_extension(mime_type)
if ext is None:
raise UnknownMimeTypeError()
file_name = fn.sanitize(fn.set_extension(desired_file_name, ext))
if file_name is None:
raise IllegalFileNameError()
file_path = os.path.join(_get_folder_path(folder_id), file_name)
with open(file_path, "wb") as fp:
while True:
chunk = file_stream.read(_CHUNK_SIZE_BYTES)
if not chunk:
break
fp.write(chunk)
return file_name
def open_file(folder_id, file_name):
folder = get_folder(folder_id)
if not folder:
raise NoSuchFolderError()
file_name = fn.sanitize(file_name)
if file_name is None:
raise IllegalFileNameError()
file_path = os.path.join(_get_folder_path(folder_id), file_name)
mime_type = mimetypes.guess_type(file_name)
fp = open(file_path, "rb")
content_length = os.path.getsize(file_path)
return fp, content_length, mime_type