diff options
| -rw-r--r-- | server.py | 845 |
1 files changed, 845 insertions, 0 deletions
diff --git a/server.py b/server.py new file mode 100644 index 0000000..79e8079 --- /dev/null +++ b/server.py @@ -0,0 +1,845 @@ +#!/usr/bin/env python3 + +# kj-clipboard server — no frills public clipboard +# single-file server: sqlite, mojicrypt encryption, syntax highlighting +# usage: python3 server.py + +import http.server +import json +import os +import re +import secrets +import signal +import sqlite3 +import subprocess +import sys +import threading +import time +from pathlib import Path +from urllib.parse import urlparse, unquote + + +# config + +PORT = int(os.environ.get("KJ_CLIPBOARD_PORT", 5555)) +BIND = os.environ.get("KJ_CLIPBOARD_BIND", "0.0.0.0") +BASE_DIR = Path(__file__).parent.resolve() +DB_PATH = BASE_DIR / "data" / "kj-clipboard.db" +RANDOM_ID_LENGTH = 40 # random chars after unix epoch +MAX_PASTE_SIZE = 1024 * 1024 # 1 MiB +MAX_PASSPHRASE_SIZE = 256 +MAX_LANGUAGE_SIZE = 32 +MAX_DECRYPT_SIZE = 2 * 1024 * 1024 # includes encrypted glyph overhead +ID_PATTERN = re.compile(r"^[0-9]{10,}[a-f0-9]{40}$") +LANGUAGE_PATTERN = re.compile(r"^[a-z0-9_+#-]{1,32}$") + +REQUESTS_PER_WINDOW = int(os.environ.get("KJ_CLIPBOARD_RATE_LIMIT", "60")) +RATE_WINDOW_SECONDS = int(os.environ.get("KJ_CLIPBOARD_RATE_WINDOW", "60")) + +ALLOWED_LANGUAGES = { + "1c", + "abnf", + "accesslog", + "ada", + "arduino", + "armasm", + "asciidoc", + "aspectj", + "autohotkey", + "autoit", + "avrasm", + "awk", + "bash", + "basic", + "bnf", + "brainfuck", + "c", + "cal", + "capnproto", + "ceylon", + "clean", + "clojure", + "clojure-repl", + "cmake", + "coffeescript", + "coq", + "cos", + "cpp", + "crmsh", + "crystal", + "csharp", + "csp", + "css", + "d", + "dart", + "delphi", + "diff", + "django", + "dns", + "dockerfile", + "dos", + "dsconfig", + "dts", + "dust", + "ebnf", + "elixir", + "elm", + "erb", + "erlang", + "excel", + "fix", + "flix", + "fortran", + "fsharp", + "gams", + "gauss", + "gcode", + "gherkin", + "glsl", + "gml", + "go", + "golo", + "gradle", + "graphql", + "groovy", + "haml", + "handlebars", + "haskell", + "haxe", + "hsp", + "http", + "hy", + "inform7", + "ini", + "irpf90", + "isbl", + "java", + "javascript", + "jboss-cli", + "json", + "julia", + "kotlin", + "lasso", + "latex", + "ldif", + "leaf", + "less", + "lisp", + "livecodeserver", + "livescript", + "llvm", + "lsl", + "lua", + "makefile", + "markdown", + "mathematica", + "matlab", + "maxima", + "mel", + "mercury", + "mipsasm", + "mizar", + "mojolicious", + "monkey", + "moonscript", + "n1ql", + "nestedtext", + "nginx", + "nim", + "nix", + "node-repl", + "nsis", + "objectivec", + "ocaml", + "openscad", + "oxygene", + "parser3", + "perl", + "pf", + "pgsql", + "php", + "php-template", + "plaintext", + "pony", + "powershell", + "processing", + "profile", + "prolog", + "properties", + "protobuf", + "puppet", + "purebasic", + "python", + "python-repl", + "q", + "qml", + "r", + "reasonml", + "rib", + "roboconf", + "routeros", + "rsl", + "ruby", + "ruleslanguage", + "rust", + "sas", + "scala", + "scheme", + "scilab", + "scss", + "shell", + "smali", + "smalltalk", + "sml", + "sqf", + "sql", + "stan", + "stata", + "step21", + "stylus", + "subunit", + "swift", + "taggerscript", + "tap", + "tcl", + "thrift", + "tp", + "twig", + "typescript", + "vala", + "vbnet", + "vbscript", + "verilog", + "vhdl", + "vim", + "wasm", + "wren", + "x86asm", + "xl", + "xml", + "xquery", + "yaml", + "zephir", +} + + +# in-memory fixed-window rate limiter (cheap guardrail behind nginx) +_rate_lock = threading.Lock() +_rate_state = {} + + +# database + + +def init_db(): + DB_PATH.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(DB_PATH)) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + conn.execute( + """ + CREATE TABLE IF NOT EXISTS pastes ( + id TEXT PRIMARY KEY, + content TEXT NOT NULL, + language TEXT DEFAULT NULL, + is_code INTEGER DEFAULT 0, + is_encrypted INTEGER DEFAULT 0, + created_at INTEGER NOT NULL + ) + """ + ) + conn.commit() + conn.close() + + +def generate_id(): + """generate id in format: <unix-epoch><40-char-random-hex>""" + return f"{int(time.time())}{secrets.token_hex(RANDOM_ID_LENGTH // 2)}" + + +def is_valid_paste_id(paste_id): + """validate id format: unix epoch prefix + 40 hex chars""" + return bool(ID_PATTERN.match(paste_id)) + + +def save_paste(content, language=None, is_code=False, is_encrypted=False): + """store a paste in the database, return its id""" + conn = sqlite3.connect(str(DB_PATH)) + try: + for _ in range(5): + paste_id = generate_id() + try: + conn.execute( + "INSERT INTO pastes (id, content, language, is_code, is_encrypted, created_at) " + "VALUES (?, ?, ?, ?, ?, ?)", + ( + paste_id, + content, + language, + int(is_code), + int(is_encrypted), + int(time.time()), + ), + ) + conn.commit() + return paste_id + except sqlite3.IntegrityError: + continue + raise RuntimeError("failed to generate unique paste id") + finally: + conn.close() + + +def get_paste(paste_id): + """retrieve a paste by id, returns dict or None""" + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + row = conn.execute("SELECT * FROM pastes WHERE id = ?", (paste_id,)).fetchone() + conn.close() + if row: + return dict(row) + return None + + +# mojicrypt helpers + + +def mojicrypt_encrypt(text, passphrase): + """encrypt text with mojicrypt, return glyph string or None on failure""" + try: + result = subprocess.run( + ["mojicrypt", "encrypt", "-p", passphrase], + input=text, + capture_output=True, + text=True, + timeout=120, + check=False, + ) + if result.returncode != 0: + return None + return result.stdout.strip() + except (subprocess.TimeoutExpired, FileNotFoundError): + return None + + +def mojicrypt_decrypt(encrypted_blob, passphrase): + """decrypt a mojicrypt glyph string, return plaintext or None on failure""" + try: + result = subprocess.run( + ["mojicrypt", "decrypt", "-p", passphrase], + input=encrypted_blob, + capture_output=True, + text=True, + timeout=120, + check=False, + ) + if result.returncode != 0: + return None + return result.stdout.strip() + except (subprocess.TimeoutExpired, FileNotFoundError): + return None + + +# html templates + + +def landing_page(): + return (BASE_DIR / "index.html").read_text(encoding="utf-8") + + +def paste_page(paste): + """render the view page for a paste""" + paste_id = paste["id"] + content = paste["content"] + is_code = paste["is_code"] + is_encrypted = paste["is_encrypted"] + language = paste["language"] or "" + created = time.strftime("%Y-%m-%d %H:%M UTC", time.gmtime(paste["created_at"])) + + if is_encrypted: + # show decrypt form instead of content + content_block = f""" + <form id="decrypt-form"> + <p>this paste is password-protected.</p> + <input type="password" id="decrypt-pass" placeholder="passphrase" required> + <button type="submit" id="decrypt-btn">decrypt</button> + <span id="decrypt-status" style="margin-left:0.5rem;"></span> + </form> + <div id="paste-content" style="display:none;"></div> + <script> + document.getElementById("decrypt-form").addEventListener("submit", async function(e) {{ + e.preventDefault(); + const btn = document.getElementById("decrypt-btn"); + const status = document.getElementById("decrypt-status"); + const pass = document.getElementById("decrypt-pass").value; + btn.disabled = true; + btn.textContent = "--->"; + status.textContent = "decrypting..."; + const resp = await fetch("/api/decrypt", {{ + method: "POST", + headers: {{"Content-Type": "application/json"}}, + body: JSON.stringify({{id: "{paste_id}", passphrase: pass}}) + }}); + const data = await resp.json(); + if (data.error) {{ + alert(data.error); + btn.disabled = false; + btn.textContent = "decrypt"; + status.textContent = ""; + return; + }} + status.textContent = "decrypted."; + document.getElementById("decrypt-form").style.display = "none"; + const el = document.getElementById("paste-content"); + el.style.display = "block"; + {"" if not is_code else f''' + const codeEl = document.createElement("pre"); + const codeInner = document.createElement("code"); + codeInner.className = "{("language-" + language) if language else ""}"; + codeInner.textContent = data.content; + codeEl.appendChild(codeInner); + el.appendChild(codeEl); + hljs.highlightElement(codeInner); + '''} + {"" if is_code else ''' + const pre = document.createElement("pre"); + pre.textContent = data.content; + el.appendChild(pre); + '''} + // update copy button + document.getElementById("copy-btn").onclick = function() {{ + copyPaste(); + }}; + // keep decrypted text in a runtime-only container for copy. + el.dataset.decryptedContent = data.content; + }}); + </script>""" + else: + escaped = html_escape(content) + if is_code: + lang_class = f'class="language-{language}"' if language else "" + content_block = ( + f'<pre><code id="paste-code" {lang_class}>{escaped}</code></pre>' + ) + else: + content_block = f'<pre id="paste-plain">{escaped}</pre>' + + highlight_css = "" + highlight_js = "" + if is_code: + highlight_css = '<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/styles/github-dark.min.css">' + highlight_js = """<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/highlight.min.js"></script> +<script>hljs.highlightAll();</script>""" + + return f"""<!DOCTYPE html> +<html lang="en"> +<head> + <meta charset="utf-8"> + <meta name="viewport" content="width=device-width, initial-scale=1"> + <meta name="color-scheme" content="light dark"> + <meta name="robots" content="noindex, nofollow"> + <title>kj-clipboard - {paste_id}</title> + <link rel="stylesheet" href="https://cdn.jsdelivr.net/gh/kj-sh604/noir.css@latest/out/noir.min.css"> + {highlight_css} +</head> +<body> + <h1><a href="/" style="text-decoration:none;color:inherit;">kj-clipboard</a></h1> + <p class="meta">created {created}{(" · " + language) if language else ""}{" · encrypted" if is_encrypted else ""}</p> + <div class="actions"> + <button id="copy-btn" onclick="copyPaste()">copy to clipboard</button> + <a href="/raw/{paste_id}">raw</a> + </div> + {content_block} + {highlight_js} + <script> + function copyPaste() {{ + const btn = document.getElementById("copy-btn"); + let text = ""; + + const decryptedWrap = document.getElementById("paste-content"); + if (decryptedWrap && decryptedWrap.dataset && decryptedWrap.dataset.decryptedContent) {{ + text = decryptedWrap.dataset.decryptedContent; + }} else {{ + const code = document.getElementById("paste-code"); + const plain = document.getElementById("paste-plain"); + if (code) {{ + text = code.textContent || ""; + }} else if (plain) {{ + text = plain.textContent || ""; + }} + }} + + if (!text) {{ + btn.textContent = "nothing to copy"; + setTimeout(() => btn.textContent = "copy to clipboard", 1500); + return; + }} + + navigator.clipboard.writeText(text); + btn.textContent = "copied!"; + setTimeout(() => btn.textContent = "copy to clipboard", 1500); + }} + </script> +</body> +</html>""" + + +def not_found_page(): + return """<!DOCTYPE html> +<html lang="en"> +<head> + <meta charset="utf-8"> + <meta name="viewport" content="width=device-width, initial-scale=1"> + <meta name="color-scheme" content="light dark"> + <title>kj-clipboard - not found</title> + <link rel="stylesheet" href="https://cdn.jsdelivr.net/gh/kj-sh604/noir.css@latest/out/noir.min.css"> +</head> +<body> + <h1><a href="/" style="text-decoration:none;color:inherit;">kj-clipboard</a></h1> + <p>paste not found.</p> +</body> +</html>""" + + +# helpers + + +def html_escape(text): + return ( + text.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + .replace("'", "'") + ) + + +def html_escape_attr(text): + return html_escape(text).replace("\n", " ").replace("\r", " ") + + +def normalize_language(value): + """normalize and validate highlight.js language token""" + if value is None: + return None + if not isinstance(value, str): + return None + lang = value.strip().lower() + if not lang: + return None + if len(lang) > MAX_LANGUAGE_SIZE: + return None + if not LANGUAGE_PATTERN.match(lang): + return None + if lang not in ALLOWED_LANGUAGES: + return None + return lang + + +def get_client_ip(handler): + """use x-forwarded-for first when present (nginx reverse proxy)""" + xff = handler.headers.get("X-Forwarded-For", "").strip() + if xff: + return xff.split(",")[0].strip() + return handler.client_address[0] + + +def is_rate_limited(client_ip): + """fixed window limiter for POST endpoints""" + now = int(time.time()) + window = now // RATE_WINDOW_SECONDS + + with _rate_lock: + key = (client_ip, window) + count = _rate_state.get(key, 0) + 1 + _rate_state[key] = count + + # cheap cleanup to avoid unbounded growth + stale_before = window - 2 + stale_keys = [k for k in _rate_state if k[1] < stale_before] + for k in stale_keys: + _rate_state.pop(k, None) + + return count > REQUESTS_PER_WINDOW + + +# request handler + + +class ClipboardHandler(http.server.BaseHTTPRequestHandler): + def log_message(self, fmt, *args): + client_ip = get_client_ip(self) + sys.stderr.write( + f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {client_ip} — {fmt % args}\n" + ) + + def add_security_headers(self): + self.send_header("X-Content-Type-Options", "nosniff") + self.send_header("X-Frame-Options", "DENY") + self.send_header("Referrer-Policy", "no-referrer") + self.send_header("Permissions-Policy", "interest-cohort=()") + self.send_header("Cross-Origin-Opener-Policy", "same-origin") + self.send_header("Cross-Origin-Resource-Policy", "same-origin") + self.send_header("Cache-Control", "no-store") + # CSP allows required CDNs and inline scripts currently used in templates. + self.send_header( + "Content-Security-Policy", + "default-src 'self'; style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://cdnjs.cloudflare.com; " + "script-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com; img-src 'self' data:; " + "connect-src 'self'; base-uri 'none'; frame-ancestors 'none'; object-src 'none'; form-action 'self'", + ) + + def send_html(self, code, body): + data = body.encode("utf-8") + self.send_response(code) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", str(len(data))) + self.add_security_headers() + self.end_headers() + self.wfile.write(data) + + def send_json(self, code, obj): + data = json.dumps(obj, separators=(",", ":")).encode("utf-8") + self.send_response(code) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(data))) + self.add_security_headers() + self.end_headers() + self.wfile.write(data) + + def send_plain(self, code, text): + data = text.encode("utf-8") + self.send_response(code) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Length", str(len(data))) + self.add_security_headers() + self.end_headers() + self.wfile.write(data) + + def read_body(self, max_size): + try: + length = int(self.headers.get("Content-Length", "0")) + except ValueError: + return None + if length <= 0 or length > max_size: + return None + return self.rfile.read(length) + + def read_json_body(self, max_size): + ctype = self.headers.get("Content-Type", "").split(";")[0].strip().lower() + if ctype != "application/json": + return None + body = self.read_body(max_size) + if body is None: + return None + try: + data = json.loads(body) + except (json.JSONDecodeError, TypeError, UnicodeDecodeError): + return None + if not isinstance(data, dict): + return None + return data + + def do_GET(self): + try: + parsed = urlparse(self.path) + path = unquote(parsed.path).rstrip("/") or "/" + + if path == "/healthz": + self.send_plain(200, "ok") + return + + if path == "/": + self.send_html(200, landing_page()) + return + + if path.startswith("/raw/"): + paste_id = path[5:] + if not is_valid_paste_id(paste_id): + self.send_plain(404, "not found") + return + paste = get_paste(paste_id) + if not paste: + self.send_plain(404, "not found") + return + if paste["is_encrypted"]: + self.send_plain( + 403, + "403: this paste is encrypted; raw view is not available - use the web interface to decrypt and view the content", + ) + return + self.send_plain(200, paste["content"]) + return + + # treat anything else as a paste id + paste_id = path.lstrip("/") + if not is_valid_paste_id(paste_id): + self.send_html(404, not_found_page()) + return + + paste = get_paste(paste_id) + if not paste: + self.send_html(404, not_found_page()) + return + + self.send_html(200, paste_page(paste)) + except Exception: + self.send_json(500, {"error": "internal server error"}) + + def do_POST(self): + try: + client_ip = get_client_ip(self) + if is_rate_limited(client_ip): + self.send_json(429, {"error": "rate limit exceeded"}) + return + + parsed = urlparse(self.path) + path = unquote(parsed.path) + + if path == "/api/paste": + self.handle_create_paste() + elif path == "/api/decrypt": + self.handle_decrypt() + else: + self.send_json(404, {"error": "not found"}) + except Exception: + self.send_json(500, {"error": "internal server error"}) + + def handle_create_paste(self): + data = self.read_json_body(MAX_PASTE_SIZE) + if data is None: + self.send_json(400, {"error": "invalid request"}) + return + + content = data.get("content", "") + if not isinstance(content, str): + self.send_json(400, {"error": "content is required"}) + return + + # preserve exact content while blocking empty/oversized payloads + if not content.strip(): + self.send_json(400, {"error": "content is required"}) + return + if len(content.encode("utf-8")) > MAX_PASTE_SIZE: + self.send_json(413, {"error": "paste too large (max 1 MiB)"}) + return + + is_code = bool(data.get("is_code", False)) + language = normalize_language(data.get("language", "")) + passphrase = data.get("passphrase", "") + if passphrase is None: + passphrase = "" + if not isinstance(passphrase, str): + self.send_json(400, {"error": "invalid passphrase"}) + return + passphrase = passphrase.strip() + if len(passphrase.encode("utf-8")) > MAX_PASSPHRASE_SIZE: + self.send_json(400, {"error": "passphrase too long"}) + return + + if data.get("language", "") and language is None: + self.send_json(400, {"error": "invalid language name"}) + return + + is_encrypted = False + store_content = content + + if passphrase: + encrypted = mojicrypt_encrypt(content, passphrase) + if encrypted is None: + self.send_json( + 500, {"error": "encryption failed — is mojicrypt installed?"} + ) + return + store_content = encrypted + is_encrypted = True + + paste_id = save_paste( + content=store_content, + language=language, + is_code=is_code, + is_encrypted=is_encrypted, + ) + + self.send_json(200, {"id": paste_id, "url": f"/{paste_id}"}) + + def handle_decrypt(self): + data = self.read_json_body(MAX_DECRYPT_SIZE) + if data is None: + self.send_json(400, {"error": "invalid request"}) + return + + paste_id = data.get("id", "") + passphrase = data.get("passphrase", "") + + if not isinstance(paste_id, str) or not isinstance(passphrase, str): + self.send_json(400, {"error": "id and passphrase are required"}) + return + passphrase = passphrase.strip() + if len(passphrase.encode("utf-8")) > MAX_PASSPHRASE_SIZE: + self.send_json(400, {"error": "passphrase too long"}) + return + + if not paste_id or not passphrase: + self.send_json(400, {"error": "id and passphrase are required"}) + return + + if not is_valid_paste_id(paste_id): + self.send_json(404, {"error": "paste not found"}) + return + + paste = get_paste(paste_id) + if not paste: + self.send_json(404, {"error": "paste not found"}) + return + + if not paste["is_encrypted"]: + self.send_json(400, {"error": "paste is not encrypted"}) + return + + plaintext = mojicrypt_decrypt(paste["content"], passphrase) + if plaintext is None: + self.send_json(403, {"error": "wrong passphrase or corrupted data"}) + return + + self.send_json(200, {"content": plaintext}) + + +# main + + +class ClipboardHTTPServer(http.server.ThreadingHTTPServer): + daemon_threads = True + allow_reuse_address = True + + +def main(): + init_db() + print(f"kj-clipboard — listening on {BIND}:{PORT}") + + server = ClipboardHTTPServer((BIND, PORT), ClipboardHandler) + shutdown_requested = threading.Event() + + def request_shutdown(msg): + if shutdown_requested.is_set(): + return + shutdown_requested.set() + print(msg) + # shutdown() should run outside the serving thread. + threading.Thread(target=server.shutdown, daemon=True).start() + + def _shutdown_handler(signum, _frame): + request_shutdown(f"\nreceived signal {signum}, shutting down.") + + signal.signal(signal.SIGTERM, _shutdown_handler) + + try: + server.serve_forever() + except KeyboardInterrupt: + print("\nreceived keyboard interrupt, shutting down.") + finally: + server.server_close() + + +if __name__ == "__main__": + main() |
