diff options
| author | kj_sh604 | 2026-04-03 02:09:14 -0400 |
|---|---|---|
| committer | kj_sh604 | 2026-04-03 02:09:14 -0400 |
| commit | d6458885e00788e1dec779b6807a79646aac6ed6 (patch) | |
| tree | 242f105571de42ab2a0a810ca79d41de5921d20c | |
| parent | 90ba16a3b4d1dfc29a614522824d87c5cae14bf8 (diff) | |
refactor: xss and sql injection security hardening
| -rw-r--r-- | auth_backend.py | 22 | ||||
| -rw-r--r-- | shim_app.py | 280 |
2 files changed, 282 insertions, 20 deletions
diff --git a/auth_backend.py b/auth_backend.py index a8ad782..3902d1e 100644 --- a/auth_backend.py +++ b/auth_backend.py @@ -14,6 +14,9 @@ from typing import Callable, Optional, Protocol # we store encrypted challenge output instead of storing passwords. AUTH_CHALLENGE = "SHIM_AUTH_VALID" USERNAME_RE = re.compile(r"^[a-z0-9_.-]{2,64}$") +UUID_RE = re.compile( + r"^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$" +) ConnectFn = Callable[[], sqlite3.Connection] @@ -39,6 +42,13 @@ def normalize_username(username: str) -> str: return username.strip().lower() +def normalize_uuid(value: str) -> Optional[str]: + candidate = (value or "").strip().lower() + if not UUID_RE.fullmatch(candidate): + return None + return candidate + + def looks_like_python_script(path: Path) -> bool: try: with open(path, "r", encoding="utf-8", errors="ignore") as f: @@ -93,6 +103,10 @@ class LocalMojicryptAuthBackend: return False, "username already exists" def update_username(self, user_uuid: str, new_username: str) -> tuple[bool, str]: + normalized_user_uuid = normalize_uuid(user_uuid) + if normalized_user_uuid is None: + return False, "invalid user id" + normalized = normalize_username(new_username) username_error = self._validate_username(normalized) if username_error: @@ -102,7 +116,7 @@ class LocalMojicryptAuthBackend: with self.connect_db() as conn: cursor = conn.execute( "UPDATE users SET username = ? WHERE user_uuid = ?", - (normalized, user_uuid), + (normalized, normalized_user_uuid), ) if cursor.rowcount == 0: return False, "user not found" @@ -112,6 +126,10 @@ class LocalMojicryptAuthBackend: return True, "username updated" def update_password(self, user_uuid: str, new_password: str) -> tuple[bool, str]: + normalized_user_uuid = normalize_uuid(user_uuid) + if normalized_user_uuid is None: + return False, "invalid user id" + password_error = self._validate_password(new_password) if password_error: return False, password_error @@ -124,7 +142,7 @@ class LocalMojicryptAuthBackend: with self.connect_db() as conn: cursor = conn.execute( "UPDATE users SET encrypted_challenge = ? WHERE user_uuid = ?", - (encrypted, user_uuid), + (encrypted, normalized_user_uuid), ) if cursor.rowcount == 0: return False, "user not found" diff --git a/shim_app.py b/shim_app.py index f7e68af..875e21b 100644 --- a/shim_app.py +++ b/shim_app.py @@ -43,7 +43,11 @@ MAX_EXTRACTED_BYTES = int( MAX_EXTRACTED_FILES = int(os.getenv("SHIM_MAX_EXTRACTED_FILES", "20000")) SESSION_COOKIE = "shim_session" ACTIVE_SITE_COOKIE = "shim_active_site" +MUTATING_METHODS = {"POST", "PUT", "PATCH", "DELETE"} SLUG_RE = re.compile(r"^[a-z0-9](?:[a-z0-9-]{0,126}[a-z0-9])?$") +UUID_RE = re.compile( + r"^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$" +) ARCHIVE_SUFFIXES = ( ".zip", ".tar", @@ -67,10 +71,11 @@ SHELL_TEMPLATE = """<!doctype html> <meta http-equiv="cache-control" content="no-cache"> <meta http-equiv="expires" content="0"> <meta http-equiv="pragma" content="no-cache"> + <meta name="shim-csrf-token" content="{{ csrf_token or '' }}"> <link rel="icon" href="{{ url_for('favicon') }}" type="image/svg+xml"> <link rel="stylesheet" href="https://cdn.jsdelivr.net/gh/kj-sh604/noir.css@latest/out/noir.min.css"> <link rel="stylesheet" href="{{ url_for('noir_overrides_css') }}"> - <script> + <script nonce="{{ script_nonce or '' }}"> if ("serviceWorker" in navigator && navigator.serviceWorker.getRegistrations) { navigator.serviceWorker.getRegistrations().then(function (registrations) { registrations.forEach(function (registration) { @@ -87,6 +92,8 @@ SHELL_TEMPLATE = """<!doctype html> (function () { var scrollKey = "__shim_form_scroll_restore__"; + var csrfMeta = document.querySelector("meta[name='shim-csrf-token']"); + var csrfToken = csrfMeta ? (csrfMeta.getAttribute("content") || "") : ""; try { var saved = sessionStorage.getItem(scrollKey); @@ -110,6 +117,24 @@ SHELL_TEMPLATE = """<!doctype html> if (!(form instanceof HTMLFormElement)) { return; } + + var confirmMessage = form.getAttribute("data-confirm"); + if (confirmMessage && !window.confirm(confirmMessage)) { + event.preventDefault(); + return; + } + + if (csrfToken && form.method && form.method.toUpperCase() === "POST") { + var hasToken = form.querySelector("input[name='_csrf_token']"); + if (!hasToken) { + var hidden = document.createElement("input"); + hidden.type = "hidden"; + hidden.name = "_csrf_token"; + hidden.value = csrfToken; + form.appendChild(hidden); + } + } + try { sessionStorage.setItem( scrollKey, @@ -121,6 +146,16 @@ SHELL_TEMPLATE = """<!doctype html> } catch (_err) { } }, true); + + document.addEventListener("focusin", function (event) { + var input = event.target; + if (!(input instanceof HTMLInputElement)) { + return; + } + if (input.hasAttribute("data-unlock-readonly")) { + input.removeAttribute("readonly"); + } + }); })(); </script> <title>{{ title }} - {{ app_name }}</title> @@ -163,7 +198,7 @@ SETUP_BODY_TEMPLATE = """ <form method="post" action="{{ url_for('setup_submit') }}" class="stack" autocomplete="off"> <label> username - <input name="username" type="text" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" placeholder="admin" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');"> + <input name="username" type="text" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" placeholder="admin" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" data-unlock-readonly="1" readonly> </label> <label> password @@ -185,7 +220,7 @@ LOGIN_BODY_TEMPLATE = """ <form method="post" action="{{ url_for('login_submit') }}" class="stack" autocomplete="off"> <label> username - <input name="username" type="text" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');"> + <input name="username" type="text" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" data-unlock-readonly="1" readonly> </label> <label> password @@ -236,7 +271,7 @@ DASHBOARD_BODY_TEMPLATE = """ <td>{{ site['original_filename'] }}</td> <td>{{ site['created_at'] }}</td> <td> - <form method="post" action="{{ url_for('delete_site', site_id=site['id']) }}" class="inline" onsubmit="return confirm('delete this site?');"> + <form method="post" action="{{ url_for('delete_site', site_id=site['id']) }}" class="inline" data-confirm="delete this site?"> <button type="submit" style="all: revert;">delete</button> </form> </td> @@ -289,7 +324,7 @@ DASHBOARD_BODY_TEMPLATE = """ <form method="post" action="{{ url_for('admin_create_user') }}" class="stack" autocomplete="off"> <label> username - <input type="text" name="username" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');"> + <input type="text" name="username" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" data-unlock-readonly="1" readonly> </label> <label> password @@ -336,7 +371,7 @@ DASHBOARD_BODY_TEMPLATE = """ <button type="submit" style="all: revert;">set password</button> </form><br> {% if user['id'] != current_user['id'] %} - <form method="post" action="{{ url_for('admin_delete_user', user_id=user['id']) }}" class="inline" onsubmit="return confirm('delete this user and all owned sites?');"> + <form method="post" action="{{ url_for('admin_delete_user', user_id=user['id']) }}" class="inline" data-confirm="delete this user and all owned sites?"> <button type="submit" style="all: revert;">delete</button> </form> {% else %} @@ -374,6 +409,11 @@ def slug_is_valid(slug: str) -> bool: return bool(slug) and bool(SLUG_RE.fullmatch(slug)) +def uuid_is_valid(value: str) -> bool: + value = (value or "").strip().lower() + return bool(UUID_RE.fullmatch(value)) + + def detect_archive_suffix(filename: str) -> Optional[str]: lower = filename.lower() for suffix in ARCHIVE_SUFFIXES: @@ -386,6 +426,8 @@ def normalize_archive_member_path(raw_name: str) -> Optional[Path]: name = raw_name.replace("\\", "/").strip() if not name: return None + if "\x00" in name: + raise ValueError("archive contains invalid null byte path") while name.startswith("./"): name = name[2:] if not name: @@ -395,6 +437,10 @@ def normalize_archive_member_path(raw_name: str) -> Optional[Path]: parts = [part for part in name.split("/") if part not in ("", ".")] if not parts: return None + if any(len(part) > 255 for part in parts): + raise ValueError("archive contains overlong path component") + if len(parts) > 64: + raise ValueError("archive path depth is too large") if any(part == ".." for part in parts): raise ValueError("archive contains parent path traversal") if ":" in parts[0]: @@ -656,18 +702,33 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: app = Flask(__name__, static_folder=None) app.config["MAX_CONTENT_LENGTH"] = MAX_UPLOAD_BYTES + app.config["MAX_FORM_MEMORY_SIZE"] = int( + os.getenv("SHIM_MAX_FORM_MEMORY_SIZE", str(2 * 1024 * 1024)) + ) app.config["SECRET_KEY"] = os.getenv("SHIM_SECRET_KEY", secrets.token_hex(32)) app.config["SHIM_PORT"] = cfg.port app.config["SHIM_BIND"] = cfg.bind app.config["SHIM_APP_NAME"] = cfg.app_name app.config["SHIM_MOJICRYPT_BIN"] = str(cfg.mojicrypt_bin) + cookie_secure_mode = os.getenv("SHIM_COOKIE_SECURE", "auto").strip().lower() + if cookie_secure_mode not in {"auto", "true", "false"}: + cookie_secure_mode = "auto" + def connect_db() -> sqlite3.Connection: conn = sqlite3.connect(str(cfg.db_path)) # row objects keep query call sites readable and less index-fragile. conn.row_factory = sqlite3.Row # keep fk checks enabled even on sqlite defaults that disable them. conn.execute("PRAGMA foreign_keys = ON") + try: + conn.enable_load_extension(False) + except (AttributeError, sqlite3.OperationalError): + pass + try: + conn.execute("PRAGMA trusted_schema = OFF") + except sqlite3.DatabaseError: + pass return conn with connect_db() as conn: @@ -685,6 +746,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: CREATE TABLE IF NOT EXISTS sessions ( token TEXT PRIMARY KEY, user_id INTEGER NOT NULL, + csrf_token TEXT NOT NULL, expires_at INTEGER NOT NULL, created_at INTEGER NOT NULL, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE @@ -732,6 +794,26 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: "CREATE UNIQUE INDEX IF NOT EXISTS idx_users_user_uuid ON users(user_uuid)" ) + # migration for existing installs - add and backfill csrf tokens for sessions. + session_columns = { + row["name"] + for row in conn.execute("PRAGMA table_info(sessions)").fetchall() + } + if "csrf_token" not in session_columns: + conn.execute("ALTER TABLE sessions ADD COLUMN csrf_token TEXT") + + missing_session_csrf = conn.execute( + """ + SELECT token FROM sessions + WHERE csrf_token IS NULL OR csrf_token = '' + """ + ).fetchall() + for row in missing_session_csrf: + conn.execute( + "UPDATE sessions SET csrf_token = ? WHERE token = ?", + (secrets.token_urlsafe(32), row["token"]), + ) + # migration for existing installs - add and backfill uuid ids for sites. site_columns = { row["name"] @@ -764,24 +846,43 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: def render_page(title: str, body_template: str, **context: object) -> str: body = render_template_string(body_template, **context) + csrf_token = "" + script_nonce = secrets.token_urlsafe(16) + g.script_nonce = script_nonce + if g.current_user is not None: + csrf_token = g.current_user["csrf_token"] return render_template_string( SHELL_TEMPLATE, title=title, app_name=cfg.app_name, current_user=g.current_user, + csrf_token=csrf_token, + script_nonce=script_nonce, body=body, ) + def cookie_secure_enabled() -> bool: + if cookie_secure_mode == "true": + return True + if cookie_secure_mode == "false": + return False + if request.is_secure: + return True + xfp = request.headers.get("X-Forwarded-Proto", "") + forwarded_proto = xfp.split(",", 1)[0].strip().lower() + return forwarded_proto == "https" + def create_session(user_id: int) -> str: token = secrets.token_urlsafe(48) + csrf_token = secrets.token_urlsafe(32) now = int(time.time()) with connect_db() as conn: conn.execute( """ - INSERT INTO sessions (token, user_id, expires_at, created_at) - VALUES (?, ?, ?, ?) + INSERT INTO sessions (token, user_id, csrf_token, expires_at, created_at) + VALUES (?, ?, ?, ?, ?) """, - (token, user_id, now + SESSION_TTL_SECONDS, now), + (token, user_id, csrf_token, now + SESSION_TTL_SECONDS, now), ) return token @@ -793,15 +894,78 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: now = int(time.time()) with connect_db() as conn: conn.execute("DELETE FROM sessions WHERE expires_at <= ?", (now,)) - return conn.execute( + row = conn.execute( """ - SELECT u.id AS internal_id, u.user_uuid AS id, u.username, u.role + SELECT u.id AS internal_id, u.user_uuid AS id, u.username, u.role, s.csrf_token FROM sessions s JOIN users u ON u.id = s.user_id WHERE s.token = ? AND s.expires_at > ? """, (token, now), ).fetchone() + if row is None: + return None + if not row["csrf_token"]: + conn.execute( + "UPDATE sessions SET csrf_token = ? WHERE token = ?", + (secrets.token_urlsafe(32), token), + ) + row = conn.execute( + """ + SELECT u.id AS internal_id, u.user_uuid AS id, u.username, u.role, s.csrf_token + FROM sessions s + JOIN users u ON u.id = s.user_id + WHERE s.token = ? AND s.expires_at > ? + """, + (token, now), + ).fetchone() + return row + + def is_same_origin_request() -> bool: + forwarded_proto = request.headers.get("X-Forwarded-Proto", "") + external_scheme = forwarded_proto.split(",", 1)[0].strip().lower() + if external_scheme not in {"http", "https"}: + external_scheme = request.scheme + external_netloc = request.host + + origin = request.headers.get("Origin") + if origin: + try: + parsed_origin = urlparse(origin) + except ValueError: + return False + return ( + parsed_origin.scheme == external_scheme + and parsed_origin.netloc == external_netloc + ) + + referer = request.headers.get("Referer") + if referer: + try: + parsed_referer = urlparse(referer) + except ValueError: + return False + return ( + parsed_referer.scheme == external_scheme + and parsed_referer.netloc == external_netloc + ) + + return True + + def is_valid_csrf_for_request() -> bool: + if g.current_user is None: + return True + expected = g.current_user["csrf_token"] or "" + if not expected: + return False + + supplied = request.form.get("_csrf_token", "") + if not supplied: + supplied = request.headers.get("X-CSRF-Token", "") + + if not supplied: + return False + return secrets.compare_digest(expected, supplied) def require_auth() -> Optional[Response]: if g.current_user is not None: @@ -864,6 +1028,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: return candidate def serve_site_resource(site: sqlite3.Row, subpath: str, allow_spa: bool = True) -> Response: + g.is_hosted_site_response = True site_root = (cfg.sites_dir / site["storage_key"]).resolve() if not site_root.is_dir(): abort(404) @@ -889,12 +1054,61 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: @app.before_request def load_current_user() -> None: g.current_user = None + g.is_hosted_site_response = False token = request.cookies.get(SESSION_COOKIE) - if not token: - return - user = user_from_token(token) - if user is not None: - g.current_user = user + if token: + user = user_from_token(token) + if user is not None: + g.current_user = user + + if request.method in MUTATING_METHODS and request.path.startswith("/app/"): + if not is_same_origin_request(): + abort(403) + if not is_valid_csrf_for_request(): + abort(403) + + @app.after_request + def add_security_headers(response: Response) -> Response: + response.headers.setdefault("X-Content-Type-Options", "nosniff") + response.headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin") + response.headers.setdefault("X-Permitted-Cross-Domain-Policies", "none") + + if cookie_secure_enabled(): + response.headers.setdefault( + "Strict-Transport-Security", + "max-age=31536000; includeSubDomains", + ) + + if not getattr(g, "is_hosted_site_response", False): + nonce = getattr(g, "script_nonce", "") + script_src = "script-src 'self'" + if nonce: + script_src += f" 'nonce-{nonce}'" + response.headers.setdefault("X-Frame-Options", "DENY") + response.headers.setdefault( + "Permissions-Policy", + "camera=(), microphone=(), geolocation=(), payment=()", + ) + response.headers.setdefault("Cross-Origin-Opener-Policy", "same-origin") + response.headers.setdefault("Cross-Origin-Resource-Policy", "same-origin") + response.headers.setdefault( + "Content-Security-Policy", + "default-src 'self'; " + + script_src + + "; " + "style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; " + "img-src 'self' data: blob: https:; " + "font-src 'self' data:; " + "connect-src 'self'; " + "object-src 'none'; " + "base-uri 'self'; " + "frame-ancestors 'none'; " + "form-action 'self'", + ) + if request.path.startswith("/app") and response.mimetype == "text/html": + response.headers["Cache-Control"] = "no-store" + + return response @app.errorhandler(413) def too_large(_: Exception) -> tuple[str, int]: @@ -1008,6 +1222,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: max_age=SESSION_TTL_SECONDS, httponly=True, samesite="Lax", + secure=cookie_secure_enabled(), path="/", ) flash("admin account created", "success") @@ -1042,6 +1257,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: max_age=SESSION_TTL_SECONDS, httponly=True, samesite="Lax", + secure=cookie_secure_enabled(), path="/", ) flash("signed in", "success") @@ -1053,7 +1269,15 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: if token: destroy_session(token) response = redirect(url_for("login")) - response.set_cookie(SESSION_COOKIE, "", max_age=0, httponly=True, samesite="Lax", path="/") + response.set_cookie( + SESSION_COOKIE, + "", + max_age=0, + httponly=True, + samesite="Lax", + secure=cookie_secure_enabled(), + path="/", + ) return response @app.post("/app/upload") @@ -1131,6 +1355,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: if auth_redirect is not None: return auth_redirect + if not uuid_is_valid(site_id): + flash("invalid site id", "error") + return redirect(url_for("dashboard")) + with connect_db() as conn: site = conn.execute( "SELECT site_uuid, owner_user_id, slug, storage_key FROM sites WHERE site_uuid = ?", @@ -1171,6 +1399,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: if admin_redirect is not None: return admin_redirect + if not uuid_is_valid(user_id): + flash("invalid user id", "error") + return redirect(url_for("dashboard")) + username = request.form.get("username", "") ok, message = auth_backend.update_username(user_uuid=user_id, new_username=username) flash(message, "success" if ok else "error") @@ -1182,6 +1414,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: if admin_redirect is not None: return admin_redirect + if not uuid_is_valid(user_id): + flash("invalid user id", "error") + return redirect(url_for("dashboard")) + password = request.form.get("password", "") ok, message = auth_backend.update_password(user_uuid=user_id, new_password=password) flash(message, "success" if ok else "error") @@ -1193,6 +1429,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: if admin_redirect is not None: return admin_redirect + if not uuid_is_valid(user_id): + flash("invalid user id", "error") + return redirect(url_for("dashboard")) + if g.current_user["id"] == user_id: flash("you cannot delete your own account", "error") return redirect(url_for("dashboard")) @@ -1225,6 +1465,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: if admin_redirect is not None: return admin_redirect + if not uuid_is_valid(site_id): + flash("invalid site id", "error") + return redirect(url_for("dashboard")) + new_slug = request.form.get("slug", "").strip().lower() if not slug_is_valid(new_slug): flash("invalid slug format", "error") @@ -1280,7 +1524,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: def site_root_relative_fallback(subpath: str) -> Response: # handle absolute root paths from hosted apps by resolving the slug from referer/cookie first = subpath.split("/", 1)[0].lower() - if first in {"app", "api", "s", "_site", "healthz"}: + if first in {"app", "api", "s", "_site", "healthz", "favicon.svg", "robots.txt"}: abort(404) slug = extract_slug_from_referer(request.headers.get("Referer")) |
