From c18bce8e602c8f1001bd691af9ee2d59912f4250 Mon Sep 17 00:00:00 2001 From: kj_sh604 Date: Sat, 14 Mar 2026 17:18:25 -0400 Subject: refactor: server.py --- server.py | 845 -------------------------------------------------------------- 1 file changed, 845 deletions(-) delete mode 100644 server.py diff --git a/server.py b/server.py deleted file mode 100644 index 79e8079..0000000 --- a/server.py +++ /dev/null @@ -1,845 +0,0 @@ -#!/usr/bin/env python3 - -# kj-clipboard server — no frills public clipboard -# single-file server: sqlite, mojicrypt encryption, syntax highlighting -# usage: python3 server.py - -import http.server -import json -import os -import re -import secrets -import signal -import sqlite3 -import subprocess -import sys -import threading -import time -from pathlib import Path -from urllib.parse import urlparse, unquote - - -# config - -PORT = int(os.environ.get("KJ_CLIPBOARD_PORT", 5555)) -BIND = os.environ.get("KJ_CLIPBOARD_BIND", "0.0.0.0") -BASE_DIR = Path(__file__).parent.resolve() -DB_PATH = BASE_DIR / "data" / "kj-clipboard.db" -RANDOM_ID_LENGTH = 40 # random chars after unix epoch -MAX_PASTE_SIZE = 1024 * 1024 # 1 MiB -MAX_PASSPHRASE_SIZE = 256 -MAX_LANGUAGE_SIZE = 32 -MAX_DECRYPT_SIZE = 2 * 1024 * 1024 # includes encrypted glyph overhead -ID_PATTERN = re.compile(r"^[0-9]{10,}[a-f0-9]{40}$") -LANGUAGE_PATTERN = re.compile(r"^[a-z0-9_+#-]{1,32}$") - -REQUESTS_PER_WINDOW = int(os.environ.get("KJ_CLIPBOARD_RATE_LIMIT", "60")) -RATE_WINDOW_SECONDS = int(os.environ.get("KJ_CLIPBOARD_RATE_WINDOW", "60")) - -ALLOWED_LANGUAGES = { - "1c", - "abnf", - "accesslog", - "ada", - "arduino", - "armasm", - "asciidoc", - "aspectj", - "autohotkey", - "autoit", - "avrasm", - "awk", - "bash", - "basic", - "bnf", - "brainfuck", - "c", - "cal", - "capnproto", - "ceylon", - "clean", - "clojure", - "clojure-repl", - "cmake", - "coffeescript", - "coq", - "cos", - "cpp", - "crmsh", - "crystal", - "csharp", - "csp", - "css", - "d", - "dart", - "delphi", - "diff", - "django", - "dns", - "dockerfile", - "dos", - "dsconfig", - "dts", - "dust", - "ebnf", - "elixir", - "elm", - "erb", - "erlang", - "excel", - "fix", - "flix", - "fortran", - "fsharp", - "gams", - "gauss", - "gcode", - "gherkin", - "glsl", - "gml", - "go", - "golo", - "gradle", - "graphql", - "groovy", - "haml", - "handlebars", - "haskell", - "haxe", - "hsp", - "http", - "hy", - "inform7", - "ini", - "irpf90", - "isbl", - "java", - "javascript", - "jboss-cli", - "json", - "julia", - "kotlin", - "lasso", - "latex", - "ldif", - "leaf", - "less", - "lisp", - "livecodeserver", - "livescript", - "llvm", - "lsl", - "lua", - "makefile", - "markdown", - "mathematica", - "matlab", - "maxima", - "mel", - "mercury", - "mipsasm", - "mizar", - "mojolicious", - "monkey", - "moonscript", - "n1ql", - "nestedtext", - "nginx", - "nim", - "nix", - "node-repl", - "nsis", - "objectivec", - "ocaml", - "openscad", - "oxygene", - "parser3", - "perl", - "pf", - "pgsql", - "php", - "php-template", - "plaintext", - "pony", - "powershell", - "processing", - "profile", - "prolog", - "properties", - "protobuf", - "puppet", - "purebasic", - "python", - "python-repl", - "q", - "qml", - "r", - "reasonml", - "rib", - "roboconf", - "routeros", - "rsl", - "ruby", - "ruleslanguage", - "rust", - "sas", - "scala", - "scheme", - "scilab", - "scss", - "shell", - "smali", - "smalltalk", - "sml", - "sqf", - "sql", - "stan", - "stata", - "step21", - "stylus", - "subunit", - "swift", - "taggerscript", - "tap", - "tcl", - "thrift", - "tp", - "twig", - "typescript", - "vala", - "vbnet", - "vbscript", - "verilog", - "vhdl", - "vim", - "wasm", - "wren", - "x86asm", - "xl", - "xml", - "xquery", - "yaml", - "zephir", -} - - -# in-memory fixed-window rate limiter (cheap guardrail behind nginx) -_rate_lock = threading.Lock() -_rate_state = {} - - -# database - - -def init_db(): - DB_PATH.parent.mkdir(parents=True, exist_ok=True) - conn = sqlite3.connect(str(DB_PATH)) - conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA synchronous=NORMAL") - conn.execute( - """ - CREATE TABLE IF NOT EXISTS pastes ( - id TEXT PRIMARY KEY, - content TEXT NOT NULL, - language TEXT DEFAULT NULL, - is_code INTEGER DEFAULT 0, - is_encrypted INTEGER DEFAULT 0, - created_at INTEGER NOT NULL - ) - """ - ) - conn.commit() - conn.close() - - -def generate_id(): - """generate id in format: <40-char-random-hex>""" - return f"{int(time.time())}{secrets.token_hex(RANDOM_ID_LENGTH // 2)}" - - -def is_valid_paste_id(paste_id): - """validate id format: unix epoch prefix + 40 hex chars""" - return bool(ID_PATTERN.match(paste_id)) - - -def save_paste(content, language=None, is_code=False, is_encrypted=False): - """store a paste in the database, return its id""" - conn = sqlite3.connect(str(DB_PATH)) - try: - for _ in range(5): - paste_id = generate_id() - try: - conn.execute( - "INSERT INTO pastes (id, content, language, is_code, is_encrypted, created_at) " - "VALUES (?, ?, ?, ?, ?, ?)", - ( - paste_id, - content, - language, - int(is_code), - int(is_encrypted), - int(time.time()), - ), - ) - conn.commit() - return paste_id - except sqlite3.IntegrityError: - continue - raise RuntimeError("failed to generate unique paste id") - finally: - conn.close() - - -def get_paste(paste_id): - """retrieve a paste by id, returns dict or None""" - conn = sqlite3.connect(str(DB_PATH)) - conn.row_factory = sqlite3.Row - row = conn.execute("SELECT * FROM pastes WHERE id = ?", (paste_id,)).fetchone() - conn.close() - if row: - return dict(row) - return None - - -# mojicrypt helpers - - -def mojicrypt_encrypt(text, passphrase): - """encrypt text with mojicrypt, return glyph string or None on failure""" - try: - result = subprocess.run( - ["mojicrypt", "encrypt", "-p", passphrase], - input=text, - capture_output=True, - text=True, - timeout=120, - check=False, - ) - if result.returncode != 0: - return None - return result.stdout.strip() - except (subprocess.TimeoutExpired, FileNotFoundError): - return None - - -def mojicrypt_decrypt(encrypted_blob, passphrase): - """decrypt a mojicrypt glyph string, return plaintext or None on failure""" - try: - result = subprocess.run( - ["mojicrypt", "decrypt", "-p", passphrase], - input=encrypted_blob, - capture_output=True, - text=True, - timeout=120, - check=False, - ) - if result.returncode != 0: - return None - return result.stdout.strip() - except (subprocess.TimeoutExpired, FileNotFoundError): - return None - - -# html templates - - -def landing_page(): - return (BASE_DIR / "index.html").read_text(encoding="utf-8") - - -def paste_page(paste): - """render the view page for a paste""" - paste_id = paste["id"] - content = paste["content"] - is_code = paste["is_code"] - is_encrypted = paste["is_encrypted"] - language = paste["language"] or "" - created = time.strftime("%Y-%m-%d %H:%M UTC", time.gmtime(paste["created_at"])) - - if is_encrypted: - # show decrypt form instead of content - content_block = f""" -
-

