aboutsummaryrefslogtreecommitdiffstats
path: root/src/app.py
diff options
context:
space:
mode:
author kj_sh604 2026-06-01 14:37:28 -0400
committer kj_sh604 2026-06-01 14:37:28 -0400
commit 6fd3f732dd6430a7e5644524b27ea6d60f5e2a45 (patch)
tree 33b28188e2d5b92a259cbaf149a7171a77a42b3f /src/app.py
parent 738de85c9b646ebd68b1677538581267cf1fb515 (diff)
refactor: upload photos
Diffstat (limited to 'src/app.py')
-rw-r--r-- src/app.py 353
1 files changed, 352 insertions, 1 deletions
diff --git a/src/app.py b/src/app.py
index 47a8865..61889b5 100644
--- a/src/app.py
+++ b/src/app.py
@@ -3,13 +3,16 @@
# likha-pdf — markdown to pdf, no latex required
# production-friendly flask app with weasyprint + reportlab fallback
-import logging
import io
+import base64
+import logging
import os
+import re
import secrets
import sqlite3
import time
from collections import deque
+from datetime import timedelta
from pathlib import Path
from threading import Lock
from urllib.parse import urlsplit
@@ -18,19 +21,25 @@ from flask import (
Flask,
Response,
current_app,
+ jsonify,
request,
+ session,
send_from_directory,
)
from markupsafe import escape
from markdown import markdown
from weasyprint import HTML, default_url_fetcher
from werkzeug.middleware.proxy_fix import ProxyFix
+from werkzeug.utils import secure_filename
# application identity and default bind address
APP_NAME = "likha-pdf"
DEFAULT_HOST = "0.0.0.0"
DEFAULT_PORT = 5001

# request size defaults: 2 GiB request body, same ceiling for in-memory form data
DEFAULT_MAX_CONTENT_LENGTH = 2 * 1024 ** 3
DEFAULT_MAX_FORM_MEMORY_SIZE = DEFAULT_MAX_CONTENT_LENGTH

# session image upload defaults: 25 MiB per image, kept for one day
DEFAULT_MAX_IMAGE_UPLOAD_BYTES = 25 * 1024 ** 2
DEFAULT_IMAGE_UPLOAD_DIR = "uploads"
DEFAULT_IMAGE_SESSION_TTL_SECONDS = 60 * 60 * 24

# convert rate limit defaults: 5 requests per 60 second window
DEFAULT_CONVERT_RATE_LIMIT_REQUESTS = 5
DEFAULT_CONVERT_RATE_LIMIT_WINDOW_SECONDS = 60
DEFAULT_CONVERT_RATE_LIMIT_DB_PATH = "/tmp/likha-pdf-rate-limit.sqlite3"
@@ -56,6 +65,16 @@ TEMPLATES_DIR = BASE_DIR / "templates"
PARTIALS_DIR = TEMPLATES_DIR / "partials"
STATIC_DIR = BASE_DIR / "static"
# pseudo url scheme used inside markdown to point at an image uploaded in
# the current browser session, e.g. ![alt](session-image://<image id>)
SESSION_IMAGE_SCHEME = "session-image://"

# matches one session image token and captures its id. built from the scheme
# constant so the two cannot drift apart if the scheme is ever renamed
SESSION_IMAGE_TOKEN_PATTERN = re.compile(
    re.escape(SESSION_IMAGE_SCHEME) + r"([a-zA-Z0-9-]+)"
)

