From 1ec631fe7f154fba67e91c69bb752d79aef79ca5 Mon Sep 17 00:00:00 2001 From: kj_sh604 Date: Sat, 14 Mar 2026 17:18:25 -0400 Subject: refactor: src/ --- src/.gitignore | 5 + src/data/.db-gets-made-here | 1 + src/index.html | 1 + src/server.py | 845 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 852 insertions(+) create mode 100644 src/.gitignore create mode 100644 src/data/.db-gets-made-here create mode 100644 src/index.html create mode 100644 src/server.py diff --git a/src/.gitignore b/src/.gitignore new file mode 100644 index 0000000..afa7079 --- /dev/null +++ b/src/.gitignore @@ -0,0 +1,5 @@ +data/kj-clipboard.db +__pycache__/ +*.pyc +.venv/ +venv/ \ No newline at end of file diff --git a/src/data/.db-gets-made-here b/src/data/.db-gets-made-here new file mode 100644 index 0000000..47a6fda --- /dev/null +++ b/src/data/.db-gets-made-here @@ -0,0 +1 @@ +fr fr fs fs 67 67 \ No newline at end of file diff --git a/src/index.html b/src/index.html new file mode 100644 index 0000000..658d834 --- /dev/null +++ b/src/index.html @@ -0,0 +1 @@ +
no frills, just a public clipboard on the internet that you can use to share snippets around... that's it.
{escaped}'
+ )
+ else:
+ content_block = f'{escaped}'
+
+ highlight_css = ""
+ highlight_js = ""
+ if is_code:
+ highlight_css = ''
+ highlight_js = """
+"""
+
+ return f"""
+
+
+
+
+
+
+ paste not found.
+ +""" + + +# helpers + + +def html_escape(text): + return ( + text.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + .replace("'", "'") + ) + + +def html_escape_attr(text): + return html_escape(text).replace("\n", " ").replace("\r", " ") + + +def normalize_language(value): + """normalize and validate highlight.js language token""" + if value is None: + return None + if not isinstance(value, str): + return None + lang = value.strip().lower() + if not lang: + return None + if len(lang) > MAX_LANGUAGE_SIZE: + return None + if not LANGUAGE_PATTERN.match(lang): + return None + if lang not in ALLOWED_LANGUAGES: + return None + return lang + + +def get_client_ip(handler): + """use x-forwarded-for first when present (nginx reverse proxy)""" + xff = handler.headers.get("X-Forwarded-For", "").strip() + if xff: + return xff.split(",")[0].strip() + return handler.client_address[0] + + +def is_rate_limited(client_ip): + """fixed window limiter for POST endpoints""" + now = int(time.time()) + window = now // RATE_WINDOW_SECONDS + + with _rate_lock: + key = (client_ip, window) + count = _rate_state.get(key, 0) + 1 + _rate_state[key] = count + + # cheap cleanup to avoid unbounded growth + stale_before = window - 2 + stale_keys = [k for k in _rate_state if k[1] < stale_before] + for k in stale_keys: + _rate_state.pop(k, None) + + return count > REQUESTS_PER_WINDOW + + +# request handler + + +class ClipboardHandler(http.server.BaseHTTPRequestHandler): + def log_message(self, fmt, *args): + client_ip = get_client_ip(self) + sys.stderr.write( + f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {client_ip} — {fmt % args}\n" + ) + + def add_security_headers(self): + self.send_header("X-Content-Type-Options", "nosniff") + self.send_header("X-Frame-Options", "DENY") + self.send_header("Referrer-Policy", "no-referrer") + self.send_header("Permissions-Policy", "interest-cohort=()") + self.send_header("Cross-Origin-Opener-Policy", "same-origin") + self.send_header("Cross-Origin-Resource-Policy", "same-origin") + self.send_header("Cache-Control", "no-store") + # CSP allows required CDNs and inline scripts currently used in templates. + self.send_header( + "Content-Security-Policy", + "default-src 'self'; style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://cdnjs.cloudflare.com; " + "script-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com; img-src 'self' data:; " + "connect-src 'self'; base-uri 'none'; frame-ancestors 'none'; object-src 'none'; form-action 'self'", + ) + + def send_html(self, code, body): + data = body.encode("utf-8") + self.send_response(code) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", str(len(data))) + self.add_security_headers() + self.end_headers() + self.wfile.write(data) + + def send_json(self, code, obj): + data = json.dumps(obj, separators=(",", ":")).encode("utf-8") + self.send_response(code) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(data))) + self.add_security_headers() + self.end_headers() + self.wfile.write(data) + + def send_plain(self, code, text): + data = text.encode("utf-8") + self.send_response(code) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Length", str(len(data))) + self.add_security_headers() + self.end_headers() + self.wfile.write(data) + + def read_body(self, max_size): + try: + length = int(self.headers.get("Content-Length", "0")) + except ValueError: + return None + if length <= 0 or length > max_size: + return None + return self.rfile.read(length) + + def read_json_body(self, max_size): + ctype = self.headers.get("Content-Type", "").split(";")[0].strip().lower() + if ctype != "application/json": + return None + body = self.read_body(max_size) + if body is None: + return None + try: + data = json.loads(body) + except (json.JSONDecodeError, TypeError, UnicodeDecodeError): + return None + if not isinstance(data, dict): + return None + return data + + def do_GET(self): + try: + parsed = urlparse(self.path) + path = unquote(parsed.path).rstrip("/") or "/" + + if path == "/healthz": + self.send_plain(200, "ok") + return + + if path == "/": + self.send_html(200, landing_page()) + return + + if path.startswith("/raw/"): + paste_id = path[5:] + if not is_valid_paste_id(paste_id): + self.send_plain(404, "not found") + return + paste = get_paste(paste_id) + if not paste: + self.send_plain(404, "not found") + return + if paste["is_encrypted"]: + self.send_plain( + 403, + "403: this paste is encrypted; raw view is not available - use the web interface to decrypt and view the content", + ) + return + self.send_plain(200, paste["content"]) + return + + # treat anything else as a paste id + paste_id = path.lstrip("/") + if not is_valid_paste_id(paste_id): + self.send_html(404, not_found_page()) + return + + paste = get_paste(paste_id) + if not paste: + self.send_html(404, not_found_page()) + return + + self.send_html(200, paste_page(paste)) + except Exception: + self.send_json(500, {"error": "internal server error"}) + + def do_POST(self): + try: + client_ip = get_client_ip(self) + if is_rate_limited(client_ip): + self.send_json(429, {"error": "rate limit exceeded"}) + return + + parsed = urlparse(self.path) + path = unquote(parsed.path) + + if path == "/api/paste": + self.handle_create_paste() + elif path == "/api/decrypt": + self.handle_decrypt() + else: + self.send_json(404, {"error": "not found"}) + except Exception: + self.send_json(500, {"error": "internal server error"}) + + def handle_create_paste(self): + data = self.read_json_body(MAX_PASTE_SIZE) + if data is None: + self.send_json(400, {"error": "invalid request"}) + return + + content = data.get("content", "") + if not isinstance(content, str): + self.send_json(400, {"error": "content is required"}) + return + + # preserve exact content while blocking empty/oversized payloads + if not content.strip(): + self.send_json(400, {"error": "content is required"}) + return + if len(content.encode("utf-8")) > MAX_PASTE_SIZE: + self.send_json(413, {"error": "paste too large (max 1 MiB)"}) + return + + is_code = bool(data.get("is_code", False)) + language = normalize_language(data.get("language", "")) + passphrase = data.get("passphrase", "") + if passphrase is None: + passphrase = "" + if not isinstance(passphrase, str): + self.send_json(400, {"error": "invalid passphrase"}) + return + passphrase = passphrase.strip() + if len(passphrase.encode("utf-8")) > MAX_PASSPHRASE_SIZE: + self.send_json(400, {"error": "passphrase too long"}) + return + + if data.get("language", "") and language is None: + self.send_json(400, {"error": "invalid language name"}) + return + + is_encrypted = False + store_content = content + + if passphrase: + encrypted = mojicrypt_encrypt(content, passphrase) + if encrypted is None: + self.send_json( + 500, {"error": "encryption failed — is mojicrypt installed?"} + ) + return + store_content = encrypted + is_encrypted = True + + paste_id = save_paste( + content=store_content, + language=language, + is_code=is_code, + is_encrypted=is_encrypted, + ) + + self.send_json(200, {"id": paste_id, "url": f"/{paste_id}"}) + + def handle_decrypt(self): + data = self.read_json_body(MAX_DECRYPT_SIZE) + if data is None: + self.send_json(400, {"error": "invalid request"}) + return + + paste_id = data.get("id", "") + passphrase = data.get("passphrase", "") + + if not isinstance(paste_id, str) or not isinstance(passphrase, str): + self.send_json(400, {"error": "id and passphrase are required"}) + return + passphrase = passphrase.strip() + if len(passphrase.encode("utf-8")) > MAX_PASSPHRASE_SIZE: + self.send_json(400, {"error": "passphrase too long"}) + return + + if not paste_id or not passphrase: + self.send_json(400, {"error": "id and passphrase are required"}) + return + + if not is_valid_paste_id(paste_id): + self.send_json(404, {"error": "paste not found"}) + return + + paste = get_paste(paste_id) + if not paste: + self.send_json(404, {"error": "paste not found"}) + return + + if not paste["is_encrypted"]: + self.send_json(400, {"error": "paste is not encrypted"}) + return + + plaintext = mojicrypt_decrypt(paste["content"], passphrase) + if plaintext is None: + self.send_json(403, {"error": "wrong passphrase or corrupted data"}) + return + + self.send_json(200, {"content": plaintext}) + + +# main + + +class ClipboardHTTPServer(http.server.ThreadingHTTPServer): + daemon_threads = True + allow_reuse_address = True + + +def main(): + init_db() + print(f"kj-clipboard — listening on {BIND}:{PORT}") + + server = ClipboardHTTPServer((BIND, PORT), ClipboardHandler) + shutdown_requested = threading.Event() + + def request_shutdown(msg): + if shutdown_requested.is_set(): + return + shutdown_requested.set() + print(msg) + # shutdown() should run outside the serving thread. + threading.Thread(target=server.shutdown, daemon=True).start() + + def _shutdown_handler(signum, _frame): + request_shutdown(f"\nreceived signal {signum}, shutting down.") + + signal.signal(signal.SIGTERM, _shutdown_handler) + + try: + server.serve_forever() + except KeyboardInterrupt: + print("\nreceived keyboard interrupt, shutting down.") + finally: + server.server_close() + + +if __name__ == "__main__": + main() -- cgit v1.2.3