Security hardening: HTML sanitization, WS auth, rate limiting, constant-time login

This commit is contained in:
Erik 2026-04-10 20:40:09 +02:00
parent 8f681398ee
commit 52d57c9121

49
main.py
View file

@ -6,7 +6,9 @@ stores telemetry and statistics in a TimescaleDB backend, and exposes HTTP and W
endpoints for browser clients to retrieve live and historical data, trails, and per-character stats.
"""
from collections import defaultdict
from datetime import datetime, timedelta, timezone
import html as _html
import json
import logging
import os
@ -1048,7 +1050,7 @@ class AuthMiddleware(BaseHTTPMiddleware):
):
return await call_next(request)
# WebSocket upgrade for /ws/live — allow (browser WS, read-only)
# WebSocket upgrades bypass middleware (auth checked in handler)
if path.startswith("/ws/live"):
return await call_next(request)
@ -1302,9 +1304,26 @@ async def login_page():
return HTMLResponse("<h1>Login page not found</h1>", status_code=500)
# --------------- login security helpers ---------------
_login_attempts: Dict[str, float] = defaultdict(float) # IP -> last attempt timestamp
_LOGIN_COOLDOWN = 5 # seconds between attempts per IP
_DUMMY_HASH = _bcrypt.hashpw(b"dummy_constant_time_pad", _bcrypt.gensalt()).decode()
@app.post("/login")
async def login(request: Request):
"""Authenticate user and set session cookie."""
# Rate limit: 1 attempt per 5 seconds per IP
client_ip = request.client.host if request.client else "unknown"
now = time.monotonic()
last = _login_attempts.get(client_ip, 0)
if now - last < _LOGIN_COOLDOWN:
raise HTTPException(
status_code=429,
detail="Too many login attempts. Try again in a few seconds.",
)
_login_attempts[client_ip] = now
try:
body = await request.json()
except Exception:
@ -1318,7 +1337,13 @@ async def login(request: Request):
"SELECT id, username, password_hash, is_admin FROM users WHERE LOWER(username) = :username",
{"username": username},
)
if not row or not _bcrypt.checkpw(password.encode(), row["password_hash"].encode()):
# Constant-time: always run bcrypt even if user doesn't exist
if row:
pw_ok = _bcrypt.checkpw(password.encode(), row["password_hash"].encode())
else:
_bcrypt.checkpw(b"dummy", _DUMMY_HASH.encode())
pw_ok = False
if not pw_ok:
raise HTTPException(status_code=401, detail="Invalid username or password")
token = create_session_cookie(row["username"], row["is_admin"])
@ -1611,9 +1636,9 @@ async def add_issue(request: Request, issue: dict):
issues = _load_issues()
new_issue = {
"id": uuid.uuid4().hex[:8],
"title": issue.get("title", "").strip(),
"description": issue.get("description", "").strip(),
"category": issue.get("category", "other"),
"title": _html.escape(issue.get("title", "").strip()),
"description": _html.escape(issue.get("description", "").strip()),
"category": _html.escape(issue.get("category", "other")),
"author": user.get("username", "Anonymous"),
"created": datetime.utcnow().isoformat(),
"resolved": False,
@ -1636,14 +1661,14 @@ async def update_issue(issue_id: str, update: dict):
if "resolved" in update:
i["resolved"] = bool(update["resolved"])
if "title" in update:
title = update["title"].strip()
title = _html.escape(update["title"].strip())
if not title:
raise HTTPException(status_code=400, detail="Title cannot be empty")
i["title"] = title
if "description" in update:
i["description"] = update["description"].strip()
i["description"] = _html.escape(update["description"].strip())
if "category" in update:
i["category"] = update["category"]
i["category"] = _html.escape(update["category"])
found = i
break
if not found:
@ -1664,7 +1689,7 @@ async def add_comment(issue_id: str, request: Request, comment: dict):
break
if not found:
raise HTTPException(status_code=404, detail="Issue not found")
text = comment.get("text", "").strip()
text = _html.escape(comment.get("text", "").strip())
if not text:
raise HTTPException(status_code=400, detail="Comment text is required")
new_comment = {
@ -3155,6 +3180,12 @@ async def ws_live_updates(websocket: WebSocket):
Manages a set of connected browser clients; listens for incoming command messages
and forwards them to the appropriate plugin client WebSocket.
"""
# Require valid session cookie for browser WebSocket
token = websocket.cookies.get("session")
if not token or not verify_session_cookie(token):
await websocket.close(code=4401, reason="Not authenticated")
return
global _browser_connections
# Add new browser client to the set
await websocket.accept()