# file suffixes (lowercase, leading dot) that uploads may keep on disk
ALLOWED_IMAGE_EXTENSIONS = {
    ".png",
    ".jpg",
    ".jpeg",
    ".webp",
}
+
VALID_PAPER_SIZES = {
"a0paper",
"a1paper",
@@ -198,6 +217,17 @@ def format_bytes(num_bytes):
return f"{value:.2f} PB"
def clean_image_name(name):
    """return ``name`` made safe for use as markdown image alt text.

    square brackets and backslashes are dropped: besides the closing bracket,
    an unbalanced ``[`` or a trailing backslash (which would escape the
    closing ``]``) can also keep a markdown parser from recognising the
    generated ``![alt](url)`` snippet. carriage returns and newlines become
    spaces so the snippet stays on one line.

    falsy input, or input that is empty after cleaning, yields ``"image"``.
    """
    # single pass over the text instead of a chain of str.replace calls
    replacements = str.maketrans(
        {
            "[": None,
            "]": None,
            "\\": None,
            "\r": " ",
            "\n": " ",
        }
    )
    cleaned = str(name or "image").translate(replacements).strip()
    return cleaned or "image"
+
+
def build_session_image_snippet(name, image_id):
    """return the markdown image snippet that references a session image.

    the alt text is the cleaned ``name``; the link target is the session
    image scheme followed by ``image_id``.
    """
    alt_text = clean_image_name(name)
    target = f"{SESSION_IMAGE_SCHEME}{image_id}"
    return f"![{alt_text}]({target})"
+
+
def safe_weasy_url_fetcher(url, *args, **kwargs):
"""allow only data urls, block file/network/relative resources"""
scheme = (urlsplit(url).scheme or "").lower()
@@ -206,6 +236,232 @@ def safe_weasy_url_fetcher(url, *args, **kwargs):
raise ValueError("blocked non-data resource url")
class SessionImageStore:
    """per-session store for uploaded images.

    image files are written to ``base_dir/<session_id>/<image_id><suffix>``;
    the metadata for each image is kept in memory only. sessions that have
    not been touched for ``session_ttl_seconds`` are dropped (records and
    files) by a cleanup pass that piggybacks on regular store calls and runs
    at most once per cleanup interval.

    the in-memory maps are guarded by ``self._lock``. helpers whose name ends
    in ``_locked`` (and ``_touch_session``) expect the caller to hold it.
    """

    # mime type to record when the client did not send an image/* mimetype
    # and the type has to be derived from the file suffix instead
    _SUFFIX_MIME_TYPES = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
        ".svg": "image/svg+xml",
    }

    def __init__(self, base_dir, max_image_upload_bytes, session_ttl_seconds):
        """create the store and make sure ``base_dir`` exists.

        base_dir: directory that holds one sub-directory per session.
        max_image_upload_bytes: largest accepted size of a single image.
        session_ttl_seconds: idle time after which a session is expired.
        """
        self.base_dir = Path(base_dir).expanduser()
        self.max_image_upload_bytes = int(max_image_upload_bytes)
        self.session_ttl_seconds = float(session_ttl_seconds)
        self._cleanup_interval_seconds = 60.0

        # session id -> {image id -> record dict}
        self._session_images = {}
        # session id -> unix timestamp of the most recent store call
        self._session_last_seen = {}
        self._next_cleanup_at = 0.0
        self._lock = Lock()

        self.base_dir.mkdir(parents=True, exist_ok=True)

    @classmethod
    def _mime_type_for_suffix(cls, suffix):
        """map a lowercase file suffix to a mime type, octet-stream if unknown"""
        return cls._SUFFIX_MIME_TYPES.get(suffix, "application/octet-stream")

    @staticmethod
    def _discard_file(path):
        """best-effort delete; a file that cannot be removed is left behind"""
        try:
            path.unlink()
        except OSError:
            pass

    def _touch_session(self, session_id, now):
        """record ``now`` as the last activity time of the session"""
        self._session_last_seen[session_id] = now

    def _remove_session_dir(self, session_dir):
        """delete the regular files in ``session_dir``, then the directory itself"""
        if not session_dir.is_dir():
            return

        for child in session_dir.iterdir():
            if child.is_file():
                self._discard_file(child)

        try:
            # fails (and is ignored) when something other than plain files remains
            session_dir.rmdir()
        except OSError:
            pass

    def _cleanup_expired_locked(self, now):
        """drop records and files of sessions idle for longer than the ttl.

        does nothing until the cleanup interval since the previous pass has
        elapsed, so most calls return immediately.
        """
        if now < self._next_cleanup_at:
            return

        expire_before = now - self.session_ttl_seconds
        expired_sessions = [
            session_id
            for session_id, last_seen in self._session_last_seen.items()
            if last_seen < expire_before
        ]

        for session_id in expired_sessions:
            self._session_last_seen.pop(session_id, None)
            self._session_images.pop(session_id, None)
            self._remove_session_dir(self.base_dir / session_id)

        self._next_cleanup_at = now + self._cleanup_interval_seconds

    def _session_dir(self, session_id):
        """return the directory for ``session_id``, creating it when needed"""
        path = self.base_dir / session_id
        path.mkdir(parents=True, exist_ok=True)
        return path

    def _record_to_public(self, record):
        """convert an internal record into the dict handed to clients.

        the on-disk path is deliberately left out; a ready-to-paste markdown
        snippet is added.
        """
        return {
            "id": record["id"],
            "name": record["name"],
            "mimeType": record["mime_type"],
            "sizeBytes": record["size_bytes"],
            "createdAt": record["created_at"],
            "snippet": build_session_image_snippet(record["name"], record["id"]),
        }

    def _remove_missing_record_locked(self, session_id, image_id):
        """forget a record whose file is gone; drop the session bucket if empty"""
        bucket = self._session_images.get(session_id)
        if not bucket:
            return

        bucket.pop(image_id, None)
        if not bucket:
            self._session_images.pop(session_id, None)

    def add_image(self, session_id, uploaded_file):
        """validate and persist one uploaded image for ``session_id``.

        uploaded_file: object exposing ``filename``, ``mimetype`` and
            ``save(path)``; the upload route passes an entry of
            ``request.files``.

        returns the public record dict of the stored image.

        raises ValueError when the type is unsupported, the file is empty or
        it exceeds ``max_image_upload_bytes``. an OSError raised while saving
        propagates after the partial file has been removed.
        """
        now = time.time()

        original_name = secure_filename(uploaded_file.filename or "") or "image"
        mime_type = (uploaded_file.mimetype or "").lower()
        suffix = Path(original_name).suffix.lower()

        claims_image = mime_type.startswith("image/")
        suffix_allowed = suffix in ALLOWED_IMAGE_EXTENSIONS
        if not claims_image and not suffix_allowed:
            raise ValueError("unsupported image type.")

        if not claims_image:
            mime_type = self._mime_type_for_suffix(suffix)

        if not suffix_allowed:
            # never keep an unvetted extension on disk
            suffix = ""

        with self._lock:
            # expire stale sessions before anything is written and mark this
            # session active right away; running the expiry pass after the
            # save could delete the upload of a session that had gone stale
            self._cleanup_expired_locked(now)
            self._touch_session(session_id, now)

        image_id = secrets.token_hex(20)
        destination = self._session_dir(session_id) / f"{image_id}{suffix}"
        try:
            uploaded_file.save(str(destination))
        except OSError:
            # do not leave a partially written file behind
            self._discard_file(destination)
            raise

        size_bytes = destination.stat().st_size if destination.exists() else 0
        if size_bytes <= 0:
            self._discard_file(destination)
            raise ValueError("image file is empty.")

        if size_bytes > self.max_image_upload_bytes:
            self._discard_file(destination)
            raise ValueError(
                "image is too large. "
                f"maximum size per image is {format_bytes(self.max_image_upload_bytes)}."
            )

        record = {
            "id": image_id,
            "name": original_name,
            "mime_type": mime_type,
            "size_bytes": int(size_bytes),
            "created_at": int(now * 1000),
            "path": destination,
        }

        with self._lock:
            self._session_images.setdefault(session_id, {})[image_id] = record
            self._touch_session(session_id, now)

        return self._record_to_public(record)

    def list_images(self, session_id):
        """return the public records of the session's images, newest first.

        records whose file no longer exists are pruned on the way.
        """
        now = time.time()
        with self._lock:
            self._cleanup_expired_locked(now)
            self._touch_session(session_id, now)

            # existence check and pruning stay inside the lock because they
            # read and mutate the shared bucket
            session_bucket = self._session_images.get(session_id, {})
            live_records = []
            for image_id, record in list(session_bucket.items()):
                if Path(record["path"]).exists():
                    live_records.append(record)
                else:
                    self._remove_missing_record_locked(session_id, image_id)

        records = [self._record_to_public(record) for record in live_records]
        records.sort(key=lambda entry: entry["createdAt"], reverse=True)
        return records

    def get_image_data_url(self, session_id, image_id):
        """return the image as a ``data:`` url, or None when unavailable.

        None is returned when the session has no such image, when its file
        has disappeared (the record is pruned) or when it cannot be read.
        """
        now = time.time()

        with self._lock:
            self._cleanup_expired_locked(now)
            self._touch_session(session_id, now)

            record = self._session_images.get(session_id, {}).get(image_id)
            if record is None:
                return None

            image_path = Path(record["path"])
            mime_type = record["mime_type"]

        # file access happens outside the lock so large reads do not block
        # other sessions
        if not image_path.exists():
            with self._lock:
                self._remove_missing_record_locked(session_id, image_id)
            return None

        try:
            payload = image_path.read_bytes()
        except OSError:
            return None

        encoded = base64.b64encode(payload).decode("ascii")
        return f"data:{mime_type};base64,{encoded}"