this paste is password-protected.

- - - -
- - """ - else: - escaped = html_escape(content) - if is_code: - lang_class = f'class="language-{language}"' if language else "" - content_block = ( - f'
{escaped}
' - ) - else: - content_block = f'
{escaped}
' - - highlight_css = "" - highlight_js = "" - if is_code: - highlight_css = '' - highlight_js = """ -""" - - return f""" - - - - - - - kj-clipboard - {paste_id} - - {highlight_css} - - -

kj-clipboard

-

created {created}{(" · " + language) if language else ""}{" · encrypted" if is_encrypted else ""}

-
- - raw -
- {content_block} - {highlight_js} - - -""" - - -def not_found_page(): - return """ - - - - - - kj-clipboard - not found - - - -

kj-clipboard

-

paste not found.

- -""" - - -# helpers - - -def html_escape(text): - return ( - text.replace("&", "&") - .replace("<", "<") - .replace(">", ">") - .replace('"', """) - .replace("'", "'") - ) - - -def html_escape_attr(text): - return html_escape(text).replace("\n", " ").replace("\r", " ") - - -def normalize_language(value): - """normalize and validate highlight.js language token""" - if value is None: - return None - if not isinstance(value, str): - return None - lang = value.strip().lower() - if not lang: - return None - if len(lang) > MAX_LANGUAGE_SIZE: - return None - if not LANGUAGE_PATTERN.match(lang): - return None - if lang not in ALLOWED_LANGUAGES: - return None - return lang - - -def get_client_ip(handler): - """use x-forwarded-for first when present (nginx reverse proxy)""" - xff = handler.headers.get("X-Forwarded-For", "").strip() - if xff: - return xff.split(",")[0].strip() - return handler.client_address[0] - - -def is_rate_limited(client_ip): - """fixed window limiter for POST endpoints""" - now = int(time.time()) - window = now // RATE_WINDOW_SECONDS - - with _rate_lock: - key = (client_ip, window) - count = _rate_state.get(key, 0) + 1 - _rate_state[key] = count - - # cheap cleanup to avoid unbounded growth - stale_before = window - 2 - stale_keys = [k for k in _rate_state if k[1] < stale_before] - for k in stale_keys: - _rate_state.pop(k, None) - - return count > REQUESTS_PER_WINDOW - - -# request handler - - -class ClipboardHandler(http.server.BaseHTTPRequestHandler): - def log_message(self, fmt, *args): - client_ip = get_client_ip(self) - sys.stderr.write( - f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {client_ip} — {fmt % args}\n" - ) - - def add_security_headers(self): - self.send_header("X-Content-Type-Options", "nosniff") - self.send_header("X-Frame-Options", "DENY") - self.send_header("Referrer-Policy", "no-referrer") - self.send_header("Permissions-Policy", "interest-cohort=()") - self.send_header("Cross-Origin-Opener-Policy", "same-origin") - self.send_header("Cross-Origin-Resource-Policy", "same-origin") - self.send_header("Cache-Control", "no-store") - # CSP allows required CDNs and inline scripts currently used in templates. - self.send_header( - "Content-Security-Policy", - "default-src 'self'; style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://cdnjs.cloudflare.com; " - "script-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com; img-src 'self' data:; " - "connect-src 'self'; base-uri 'none'; frame-ancestors 'none'; object-src 'none'; form-action 'self'", - ) - - def send_html(self, code, body): - data = body.encode("utf-8") - self.send_response(code) - self.send_header("Content-Type", "text/html; charset=utf-8") - self.send_header("Content-Length", str(len(data))) - self.add_security_headers() - self.end_headers() - self.wfile.write(data) - - def send_json(self, code, obj): - data = json.dumps(obj, separators=(",", ":")).encode("utf-8") - self.send_response(code) - self.send_header("Content-Type", "application/json") - self.send_header("Content-Length", str(len(data))) - self.add_security_headers() - self.end_headers() - self.wfile.write(data) - - def send_plain(self, code, text): - data = text.encode("utf-8") - self.send_response(code) - self.send_header("Content-Type", "text/plain; charset=utf-8") - self.send_header("Content-Length", str(len(data))) - self.add_security_headers() - self.end_headers() - self.wfile.write(data) - - def read_body(self, max_size): - try: - length = int(self.headers.get("Content-Length", "0")) - except ValueError: - return None - if length <= 0 or length > max_size: - return None - return self.rfile.read(length) - - def read_json_body(self, max_size): - ctype = self.headers.get("Content-Type", "").split(";")[0].strip().lower() - if ctype != "application/json": - return None - body = self.read_body(max_size) - if body is None: - return None - try: - data = json.loads(body) - except (json.JSONDecodeError, TypeError, UnicodeDecodeError): - return None - if not isinstance(data, dict): - return None - return data - - def do_GET(self): - try: - parsed = urlparse(self.path) - path = unquote(parsed.path).rstrip("/") or "/" - - if path == "/healthz": - self.send_plain(200, "ok") - return - - if path == "/": - self.send_html(200, landing_page()) - return - - if path.startswith("/raw/"): - paste_id = path[5:] - if not is_valid_paste_id(paste_id): - self.send_plain(404, "not found") - return - paste = get_paste(paste_id) - if not paste: - self.send_plain(404, "not found") - return - if paste["is_encrypted"]: - self.send_plain( - 403, - "403: this paste is encrypted; raw view is not available - use the web interface to decrypt and view the content", - ) - return - self.send_plain(200, paste["content"]) - return - - # treat anything else as a paste id - paste_id = path.lstrip("/") - if not is_valid_paste_id(paste_id): - self.send_html(404, not_found_page()) - return - - paste = get_paste(paste_id) - if not paste: - self.send_html(404, not_found_page()) - return - - self.send_html(200, paste_page(paste)) - except Exception: - self.send_json(500, {"error": "internal server error"}) - - def do_POST(self): - try: - client_ip = get_client_ip(self) - if is_rate_limited(client_ip): - self.send_json(429, {"error": "rate limit exceeded"}) - return - - parsed = urlparse(self.path) - path = unquote(parsed.path) - - if path == "/api/paste": - self.handle_create_paste() - elif path == "/api/decrypt": - self.handle_decrypt() - else: - self.send_json(404, {"error": "not found"}) - except Exception: - self.send_json(500, {"error": "internal server error"}) - - def handle_create_paste(self): - data = self.read_json_body(MAX_PASTE_SIZE) - if data is None: - self.send_json(400, {"error": "invalid request"}) - return - - content = data.get("content", "") - if not isinstance(content, str): - self.send_json(400, {"error": "content is required"}) - return - - # preserve exact content while blocking empty/oversized payloads - if not content.strip(): - self.send_json(400, {"error": "content is required"}) - return - if len(content.encode("utf-8")) > MAX_PASTE_SIZE: - self.send_json(413, {"error": "paste too large (max 1 MiB)"}) - return - - is_code = bool(data.get("is_code", False)) - language = normalize_language(data.get("language", "")) - passphrase = data.get("passphrase", "") - if passphrase is None: - passphrase = "" - if not isinstance(passphrase, str): - self.send_json(400, {"error": "invalid passphrase"}) - return - passphrase = passphrase.strip() - if len(passphrase.encode("utf-8")) > MAX_PASSPHRASE_SIZE: - self.send_json(400, {"error": "passphrase too long"}) - return - - if data.get("language", "") and language is None: - self.send_json(400, {"error": "invalid language name"}) - return - - is_encrypted = False - store_content = content - - if passphrase: - encrypted = mojicrypt_encrypt(content, passphrase) - if encrypted is None: - self.send_json( - 500, {"error": "encryption failed — is mojicrypt installed?"} - ) - return - store_content = encrypted - is_encrypted = True - - paste_id = save_paste( - content=store_content, - language=language, - is_code=is_code, - is_encrypted=is_encrypted, - ) - - self.send_json(200, {"id": paste_id, "url": f"/{paste_id}"}) - - def handle_decrypt(self): - data = self.read_json_body(MAX_DECRYPT_SIZE) - if data is None: - self.send_json(400, {"error": "invalid request"}) - return - - paste_id = data.get("id", "") - passphrase = data.get("passphrase", "") - - if not isinstance(paste_id, str) or not isinstance(passphrase, str): - self.send_json(400, {"error": "id and passphrase are required"}) - return - passphrase = passphrase.strip() - if len(passphrase.encode("utf-8")) > MAX_PASSPHRASE_SIZE: - self.send_json(400, {"error": "passphrase too long"}) - return - - if not paste_id or not passphrase: - self.send_json(400, {"error": "id and passphrase are required"}) - return - - if not is_valid_paste_id(paste_id): - self.send_json(404, {"error": "paste not found"}) - return - - paste = get_paste(paste_id) - if not paste: - self.send_json(404, {"error": "paste not found"}) - return - - if not paste["is_encrypted"]: - self.send_json(400, {"error": "paste is not encrypted"}) - return - - plaintext = mojicrypt_decrypt(paste["content"], passphrase) - if plaintext is None: - self.send_json(403, {"error": "wrong passphrase or corrupted data"}) - return - - self.send_json(200, {"content": plaintext}) - - -# main - - -class ClipboardHTTPServer(http.server.ThreadingHTTPServer): - daemon_threads = True - allow_reuse_address = True - - -def main(): - init_db() - print(f"kj-clipboard — listening on {BIND}:{PORT}") - - server = ClipboardHTTPServer((BIND, PORT), ClipboardHandler) - shutdown_requested = threading.Event() - - def request_shutdown(msg): - if shutdown_requested.is_set(): - return - shutdown_requested.set() - print(msg) - # shutdown() should run outside the serving thread. - threading.Thread(target=server.shutdown, daemon=True).start() - - def _shutdown_handler(signum, _frame): - request_shutdown(f"\nreceived signal {signum}, shutting down.") - - signal.signal(signal.SIGTERM, _shutdown_handler) - - try: - server.serve_forever() - except KeyboardInterrupt: - print("\nreceived keyboard interrupt, shutting down.") - finally: - server.server_close() - - -if __name__ == "__main__": - main() -- cgit v1.2.3