Get emoji statically
This commit is contained in:
parent
e200c72290
commit
60d4605ac1
6 changed files with 73 additions and 102 deletions
3
.gitignore
vendored
3
.gitignore
vendored
|
@ -1,2 +1 @@
|
|||
mattermost_channels.html
|
||||
*_credentials.json
|
||||
dist/
|
||||
|
|
2
emoji.py
2
emoji.py
|
@ -22,7 +22,7 @@ def translate_emoji(match_obj):
|
|||
)
|
||||
for emoji in custom_emoji:
|
||||
if emoji["name"] == emoji_name:
|
||||
return f"<img src='/emoji_proxy/{mm_server}/{emoji['id']}' alt=':{emoji_name}:' class='emoji' />"
|
||||
return f"<img src='../emoji/images/{emoji_name}' alt=':{emoji_name}:' class='emoji' />"
|
||||
return f":{emoji_name}:"
|
||||
|
||||
# From https://mattermost.example.org/<team_name>/emoji/add:
|
||||
|
|
|
@ -1,95 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from wsgiref.simple_server import make_server, demo_app
|
||||
|
||||
software_name = "emoji_proxy"
|
||||
|
||||
os.chdir(os.path.dirname(__file__))
|
||||
|
||||
with open("./mattermost_credentials.json") as fh:
|
||||
credentials = json.load(fh)
|
||||
|
||||
|
||||
HTTP_OK = "200 OK"
|
||||
HTTP_NOT_FOUND = "404 Not Found"
|
||||
HTTP_INTERNAL_SERVER_ERROR = "500 Internal Server Error"
|
||||
|
||||
|
||||
class HTTPError(Exception):
|
||||
def __init__(self, status, message):
|
||||
self.status = status
|
||||
self.message = message
|
||||
|
||||
def __repr__(self):
|
||||
return f"<HTTPError {self.status}>"
|
||||
|
||||
|
||||
default_headers = [
|
||||
("Server", software_name),
|
||||
]
|
||||
|
||||
|
||||
def app(environ, start_response):
|
||||
path = environ["PATH_INFO"]
|
||||
|
||||
m = re.fullmatch(r'/emoji_proxy/(.+)/([a-z0-9]{26})', path)
|
||||
if not m:
|
||||
raise HTTPError(HTTP_NOT_FOUND, "Path not recognized, expecting /emoji_proxy/<mattermost_server>/<emoji_id>\n<mattermost_server> may include slashes to specify a path, though this is not typical.")
|
||||
|
||||
server = m.group(1)
|
||||
emoji_id = m.group(2)
|
||||
|
||||
if server not in credentials:
|
||||
raise HTTPError(HTTP_NOT_FOUND, f"Server {server} not supported")
|
||||
access_token = credentials[server]["access_token"]
|
||||
|
||||
req = urllib.request.Request(f"https://{server}/api/v4/emoji/{emoji_id}/image")
|
||||
req.add_header("Referer", "https://{server}/")
|
||||
req.add_header("User-Agent", software_name)
|
||||
req.add_header("Authorization", "Bearer " + access_token)
|
||||
try:
|
||||
with urllib.request.urlopen(req) as upstream_response:
|
||||
start_response(HTTP_OK, default_headers + [
|
||||
("Cache-Control", "max-age=86400, public"),
|
||||
] + [
|
||||
(header_name, upstream_response.headers[header_name])
|
||||
for header_name in upstream_response.headers
|
||||
if header_name not in {"Server", "Connection", "Accept-Ranges", "Transfer-Encoding"} and not header_name.startswith("X-")
|
||||
])
|
||||
|
||||
chunk = upstream_response.read1()
|
||||
while chunk != b"":
|
||||
yield chunk
|
||||
chunk = upstream_response.read1()
|
||||
|
||||
except urllib.error.HTTPError as e:
|
||||
if e.status == 404:
|
||||
raise HTTPError(HTTP_NOT_FOUND, f"Emoji not found")
|
||||
raise HTTPError(f"{e.status} {e.reason}", [b"The Mattermost server reported an error:\n\n", e.read()])
|
||||
except urllib.error.URLError as e:
|
||||
raise e
|
||||
|
||||
|
||||
def app_wrap(environ, start_response):
|
||||
try:
|
||||
yield from app(environ, start_response)
|
||||
except HTTPError as e:
|
||||
start_response(e.status, default_headers + [
|
||||
("Content-Type", "text/plain; charset=utf-8"),
|
||||
])
|
||||
if isinstance(e.message, str):
|
||||
yield from [e.message.encode("utf-8")]
|
||||
else:
|
||||
yield from e.message
|
||||
|
||||
|
||||
with make_server("", 8000, app_wrap) as httpd:
|
||||
print("Serving HTTP on port 8000...")
|
||||
|
||||
# Respond to requests until process is killed
|
||||
httpd.serve_forever()
|
|
@ -1,3 +0,0 @@
|
|||
{
|
||||
"mattermost.example.org": {"access_token": "1ab2cd3ef4gh5ij6kl7mn8op9q"}
|
||||
}
|
3
gen.sh
3
gen.sh
|
@ -7,7 +7,8 @@ team_name="zeus"
|
|||
|
||||
dir="$(dirname "$0")"
|
||||
|
||||
out=mattermost_channels.html
|
||||
out="$dir/dist/channels/index.html"
|
||||
mkdir -p "$(dirname "$out")"
|
||||
|
||||
custom_emoji_file="$(mktemp --tmpdir custom_emoji.XXXXXXXXXX.json)"
|
||||
mmcli listcustomemoji | jq -s > "$custom_emoji_file"
|
||||
|
|
69
get_custom_emoji.py
Executable file
69
get_custom_emoji.py
Executable file
|
@ -0,0 +1,69 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import mimetypes
|
||||
import urllib.request
|
||||
|
||||
|
||||
destination_dir = "dist/emoji/images"
|
||||
|
||||
os.chdir(os.path.dirname(__file__))
|
||||
os.makedirs(destination_dir, exist_ok=True)
|
||||
|
||||
user_agent = "emoji crawler by the MI5 Delegation for Guarding of Access to Resource Development"
|
||||
|
||||
MM_SERVER = os.environ["MM_SERVER"]
|
||||
MM_ACCESSTOKEN = os.environ["MM_ACCESSTOKEN"]
|
||||
|
||||
|
||||
def mm_api_get(path):
|
||||
req = urllib.request.Request(f"https://{MM_SERVER}/api{path}")
|
||||
req.add_header("Referer", f"https://{MM_SERVER}/")
|
||||
req.add_header("User-Agent", user_agent)
|
||||
req.add_header("Authorization", "Bearer " + MM_ACCESSTOKEN)
|
||||
return urllib.request.urlopen(req)
|
||||
|
||||
def mm_api_get_json(*args, **kwargs):
|
||||
with mm_api_get(*args, **kwargs) as response:
|
||||
return json.load(response)
|
||||
|
||||
|
||||
def get_list_of_custom_emoji():
|
||||
page = 0
|
||||
per_page = 200 # Maximum allowed in API
|
||||
response = []
|
||||
while page == 0 or response:
|
||||
response = mm_api_get_json(f"/v4/emoji?page={page}&per_page={per_page}&sort=name")
|
||||
for emoji in response:
|
||||
yield(emoji)
|
||||
page += 1
|
||||
|
||||
|
||||
def get_emoji_image(name, emoji_id):
|
||||
print(f"Downloading {name}", file=sys.stderr, flush=True)
|
||||
with mm_api_get(f"/v4/emoji/{emoji_id}/image") as upstream_response:
|
||||
headers = {k.lower(): v for (k, v) in upstream_response.headers.items()}
|
||||
mime = headers.get("content-type")
|
||||
if not mime.startswith("image/"):
|
||||
raise Exception(f"Got response with MIME type {mime}, not an image")
|
||||
extension = mimetypes.guess_extension(mime)
|
||||
|
||||
filepath = f"{destination_dir}/{name}{extension}"
|
||||
with open(filepath, "wb") as fh:
|
||||
chunk = upstream_response.read1()
|
||||
while chunk != b"":
|
||||
fh.write(chunk)
|
||||
chunk = upstream_response.read1()
|
||||
print(f"Saved to {filepath}", file=sys.stderr, flush=True)
|
||||
|
||||
|
||||
def main():
|
||||
for emoji in get_list_of_custom_emoji():
|
||||
get_emoji_image(emoji["name"], emoji["id"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in a new issue