+
+
def resolve_session_image_tokens(source_markdown, session_id, image_store):
    """replace session image tokens in markdown with inline data urls.

    source_markdown: markdown text that may contain session image tokens.
    session_id: session whose images may be referenced.
    image_store: object offering ``get_image_data_url(session_id, image_id)``
        which returns a data url, or None for an unknown image.

    returns ``(resolved_markdown, missing_image_ids)``. tokens whose image
    cannot be resolved are left untouched in the text and their ids are
    reported as a sorted list without duplicates.

    the text is rewritten in one regex pass. replacing id by id with
    ``str.replace`` would also rewrite the front part of a longer token that
    merely starts with another id, and would copy the whole document (with
    the already embedded base64 payloads) once per image.
    """
    # image id -> data url, or None when missing; each id is looked up once
    lookups = {}

    def substitute(match):
        image_id = match.group(1)
        if image_id not in lookups:
            lookups[image_id] = image_store.get_image_data_url(session_id, image_id)

        data_url = lookups[image_id]
        # keep the original token when the image is missing
        return match.group(0) if data_url is None else data_url

    resolved_markdown = SESSION_IMAGE_TOKEN_PATTERN.sub(substitute, source_markdown)
    missing_image_ids = sorted(
        image_id for image_id, data_url in lookups.items() if data_url is None
    )
    return resolved_markdown, missing_image_ids
