From 52d57c912138f98906a4a0d77c8e6f43f47b4e57 Mon Sep 17 00:00:00 2001 From: Erik Date: Fri, 10 Apr 2026 20:40:09 +0200 Subject: [PATCH] Security hardening: HTML sanitization, WS auth, rate limiting, constant-time login --- main.py | 49 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/main.py b/main.py index ac33fae6..06e2e02a 100644 --- a/main.py +++ b/main.py @@ -6,7 +6,9 @@ stores telemetry and statistics in a TimescaleDB backend, and exposes HTTP and W endpoints for browser clients to retrieve live and historical data, trails, and per-character stats. """ +from collections import defaultdict from datetime import datetime, timedelta, timezone +import html as _html import json import logging import os @@ -1048,7 +1050,7 @@ class AuthMiddleware(BaseHTTPMiddleware): ): return await call_next(request) - # WebSocket upgrade for /ws/live — allow (browser WS, read-only) + # WebSocket upgrades bypass middleware (auth checked in handler) if path.startswith("/ws/live"): return await call_next(request) @@ -1302,9 +1304,26 @@ async def login_page(): return HTMLResponse("

Login page not found

", status_code=500) +# --------------- login security helpers --------------- +_login_attempts: Dict[str, float] = defaultdict(float) # IP -> last attempt timestamp +_LOGIN_COOLDOWN = 5 # seconds between attempts per IP +_DUMMY_HASH = _bcrypt.hashpw(b"dummy_constant_time_pad", _bcrypt.gensalt()).decode() + + @app.post("/login") async def login(request: Request): """Authenticate user and set session cookie.""" + # Rate limit: 1 attempt per 5 seconds per IP + client_ip = request.client.host if request.client else "unknown" + now = time.monotonic() + last = _login_attempts.get(client_ip, 0) + if now - last < _LOGIN_COOLDOWN: + raise HTTPException( + status_code=429, + detail="Too many login attempts. Try again in a few seconds.", + ) + _login_attempts[client_ip] = now + try: body = await request.json() except Exception: @@ -1318,7 +1337,13 @@ async def login(request: Request): "SELECT id, username, password_hash, is_admin FROM users WHERE LOWER(username) = :username", {"username": username}, ) - if not row or not _bcrypt.checkpw(password.encode(), row["password_hash"].encode()): + # Constant-time: always run bcrypt even if user doesn't exist + if row: + pw_ok = _bcrypt.checkpw(password.encode(), row["password_hash"].encode()) + else: + _bcrypt.checkpw(b"dummy", _DUMMY_HASH.encode()) + pw_ok = False + if not pw_ok: raise HTTPException(status_code=401, detail="Invalid username or password") token = create_session_cookie(row["username"], row["is_admin"]) @@ -1611,9 +1636,9 @@ async def add_issue(request: Request, issue: dict): issues = _load_issues() new_issue = { "id": uuid.uuid4().hex[:8], - "title": issue.get("title", "").strip(), - "description": issue.get("description", "").strip(), - "category": issue.get("category", "other"), + "title": _html.escape(issue.get("title", "").strip()), + "description": _html.escape(issue.get("description", "").strip()), + "category": _html.escape(issue.get("category", "other")), "author": user.get("username", "Anonymous"), "created": datetime.utcnow().isoformat(), "resolved": False, @@ -1636,14 +1661,14 @@ async def update_issue(issue_id: str, update: dict): if "resolved" in update: i["resolved"] = bool(update["resolved"]) if "title" in update: - title = update["title"].strip() + title = _html.escape(update["title"].strip()) if not title: raise HTTPException(status_code=400, detail="Title cannot be empty") i["title"] = title if "description" in update: - i["description"] = update["description"].strip() + i["description"] = _html.escape(update["description"].strip()) if "category" in update: - i["category"] = update["category"] + i["category"] = _html.escape(update["category"]) found = i break if not found: @@ -1664,7 +1689,7 @@ async def add_comment(issue_id: str, request: Request, comment: dict): break if not found: raise HTTPException(status_code=404, detail="Issue not found") - text = comment.get("text", "").strip() + text = _html.escape(comment.get("text", "").strip()) if not text: raise HTTPException(status_code=400, detail="Comment text is required") new_comment = { @@ -3155,6 +3180,12 @@ async def ws_live_updates(websocket: WebSocket): Manages a set of connected browser clients; listens for incoming command messages and forwards them to the appropriate plugin client WebSocket. """ + # Require valid session cookie for browser WebSocket + token = websocket.cookies.get("session") + if not token or not verify_session_cookie(token): + await websocket.close(code=4401, reason="Not authenticated") + return + global _browser_connections # Add new browser client to the set await websocket.accept()