aboutsummaryrefslogtreecommitdiffstats
path: root/server.py
diff options
context:
space:
mode:
authorkj_sh6042026-03-14 17:18:25 -0400
committerkj_sh6042026-03-14 17:18:25 -0400
commitc18bce8e602c8f1001bd691af9ee2d59912f4250 (patch)
treefa556036221cd76f401db6e63ea95611086aee30 /server.py
parent6846209854bce6a0f154ce19033b06f449abd71c (diff)
refactor: server.py
Diffstat (limited to 'server.py')
-rw-r--r--server.py845
1 files changed, 0 insertions, 845 deletions
diff --git a/server.py b/server.py
deleted file mode 100644
index 79e8079..0000000
--- a/server.py
+++ /dev/null
@@ -1,845 +0,0 @@
-#!/usr/bin/env python3
-
-# kj-clipboard server — no frills public clipboard
-# single-file server: sqlite, mojicrypt encryption, syntax highlighting
-# usage: python3 server.py
-
-import http.server
-import json
-import os
-import re
-import secrets
-import signal
-import sqlite3
-import subprocess
-import sys
-import threading
-import time
-from pathlib import Path
-from urllib.parse import urlparse, unquote
-
-
-# config
-
-PORT = int(os.environ.get("KJ_CLIPBOARD_PORT", 5555))
-BIND = os.environ.get("KJ_CLIPBOARD_BIND", "0.0.0.0")
-BASE_DIR = Path(__file__).parent.resolve()
-DB_PATH = BASE_DIR / "data" / "kj-clipboard.db"
-RANDOM_ID_LENGTH = 40 # random chars after unix epoch
-MAX_PASTE_SIZE = 1024 * 1024 # 1 MiB
-MAX_PASSPHRASE_SIZE = 256
-MAX_LANGUAGE_SIZE = 32
-MAX_DECRYPT_SIZE = 2 * 1024 * 1024 # includes encrypted glyph overhead
-ID_PATTERN = re.compile(r"^[0-9]{10,}[a-f0-9]{40}$")
-LANGUAGE_PATTERN = re.compile(r"^[a-z0-9_+#-]{1,32}$")
-
-REQUESTS_PER_WINDOW = int(os.environ.get("KJ_CLIPBOARD_RATE_LIMIT", "60"))
-RATE_WINDOW_SECONDS = int(os.environ.get("KJ_CLIPBOARD_RATE_WINDOW", "60"))
-
-ALLOWED_LANGUAGES = {
- "1c",
- "abnf",
- "accesslog",
- "ada",
- "arduino",
- "armasm",
- "asciidoc",
- "aspectj",
- "autohotkey",
- "autoit",
- "avrasm",
- "awk",
- "bash",
- "basic",
- "bnf",
- "brainfuck",
- "c",
- "cal",
- "capnproto",
- "ceylon",
- "clean",
- "clojure",
- "clojure-repl",
- "cmake",
- "coffeescript",
- "coq",
- "cos",
- "cpp",
- "crmsh",
- "crystal",
- "csharp",
- "csp",
- "css",
- "d",
- "dart",
- "delphi",
- "diff",
- "django",
- "dns",
- "dockerfile",
- "dos",
- "dsconfig",
- "dts",
- "dust",
- "ebnf",
- "elixir",
- "elm",
- "erb",
- "erlang",
- "excel",
- "fix",
- "flix",
- "fortran",
- "fsharp",
- "gams",
- "gauss",
- "gcode",
- "gherkin",
- "glsl",
- "gml",
- "go",
- "golo",
- "gradle",
- "graphql",
- "groovy",
- "haml",
- "handlebars",
- "haskell",
- "haxe",
- "hsp",
- "http",
- "hy",
- "inform7",
- "ini",
- "irpf90",
- "isbl",
- "java",
- "javascript",
- "jboss-cli",
- "json",
- "julia",
- "kotlin",
- "lasso",
- "latex",
- "ldif",
- "leaf",
- "less",
- "lisp",
- "livecodeserver",
- "livescript",
- "llvm",
- "lsl",
- "lua",
- "makefile",
- "markdown",
- "mathematica",
- "matlab",
- "maxima",
- "mel",
- "mercury",
- "mipsasm",
- "mizar",
- "mojolicious",
- "monkey",
- "moonscript",
- "n1ql",
- "nestedtext",
- "nginx",
- "nim",
- "nix",
- "node-repl",
- "nsis",
- "objectivec",
- "ocaml",
- "openscad",
- "oxygene",
- "parser3",
- "perl",
- "pf",
- "pgsql",
- "php",
- "php-template",
- "plaintext",
- "pony",
- "powershell",
- "processing",
- "profile",
- "prolog",
- "properties",
- "protobuf",
- "puppet",
- "purebasic",
- "python",
- "python-repl",
- "q",
- "qml",
- "r",
- "reasonml",
- "rib",
- "roboconf",
- "routeros",
- "rsl",
- "ruby",
- "ruleslanguage",
- "rust",
- "sas",
- "scala",
- "scheme",
- "scilab",
- "scss",
- "shell",
- "smali",
- "smalltalk",
- "sml",
- "sqf",
- "sql",
- "stan",
- "stata",
- "step21",
- "stylus",
- "subunit",
- "swift",
- "taggerscript",
- "tap",
- "tcl",
- "thrift",
- "tp",
- "twig",
- "typescript",
- "vala",
- "vbnet",
- "vbscript",
- "verilog",
- "vhdl",
- "vim",
- "wasm",
- "wren",
- "x86asm",
- "xl",
- "xml",
- "xquery",
- "yaml",
- "zephir",
-}
-
-
-# in-memory fixed-window rate limiter (cheap guardrail behind nginx)
-_rate_lock = threading.Lock()
-_rate_state = {}
-
-
-# database
-
-
-def init_db():
- DB_PATH.parent.mkdir(parents=True, exist_ok=True)
- conn = sqlite3.connect(str(DB_PATH))
- conn.execute("PRAGMA journal_mode=WAL")
- conn.execute("PRAGMA synchronous=NORMAL")
- conn.execute(
- """
- CREATE TABLE IF NOT EXISTS pastes (
- id TEXT PRIMARY KEY,
- content TEXT NOT NULL,
- language TEXT DEFAULT NULL,
- is_code INTEGER DEFAULT 0,
- is_encrypted INTEGER DEFAULT 0,
- created_at INTEGER NOT NULL
- )
- """
- )
- conn.commit()
- conn.close()
-
-
-def generate_id():
- """generate id in format: <unix-epoch><40-char-random-hex>"""
- return f"{int(time.time())}{secrets.token_hex(RANDOM_ID_LENGTH // 2)}"
-
-
-def is_valid_paste_id(paste_id):
- """validate id format: unix epoch prefix + 40 hex chars"""
- return bool(ID_PATTERN.match(paste_id))
-
-
-def save_paste(content, language=None, is_code=False, is_encrypted=False):
- """store a paste in the database, return its id"""
- conn = sqlite3.connect(str(DB_PATH))
- try:
- for _ in range(5):
- paste_id = generate_id()
- try:
- conn.execute(
- "INSERT INTO pastes (id, content, language, is_code, is_encrypted, created_at) "
- "VALUES (?, ?, ?, ?, ?, ?)",
- (
- paste_id,
- content,
- language,
- int(is_code),
- int(is_encrypted),
- int(time.time()),
- ),
- )
- conn.commit()
- return paste_id
- except sqlite3.IntegrityError:
- continue
- raise RuntimeError("failed to generate unique paste id")
- finally:
- conn.close()
-
-
-def get_paste(paste_id):
- """retrieve a paste by id, returns dict or None"""
- conn = sqlite3.connect(str(DB_PATH))
- conn.row_factory = sqlite3.Row
- row = conn.execute("SELECT * FROM pastes WHERE id = ?", (paste_id,)).fetchone()
- conn.close()
- if row:
- return dict(row)
- return None
-
-
-# mojicrypt helpers
-
-
-def mojicrypt_encrypt(text, passphrase):
- """encrypt text with mojicrypt, return glyph string or None on failure"""
- try:
- result = subprocess.run(
- ["mojicrypt", "encrypt", "-p", passphrase],
- input=text,
- capture_output=True,
- text=True,
- timeout=120,
- check=False,
- )
- if result.returncode != 0:
- return None
- return result.stdout.strip()
- except (subprocess.TimeoutExpired, FileNotFoundError):
- return None
-
-
-def mojicrypt_decrypt(encrypted_blob, passphrase):
- """decrypt a mojicrypt glyph string, return plaintext or None on failure"""
- try:
- result = subprocess.run(
- ["mojicrypt", "decrypt", "-p", passphrase],
- input=encrypted_blob,
- capture_output=True,
- text=True,
- timeout=120,
- check=False,
- )
- if result.returncode != 0:
- return None
- return result.stdout.strip()
- except (subprocess.TimeoutExpired, FileNotFoundError):
- return None
-
-
-# html templates
-
-
-def landing_page():
- return (BASE_DIR / "index.html").read_text(encoding="utf-8")
-
-
-def paste_page(paste):
- """render the view page for a paste"""
- paste_id = paste["id"]
- content = paste["content"]
- is_code = paste["is_code"]
- is_encrypted = paste["is_encrypted"]
- language = paste["language"] or ""
- created = time.strftime("%Y-%m-%d %H:%M UTC", time.gmtime(paste["created_at"]))
-
- if is_encrypted:
- # show decrypt form instead of content
- content_block = f"""
- <form id="decrypt-form">
- <p>this paste is password-protected.</p>
- <input type="password" id="decrypt-pass" placeholder="passphrase" required>
- <button type="submit" id="decrypt-btn">decrypt</button>
- <span id="decrypt-status" style="margin-left:0.5rem;"></span>
- </form>
- <div id="paste-content" style="display:none;"></div>
- <script>
- document.getElementById("decrypt-form").addEventListener("submit", async function(e) {{
- e.preventDefault();
- const btn = document.getElementById("decrypt-btn");
- const status = document.getElementById("decrypt-status");
- const pass = document.getElementById("decrypt-pass").value;
- btn.disabled = true;
- btn.textContent = "--->";
- status.textContent = "decrypting...";
- const resp = await fetch("/api/decrypt", {{
- method: "POST",
- headers: {{"Content-Type": "application/json"}},
- body: JSON.stringify({{id: "{paste_id}", passphrase: pass}})
- }});
- const data = await resp.json();
- if (data.error) {{
- alert(data.error);
- btn.disabled = false;
- btn.textContent = "decrypt";
- status.textContent = "";
- return;
- }}
- status.textContent = "decrypted.";
- document.getElementById("decrypt-form").style.display = "none";
- const el = document.getElementById("paste-content");
- el.style.display = "block";
- {"" if not is_code else f'''
- const codeEl = document.createElement("pre");
- const codeInner = document.createElement("code");
- codeInner.className = "{("language-" + language) if language else ""}";
- codeInner.textContent = data.content;
- codeEl.appendChild(codeInner);
- el.appendChild(codeEl);
- hljs.highlightElement(codeInner);
- '''}
- {"" if is_code else '''
- const pre = document.createElement("pre");
- pre.textContent = data.content;
- el.appendChild(pre);
- '''}
- // update copy button
- document.getElementById("copy-btn").onclick = function() {{
- copyPaste();
- }};
- // keep decrypted text in a runtime-only container for copy.
- el.dataset.decryptedContent = data.content;
- }});
- </script>"""
- else:
- escaped = html_escape(content)
- if is_code:
- lang_class = f'class="language-{language}"' if language else ""
- content_block = (
- f'<pre><code id="paste-code" {lang_class}>{escaped}</code></pre>'
- )
- else:
- content_block = f'<pre id="paste-plain">{escaped}</pre>'
-
- highlight_css = ""
- highlight_js = ""
- if is_code:
- highlight_css = '<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/styles/github-dark.min.css">'
- highlight_js = """<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/highlight.min.js"></script>
-<script>hljs.highlightAll();</script>"""
-
- return f"""<!DOCTYPE html>
-<html lang="en">
-<head>
- <meta charset="utf-8">
- <meta name="viewport" content="width=device-width, initial-scale=1">
- <meta name="color-scheme" content="light dark">
- <meta name="robots" content="noindex, nofollow">
- <title>kj-clipboard - {paste_id}</title>
- <link rel="stylesheet" href="https://cdn.jsdelivr.net/gh/kj-sh604/noir.css@latest/out/noir.min.css">
- {highlight_css}
-</head>
-<body>
- <h1><a href="/" style="text-decoration:none;color:inherit;">kj-clipboard</a></h1>
- <p class="meta">created {created}{(" · " + language) if language else ""}{" · encrypted" if is_encrypted else ""}</p>
- <div class="actions">
- <button id="copy-btn" onclick="copyPaste()">copy to clipboard</button>
- <a href="/raw/{paste_id}">raw</a>
- </div>
- {content_block}
- {highlight_js}
- <script>
- function copyPaste() {{
- const btn = document.getElementById("copy-btn");
- let text = "";
-
- const decryptedWrap = document.getElementById("paste-content");
- if (decryptedWrap && decryptedWrap.dataset && decryptedWrap.dataset.decryptedContent) {{
- text = decryptedWrap.dataset.decryptedContent;
- }} else {{
- const code = document.getElementById("paste-code");
- const plain = document.getElementById("paste-plain");
- if (code) {{
- text = code.textContent || "";
- }} else if (plain) {{
- text = plain.textContent || "";
- }}
- }}
-
- if (!text) {{
- btn.textContent = "nothing to copy";
- setTimeout(() => btn.textContent = "copy to clipboard", 1500);
- return;
- }}
-
- navigator.clipboard.writeText(text);
- btn.textContent = "copied!";
- setTimeout(() => btn.textContent = "copy to clipboard", 1500);
- }}
- </script>
-</body>
-</html>"""
-
-
-def not_found_page():
- return """<!DOCTYPE html>
-<html lang="en">
-<head>
- <meta charset="utf-8">
- <meta name="viewport" content="width=device-width, initial-scale=1">
- <meta name="color-scheme" content="light dark">
- <title>kj-clipboard - not found</title>
- <link rel="stylesheet" href="https://cdn.jsdelivr.net/gh/kj-sh604/noir.css@latest/out/noir.min.css">
-</head>
-<body>
- <h1><a href="/" style="text-decoration:none;color:inherit;">kj-clipboard</a></h1>
- <p>paste not found.</p>
-</body>
-</html>"""
-
-
-# helpers
-
-
-def html_escape(text):
- return (
- text.replace("&", "&amp;")
- .replace("<", "&lt;")
- .replace(">", "&gt;")
- .replace('"', "&quot;")
- .replace("'", "&#x27;")
- )
-
-
-def html_escape_attr(text):
- return html_escape(text).replace("\n", "&#10;").replace("\r", "&#13;")
-
-
-def normalize_language(value):
- """normalize and validate highlight.js language token"""
- if value is None:
- return None
- if not isinstance(value, str):
- return None
- lang = value.strip().lower()
- if not lang:
- return None
- if len(lang) > MAX_LANGUAGE_SIZE:
- return None
- if not LANGUAGE_PATTERN.match(lang):
- return None
- if lang not in ALLOWED_LANGUAGES:
- return None
- return lang
-
-
-def get_client_ip(handler):
- """use x-forwarded-for first when present (nginx reverse proxy)"""
- xff = handler.headers.get("X-Forwarded-For", "").strip()
- if xff:
- return xff.split(",")[0].strip()
- return handler.client_address[0]
-
-
-def is_rate_limited(client_ip):
- """fixed window limiter for POST endpoints"""
- now = int(time.time())
- window = now // RATE_WINDOW_SECONDS
-
- with _rate_lock:
- key = (client_ip, window)
- count = _rate_state.get(key, 0) + 1
- _rate_state[key] = count
-
- # cheap cleanup to avoid unbounded growth
- stale_before = window - 2
- stale_keys = [k for k in _rate_state if k[1] < stale_before]
- for k in stale_keys:
- _rate_state.pop(k, None)
-
- return count > REQUESTS_PER_WINDOW
-
-
-# request handler
-
-
-class ClipboardHandler(http.server.BaseHTTPRequestHandler):
- def log_message(self, fmt, *args):
- client_ip = get_client_ip(self)
- sys.stderr.write(
- f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {client_ip} — {fmt % args}\n"
- )
-
- def add_security_headers(self):
- self.send_header("X-Content-Type-Options", "nosniff")
- self.send_header("X-Frame-Options", "DENY")
- self.send_header("Referrer-Policy", "no-referrer")
- self.send_header("Permissions-Policy", "interest-cohort=()")
- self.send_header("Cross-Origin-Opener-Policy", "same-origin")
- self.send_header("Cross-Origin-Resource-Policy", "same-origin")
- self.send_header("Cache-Control", "no-store")
- # CSP allows required CDNs and inline scripts currently used in templates.
- self.send_header(
- "Content-Security-Policy",
- "default-src 'self'; style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://cdnjs.cloudflare.com; "
- "script-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com; img-src 'self' data:; "
- "connect-src 'self'; base-uri 'none'; frame-ancestors 'none'; object-src 'none'; form-action 'self'",
- )
-
- def send_html(self, code, body):
- data = body.encode("utf-8")
- self.send_response(code)
- self.send_header("Content-Type", "text/html; charset=utf-8")
- self.send_header("Content-Length", str(len(data)))
- self.add_security_headers()
- self.end_headers()
- self.wfile.write(data)
-
- def send_json(self, code, obj):
- data = json.dumps(obj, separators=(",", ":")).encode("utf-8")
- self.send_response(code)
- self.send_header("Content-Type", "application/json")
- self.send_header("Content-Length", str(len(data)))
- self.add_security_headers()
- self.end_headers()
- self.wfile.write(data)
-
- def send_plain(self, code, text):
- data = text.encode("utf-8")
- self.send_response(code)
- self.send_header("Content-Type", "text/plain; charset=utf-8")
- self.send_header("Content-Length", str(len(data)))
- self.add_security_headers()
- self.end_headers()
- self.wfile.write(data)
-
- def read_body(self, max_size):
- try:
- length = int(self.headers.get("Content-Length", "0"))
- except ValueError:
- return None
- if length <= 0 or length > max_size:
- return None
- return self.rfile.read(length)
-
- def read_json_body(self, max_size):
- ctype = self.headers.get("Content-Type", "").split(";")[0].strip().lower()
- if ctype != "application/json":
- return None
- body = self.read_body(max_size)
- if body is None:
- return None
- try:
- data = json.loads(body)
- except (json.JSONDecodeError, TypeError, UnicodeDecodeError):
- return None
- if not isinstance(data, dict):
- return None
- return data
-
- def do_GET(self):
- try:
- parsed = urlparse(self.path)
- path = unquote(parsed.path).rstrip("/") or "/"
-
- if path == "/healthz":
- self.send_plain(200, "ok")
- return
-
- if path == "/":
- self.send_html(200, landing_page())
- return
-
- if path.startswith("/raw/"):
- paste_id = path[5:]
- if not is_valid_paste_id(paste_id):
- self.send_plain(404, "not found")
- return
- paste = get_paste(paste_id)
- if not paste:
- self.send_plain(404, "not found")
- return
- if paste["is_encrypted"]:
- self.send_plain(
- 403,
- "403: this paste is encrypted; raw view is not available - use the web interface to decrypt and view the content",
- )
- return
- self.send_plain(200, paste["content"])
- return
-
- # treat anything else as a paste id
- paste_id = path.lstrip("/")
- if not is_valid_paste_id(paste_id):
- self.send_html(404, not_found_page())
- return
-
- paste = get_paste(paste_id)
- if not paste:
- self.send_html(404, not_found_page())
- return
-
- self.send_html(200, paste_page(paste))
- except Exception:
- self.send_json(500, {"error": "internal server error"})
-
- def do_POST(self):
- try:
- client_ip = get_client_ip(self)
- if is_rate_limited(client_ip):
- self.send_json(429, {"error": "rate limit exceeded"})
- return
-
- parsed = urlparse(self.path)
- path = unquote(parsed.path)
-
- if path == "/api/paste":
- self.handle_create_paste()
- elif path == "/api/decrypt":
- self.handle_decrypt()
- else:
- self.send_json(404, {"error": "not found"})
- except Exception:
- self.send_json(500, {"error": "internal server error"})
-
- def handle_create_paste(self):
- data = self.read_json_body(MAX_PASTE_SIZE)
- if data is None:
- self.send_json(400, {"error": "invalid request"})
- return
-
- content = data.get("content", "")
- if not isinstance(content, str):
- self.send_json(400, {"error": "content is required"})
- return
-
- # preserve exact content while blocking empty/oversized payloads
- if not content.strip():
- self.send_json(400, {"error": "content is required"})
- return
- if len(content.encode("utf-8")) > MAX_PASTE_SIZE:
- self.send_json(413, {"error": "paste too large (max 1 MiB)"})
- return
-
- is_code = bool(data.get("is_code", False))
- language = normalize_language(data.get("language", ""))
- passphrase = data.get("passphrase", "")
- if passphrase is None:
- passphrase = ""
- if not isinstance(passphrase, str):
- self.send_json(400, {"error": "invalid passphrase"})
- return
- passphrase = passphrase.strip()
- if len(passphrase.encode("utf-8")) > MAX_PASSPHRASE_SIZE:
- self.send_json(400, {"error": "passphrase too long"})
- return
-
- if data.get("language", "") and language is None:
- self.send_json(400, {"error": "invalid language name"})
- return
-
- is_encrypted = False
- store_content = content
-
- if passphrase:
- encrypted = mojicrypt_encrypt(content, passphrase)
- if encrypted is None:
- self.send_json(
- 500, {"error": "encryption failed — is mojicrypt installed?"}
- )
- return
- store_content = encrypted
- is_encrypted = True
-
- paste_id = save_paste(
- content=store_content,
- language=language,
- is_code=is_code,
- is_encrypted=is_encrypted,
- )
-
- self.send_json(200, {"id": paste_id, "url": f"/{paste_id}"})
-
- def handle_decrypt(self):
- data = self.read_json_body(MAX_DECRYPT_SIZE)
- if data is None:
- self.send_json(400, {"error": "invalid request"})
- return
-
- paste_id = data.get("id", "")
- passphrase = data.get("passphrase", "")
-
- if not isinstance(paste_id, str) or not isinstance(passphrase, str):
- self.send_json(400, {"error": "id and passphrase are required"})
- return
- passphrase = passphrase.strip()
- if len(passphrase.encode("utf-8")) > MAX_PASSPHRASE_SIZE:
- self.send_json(400, {"error": "passphrase too long"})
- return
-
- if not paste_id or not passphrase:
- self.send_json(400, {"error": "id and passphrase are required"})
- return
-
- if not is_valid_paste_id(paste_id):
- self.send_json(404, {"error": "paste not found"})
- return
-
- paste = get_paste(paste_id)
- if not paste:
- self.send_json(404, {"error": "paste not found"})
- return
-
- if not paste["is_encrypted"]:
- self.send_json(400, {"error": "paste is not encrypted"})
- return
-
- plaintext = mojicrypt_decrypt(paste["content"], passphrase)
- if plaintext is None:
- self.send_json(403, {"error": "wrong passphrase or corrupted data"})
- return
-
- self.send_json(200, {"content": plaintext})
-
-
-# main
-
-
-class ClipboardHTTPServer(http.server.ThreadingHTTPServer):
- daemon_threads = True
- allow_reuse_address = True
-
-
-def main():
- init_db()
- print(f"kj-clipboard — listening on {BIND}:{PORT}")
-
- server = ClipboardHTTPServer((BIND, PORT), ClipboardHandler)
- shutdown_requested = threading.Event()
-
- def request_shutdown(msg):
- if shutdown_requested.is_set():
- return
- shutdown_requested.set()
- print(msg)
- # shutdown() should run outside the serving thread.
- threading.Thread(target=server.shutdown, daemon=True).start()
-
- def _shutdown_handler(signum, _frame):
- request_shutdown(f"\nreceived signal {signum}, shutting down.")
-
- signal.signal(signal.SIGTERM, _shutdown_handler)
-
- try:
- server.serve_forever()
- except KeyboardInterrupt:
- print("\nreceived keyboard interrupt, shutting down.")
- finally:
- server.server_close()
-
-
-if __name__ == "__main__":
- main()