+
+
def get_or_create_session_id():
    """return the id stored in the flask session, minting one when absent.

    a stored value is reused only if it is a non-empty string. a freshly
    generated id is written back to the session, which is then flagged as
    permanent.
    """
    stored_id = session.get("likha_pdf_session_id")
    if not (isinstance(stored_id, str) and stored_id):
        stored_id = secrets.token_hex(24)
        session["likha_pdf_session_id"] = stored_id
        session.permanent = True

    return stored_id
+
+
class SlidingWindowRateLimiter:
def __init__(
self,
@@ -806,6 +1062,31 @@ def create_app():
app.config["MAX_CONTENT_LENGTH"] = max_content_length
app.config["MAX_FORM_MEMORY_SIZE"] = max_form_memory_size
+ max_image_upload_bytes = env_int(
+ "MAX_IMAGE_UPLOAD_BYTES",
+ DEFAULT_MAX_IMAGE_UPLOAD_BYTES,
+ minimum=1,
+ )
+ image_upload_dir = os.getenv("IMAGE_UPLOAD_DIR", DEFAULT_IMAGE_UPLOAD_DIR).strip()
+ if not image_upload_dir:
+ image_upload_dir = DEFAULT_IMAGE_UPLOAD_DIR
+
+ image_session_ttl_seconds = env_int(
+ "IMAGE_SESSION_TTL_SECONDS",
+ DEFAULT_IMAGE_SESSION_TTL_SECONDS,
+ minimum=60,
+ )
+
+ image_store = SessionImageStore(
+ image_upload_dir,
+ max_image_upload_bytes,
+ image_session_ttl_seconds,
+ )
+
+ app.config["MAX_IMAGE_UPLOAD_BYTES"] = max_image_upload_bytes
+ app.config["IMAGE_UPLOAD_DIR"] = image_upload_dir
+ app.config["IMAGE_SESSION_TTL_SECONDS"] = image_session_ttl_seconds
+
convert_rate_limit_requests = env_int(
"CONVERT_RATE_LIMIT_REQUESTS",
DEFAULT_CONVERT_RATE_LIMIT_REQUESTS,
@@ -871,6 +1152,24 @@ def create_app():
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
app.logger.setLevel(log_level)
+ secret_key = os.getenv("SECRET_KEY", "").strip()
+ if not secret_key:
+ secret_key = secrets.token_hex(32)
+ app.logger.warning(
+ "SECRET_KEY is not set, generated ephemeral key for this process"
+ )
+
+ app.secret_key = secret_key
+ app.config["SESSION_COOKIE_HTTPONLY"] = True
+ app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
+ app.config["SESSION_COOKIE_SECURE"] = env_bool(
+ "SESSION_COOKIE_SECURE",
+ default=trust_proxy,
+ )
+ app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(
+ seconds=image_session_ttl_seconds
+ )
+
@app.after_request
def add_security_headers(resp):
resp.headers.setdefault("X-Content-Type-Options", "nosniff")
@@ -913,6 +1212,38 @@ def create_app():
def favicon():
return send_from_directory(str(BASE_DIR), "favicon.svg")
@app.route("/upload-image", methods=["POST"])
def upload_image():
    """store one uploaded image for the current session (POST /upload-image).

    route handler registered from within create_app(); uses its ``app`` and
    ``image_store``. expects the file in the ``image`` form field.

    responds with 201 and ``{"image": <record>}`` on success, 400 with an
    ``error`` message when the file is absent or rejected by the store, and
    500 when saving fails with an OSError.
    """
    # resolved first so a session exists even when the request is rejected
    session_id = get_or_create_session_id()

    image_file = request.files.get("image")
    has_named_file = image_file is not None and bool(
        (image_file.filename or "").strip()
    )
    if not has_named_file:
        return jsonify({"error": "image file is required."}), 400

    try:
        image_record = image_store.add_image(session_id, image_file)
    except ValueError as exc:
        # validation failures carry a user-facing message
        return jsonify({"error": str(exc)}), 400
    except OSError:
        app.logger.exception("failed to save uploaded image")
        return jsonify({"error": "failed to save image."}), 500
    else:
        response = jsonify({"image": image_record})
        response.status_code = 201
        response.headers["Cache-Control"] = "no-store"
        return response
+
@app.route("/session-images", methods=["GET"])
def session_images():
    """list the images of the current session (GET /session-images).

    route handler registered from within create_app(); uses its
    ``image_store``. responds with ``{"images": [...]}`` and disables
    caching of the response.
    """
    current_session_id = get_or_create_session_id()
    payload = {"images": image_store.list_images(current_session_id)}

    response = jsonify(payload)
    response.headers["Cache-Control"] = "no-store"
    return response
+
@app.route("/convert", methods=["POST"])
def convert():
rate_limit_key = f"ip:{request.remote_addr or 'unknown'}"
@@ -945,6 +1276,26 @@ def create_app():
400,
)
+ session_id = get_or_create_session_id()
+ md, missing_image_ids = resolve_session_image_tokens(md, session_id, image_store)
+ if missing_image_ids:
+ app.logger.warning(
+ "missing session images during convert: %s",
+ ", ".join(missing_image_ids),
+ )
+ return (
+ read_partial(
+ "error.html",
+ {
+ "{{ message }}": (
+ "one or more images in markdown are missing from this browser session. "
+ "please upload the missing image again."
+ ),
+ },
+ ),
+ 400,
+ )
+
paper_size = pick_option(
request.form.get("paper_size", ""),
"letterpaper",