diff options
| author | Kyle Javier [kj_sh604] | 2026-04-03 03:05:38 -0400 |
|---|---|---|
| committer | GitHub | 2026-04-03 03:05:38 -0400 |
| commit | 9f0a5a6fce2621e320fdeb751243b4878f920401 (patch) | |
| tree | f14f6261e61e9733a6fdbe412796387008e9df46 | |
| parent | 90ba16a3b4d1dfc29a614522824d87c5cae14bf8 (diff) | |
| parent | 4fb393c616d1743b97618c734fd534d09b9bf5ee (diff) | |
[merge] pull request #1 from kj-sh604/feat/securityfeat/security
feat: security
| -rw-r--r-- | README | 31 | ||||
| -rw-r--r-- | auth_backend.py | 57 | ||||
| -rw-r--r-- | requirements.txt | 3 | ||||
| -rw-r--r-- | shim_app.py | 360 |
4 files changed, 372 insertions, 79 deletions
@@ -0,0 +1,31 @@ +shim +---- + +small static site host for archive uploads. + +what it does + - users upload one archive, app publishes it under a slug + - public routes: /s/<slug>/... and /_site/<slug>/... + +quick start (assumes POSIX) + - python3 -m venv .venv + - source .venv/bin/activate + - pip install -r requirements.txt + - python3 server.py + - open http://127.0.0.1:8585/app + +config + - SHIM_APP_NAME: ui/app name (default: shim) + - SHIM_BIND: bind address (default: 0.0.0.0) + - SHIM_PORT: port (default: 8585) + - SHIM_MOJICRYPT_BIN: mojicrypt path (default: ./vendor/mojicrypt) + - SHIM_COOKIE_SECURE: auto|true|false (default: auto) + - SHIM_SQLITE_TIMEOUT_SECONDS (default: 30.0) + - SHIM_SQLITE_BUSY_TIMEOUT_MS (default: 30000) + - SHIM_SQLITE_CACHE_SIZE_KIB (default: 32768) + - SHIM_SQLITE_MMAP_SIZE_BYTES (default: 268435456) + - SHIM_SQLITE_WAL_AUTOCHECKPOINT_PAGES (default: 1000) + +data paths + - db: data/shim.db + - site files: data/sites/ diff --git a/auth_backend.py b/auth_backend.py index a8ad782..817f8d4 100644 --- a/auth_backend.py +++ b/auth_backend.py @@ -9,11 +9,16 @@ import uuid from pathlib import Path from typing import Callable, Optional, Protocol +from email_validator import EmailNotValidError, validate_email + # fixed challenge text used for passphrase verification. # we store encrypted challenge output instead of storing passwords. AUTH_CHALLENGE = "SHIM_AUTH_VALID" -USERNAME_RE = re.compile(r"^[a-z0-9_.-]{2,64}$") +HANDLE_RE = re.compile(r"^[a-z0-9_.-]{2,64}$") +UUID_RE = re.compile( + r"^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$" +) ConnectFn = Callable[[], sqlite3.Connection] @@ -39,6 +44,30 @@ def normalize_username(username: str) -> str: return username.strip().lower() +def normalize_uuid(value: str) -> Optional[str]: + candidate = (value or "").strip().lower() + if not UUID_RE.fullmatch(candidate): + return None + return candidate + + +def is_valid_email_username(value: str) -> bool: + if len(value) > 254: + return False + try: + validated = validate_email( + value, + check_deliverability=False, + allow_smtputf8=False, + allow_display_name=False, + ) + except EmailNotValidError: + return False + + # keep stored usernames predictable - only accept already normalized forms. + return validated.normalized == value + + def looks_like_python_script(path: Path) -> bool: try: with open(path, "r", encoding="utf-8", errors="ignore") as f: @@ -93,6 +122,10 @@ class LocalMojicryptAuthBackend: return False, "username already exists" def update_username(self, user_uuid: str, new_username: str) -> tuple[bool, str]: + normalized_user_uuid = normalize_uuid(user_uuid) + if normalized_user_uuid is None: + return False, "invalid user id" + normalized = normalize_username(new_username) username_error = self._validate_username(normalized) if username_error: @@ -102,7 +135,7 @@ class LocalMojicryptAuthBackend: with self.connect_db() as conn: cursor = conn.execute( "UPDATE users SET username = ? WHERE user_uuid = ?", - (normalized, user_uuid), + (normalized, normalized_user_uuid), ) if cursor.rowcount == 0: return False, "user not found" @@ -112,6 +145,10 @@ class LocalMojicryptAuthBackend: return True, "username updated" def update_password(self, user_uuid: str, new_password: str) -> tuple[bool, str]: + normalized_user_uuid = normalize_uuid(user_uuid) + if normalized_user_uuid is None: + return False, "invalid user id" + password_error = self._validate_password(new_password) if password_error: return False, password_error @@ -124,7 +161,7 @@ class LocalMojicryptAuthBackend: with self.connect_db() as conn: cursor = conn.execute( "UPDATE users SET encrypted_challenge = ? WHERE user_uuid = ?", - (encrypted, user_uuid), + (encrypted, normalized_user_uuid), ) if cursor.rowcount == 0: return False, "user not found" @@ -152,9 +189,17 @@ class LocalMojicryptAuthBackend: return self._run_mojicrypt("encrypt", AUTH_CHALLENGE, password) def _validate_username(self, username: str) -> Optional[str]: - if not USERNAME_RE.fullmatch(username): - return "username must be lowercase and use [a-z0-9_.-], 2-64 chars" - return None + if len(username) < 2: + return "username must be at least 2 characters" + if not username.isascii(): + return "username must use ascii characters only" + if HANDLE_RE.fullmatch(username): + return None + if is_valid_email_username(username): + return None + return ( + "username must be a handle [a-z0-9_.-] (2-64 chars) or a valid email" + ) def _validate_password(self, password: str) -> Optional[str]: if len(password) < 2: diff --git a/requirements.txt b/requirements.txt index cc83c7b..6a98c4f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,9 @@ blinker==1.9.0 click==8.3.1 +dnspython==2.8.0 +email_validator==2.2.0 Flask==3.1.3 +idna==3.11 itsdangerous==2.2.0 Jinja2==3.1.6 MarkupSafe==3.0.3 diff --git a/shim_app.py b/shim_app.py index f7e68af..2e9bcdd 100644 --- a/shim_app.py +++ b/shim_app.py @@ -43,7 +43,11 @@ MAX_EXTRACTED_BYTES = int( MAX_EXTRACTED_FILES = int(os.getenv("SHIM_MAX_EXTRACTED_FILES", "20000")) SESSION_COOKIE = "shim_session" ACTIVE_SITE_COOKIE = "shim_active_site" +MUTATING_METHODS = {"POST", "PUT", "PATCH", "DELETE"} SLUG_RE = re.compile(r"^[a-z0-9](?:[a-z0-9-]{0,126}[a-z0-9])?$") +UUID_RE = re.compile( + r"^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$" +) ARCHIVE_SUFFIXES = ( ".zip", ".tar", @@ -67,10 +71,11 @@ SHELL_TEMPLATE = """<!doctype html> <meta http-equiv="cache-control" content="no-cache"> <meta http-equiv="expires" content="0"> <meta http-equiv="pragma" content="no-cache"> + <meta name="shim-csrf-token" content="{{ csrf_token or '' }}"> <link rel="icon" href="{{ url_for('favicon') }}" type="image/svg+xml"> <link rel="stylesheet" href="https://cdn.jsdelivr.net/gh/kj-sh604/noir.css@latest/out/noir.min.css"> <link rel="stylesheet" href="{{ url_for('noir_overrides_css') }}"> - <script> + <script nonce="{{ script_nonce or '' }}"> if ("serviceWorker" in navigator && navigator.serviceWorker.getRegistrations) { navigator.serviceWorker.getRegistrations().then(function (registrations) { registrations.forEach(function (registration) { @@ -87,6 +92,8 @@ SHELL_TEMPLATE = """<!doctype html> (function () { var scrollKey = "__shim_form_scroll_restore__"; + var csrfMeta = document.querySelector("meta[name='shim-csrf-token']"); + var csrfToken = csrfMeta ? (csrfMeta.getAttribute("content") || "") : ""; try { var saved = sessionStorage.getItem(scrollKey); @@ -110,6 +117,24 @@ SHELL_TEMPLATE = """<!doctype html> if (!(form instanceof HTMLFormElement)) { return; } + + var confirmMessage = form.getAttribute("data-confirm"); + if (confirmMessage && !window.confirm(confirmMessage)) { + event.preventDefault(); + return; + } + + if (csrfToken && form.method && form.method.toUpperCase() === "POST") { + var hasToken = form.querySelector("input[name='_csrf_token']"); + if (!hasToken) { + var hidden = document.createElement("input"); + hidden.type = "hidden"; + hidden.name = "_csrf_token"; + hidden.value = csrfToken; + form.appendChild(hidden); + } + } + try { sessionStorage.setItem( scrollKey, @@ -121,6 +146,16 @@ SHELL_TEMPLATE = """<!doctype html> } catch (_err) { } }, true); + + document.addEventListener("focusin", function (event) { + var input = event.target; + if (!(input instanceof HTMLInputElement)) { + return; + } + if (input.hasAttribute("data-unlock-readonly")) { + input.removeAttribute("readonly"); + } + }); })(); </script> <title>{{ title }} - {{ app_name }}</title> @@ -130,14 +165,13 @@ SHELL_TEMPLATE = """<!doctype html> <header class="topbar"> <h1>{{ app_name }}</h1> <nav> - <a href="{{ url_for('dashboard') }}">dashboard</a> {% if current_user %} - <span class="quiet">signed in as {{ current_user['username'] }} ({{ current_user['role'] }})</span> + <span class="quiet">{{ current_user['username'] }}{% if current_user['role'] == 'admin' %} ({{ current_user['role'] }}){% endif %}</span> <form method="post" action="{{ url_for('logout') }}" class="inline"> <button type="submit">logout</button> </form> {% else %} - <a href="{{ url_for('login') }}">login</a> + <a href="https://youtu.be/XGxIE1hr0w4" target="_blank">🧷</a> {% endif %} </nav> </header> @@ -162,8 +196,8 @@ SETUP_BODY_TEMPLATE = """ <p>first startup detected. <br><br> create the initial admin account to continue.</p> <form method="post" action="{{ url_for('setup_submit') }}" class="stack" autocomplete="off"> <label> - username - <input name="username" type="text" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" placeholder="admin" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');"> + username <strong>(admin)</strong> + <input name="username" type="text" required minlength="2" maxlength="254" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" data-unlock-readonly="1" readonly> </label> <label> password @@ -185,7 +219,7 @@ LOGIN_BODY_TEMPLATE = """ <form method="post" action="{{ url_for('login_submit') }}" class="stack" autocomplete="off"> <label> username - <input name="username" type="text" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');"> + <input name="username" type="text" required minlength="2" maxlength="254" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" data-unlock-readonly="1" readonly> </label> <label> password @@ -236,7 +270,7 @@ DASHBOARD_BODY_TEMPLATE = """ <td>{{ site['original_filename'] }}</td> <td>{{ site['created_at'] }}</td> <td> - <form method="post" action="{{ url_for('delete_site', site_id=site['id']) }}" class="inline" onsubmit="return confirm('delete this site?');"> + <form method="post" action="{{ url_for('delete_site', site_id=site['id']) }}" class="inline" data-confirm="delete this site?"> <button type="submit" style="all: revert;">delete</button> </form> </td> @@ -289,7 +323,7 @@ DASHBOARD_BODY_TEMPLATE = """ <form method="post" action="{{ url_for('admin_create_user') }}" class="stack" autocomplete="off"> <label> username - <input type="text" name="username" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');"> + <input type="text" name="username" required minlength="2" maxlength="254" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" data-unlock-readonly="1" readonly> </label> <label> password @@ -327,7 +361,7 @@ DASHBOARD_BODY_TEMPLATE = """ <td>{{ user['created_at'] }}</td> <td> <form method="post" action="{{ url_for('admin_update_user_username', user_id=user['id']) }}" class="stack" autocomplete="off"> - <input type="text" name="username" value="{{ user['username'] }}" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" style="all: revert;"> + <input type="text" name="username" value="{{ user['username'] }}" required minlength="2" maxlength="254" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" style="all: revert;"> <button type="submit" style="all: revert;">rename user</button> </form> <br> @@ -336,7 +370,7 @@ DASHBOARD_BODY_TEMPLATE = """ <button type="submit" style="all: revert;">set password</button> </form><br> {% if user['id'] != current_user['id'] %} - <form method="post" action="{{ url_for('admin_delete_user', user_id=user['id']) }}" class="inline" onsubmit="return confirm('delete this user and all owned sites?');"> + <form method="post" action="{{ url_for('admin_delete_user', user_id=user['id']) }}" class="inline" data-confirm="delete this user and all owned sites?"> <button type="submit" style="all: revert;">delete</button> </form> {% else %} @@ -374,6 +408,11 @@ def slug_is_valid(slug: str) -> bool: return bool(slug) and bool(SLUG_RE.fullmatch(slug)) +def uuid_is_valid(value: str) -> bool: + value = (value or "").strip().lower() + return bool(UUID_RE.fullmatch(value)) + + def detect_archive_suffix(filename: str) -> Optional[str]: lower = filename.lower() for suffix in ARCHIVE_SUFFIXES: @@ -386,6 +425,8 @@ def normalize_archive_member_path(raw_name: str) -> Optional[Path]: name = raw_name.replace("\\", "/").strip() if not name: return None + if "\x00" in name: + raise ValueError("archive contains invalid null byte path") while name.startswith("./"): name = name[2:] if not name: @@ -395,6 +436,10 @@ def normalize_archive_member_path(raw_name: str) -> Optional[Path]: parts = [part for part in name.split("/") if part not in ("", ".")] if not parts: return None + if any(len(part) > 255 for part in parts): + raise ValueError("archive contains overlong path component") + if len(parts) > 64: + raise ValueError("archive path depth is too large") if any(part == ".." for part in parts): raise ValueError("archive contains parent path traversal") if ":" in parts[0]: @@ -630,6 +675,24 @@ def find_site_root(extracted_dir: Path) -> Path: return candidates[0].parent +def env_int(name: str, default: int, minimum: int) -> int: + raw = os.getenv(name, str(default)).strip() + try: + value = int(raw) + except ValueError: + value = default + return max(value, minimum) + + +def env_float(name: str, default: float, minimum: float) -> float: + raw = os.getenv(name, str(default)).strip() + try: + value = float(raw) + except ValueError: + value = default + return max(value, minimum) + + def create_app(base_dir: Optional[Path] = None) -> Flask: project_dir = Path(base_dir or Path(__file__).parent).resolve() app_name = os.getenv("SHIM_APP_NAME", "shim").strip() or "shim" @@ -648,7 +711,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: sites_dir=sites_dir, mojicrypt_bin=mojicrypt_bin, bind=os.getenv("SHIM_BIND", "0.0.0.0"), - port=int(os.getenv("SHIM_PORT", "6767")), + port=int(os.getenv("SHIM_PORT", "8585")), ) cfg.db_path.parent.mkdir(parents=True, exist_ok=True) @@ -656,18 +719,54 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: app = Flask(__name__, static_folder=None) app.config["MAX_CONTENT_LENGTH"] = MAX_UPLOAD_BYTES + app.config["MAX_FORM_MEMORY_SIZE"] = int( + os.getenv("SHIM_MAX_FORM_MEMORY_SIZE", str(2 * 1024 * 1024)) + ) app.config["SECRET_KEY"] = os.getenv("SHIM_SECRET_KEY", secrets.token_hex(32)) app.config["SHIM_PORT"] = cfg.port app.config["SHIM_BIND"] = cfg.bind app.config["SHIM_APP_NAME"] = cfg.app_name app.config["SHIM_MOJICRYPT_BIN"] = str(cfg.mojicrypt_bin) + sqlite_timeout_seconds = env_float("SHIM_SQLITE_TIMEOUT_SECONDS", 30.0, 1.0) + sqlite_busy_timeout_ms = env_int("SHIM_SQLITE_BUSY_TIMEOUT_MS", 30000, 1000) + sqlite_cache_size_kib = env_int("SHIM_SQLITE_CACHE_SIZE_KIB", 32768, 4096) + sqlite_mmap_size_bytes = env_int( + "SHIM_SQLITE_MMAP_SIZE_BYTES", 256 * 1024 * 1024, 0 + ) + sqlite_wal_autocheckpoint_pages = env_int( + "SHIM_SQLITE_WAL_AUTOCHECKPOINT_PAGES", 1000, 100 + ) + + cookie_secure_mode = os.getenv("SHIM_COOKIE_SECURE", "auto").strip().lower() + if cookie_secure_mode not in {"auto", "true", "false"}: + cookie_secure_mode = "auto" + def connect_db() -> sqlite3.Connection: - conn = sqlite3.connect(str(cfg.db_path)) + conn = sqlite3.connect(str(cfg.db_path), timeout=sqlite_timeout_seconds) # row objects keep query call sites readable and less index-fragile. conn.row_factory = sqlite3.Row + # wal + busy timeout gives sqlite much better mixed read/write concurrency. + conn.execute("PRAGMA journal_mode = WAL") + conn.execute("PRAGMA synchronous = NORMAL") + conn.execute(f"PRAGMA busy_timeout = {sqlite_busy_timeout_ms}") + conn.execute("PRAGMA temp_store = MEMORY") + conn.execute(f"PRAGMA cache_size = {-sqlite_cache_size_kib}") + conn.execute(f"PRAGMA wal_autocheckpoint = {sqlite_wal_autocheckpoint_pages}") + try: + conn.execute(f"PRAGMA mmap_size = {sqlite_mmap_size_bytes}") + except sqlite3.DatabaseError: + pass # keep fk checks enabled even on sqlite defaults that disable them. conn.execute("PRAGMA foreign_keys = ON") + try: + conn.enable_load_extension(False) + except (AttributeError, sqlite3.OperationalError): + pass + try: + conn.execute("PRAGMA trusted_schema = OFF") + except sqlite3.DatabaseError: + pass return conn with connect_db() as conn: @@ -685,6 +784,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: CREATE TABLE IF NOT EXISTS sessions ( token TEXT PRIMARY KEY, user_id INTEGER NOT NULL, + csrf_token TEXT NOT NULL, expires_at INTEGER NOT NULL, created_at INTEGER NOT NULL, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE @@ -708,54 +808,6 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: """ ) - # migration for existing installs - add and backfill uuid ids for users. - user_columns = { - row["name"] - for row in conn.execute("PRAGMA table_info(users)").fetchall() - } - if "user_uuid" not in user_columns: - conn.execute("ALTER TABLE users ADD COLUMN user_uuid TEXT") - - missing_user_ids = conn.execute( - """ - SELECT id FROM users - WHERE user_uuid IS NULL OR user_uuid = '' - """ - ).fetchall() - for row in missing_user_ids: - conn.execute( - "UPDATE users SET user_uuid = ? WHERE id = ?", - (str(uuid.uuid4()), row["id"]), - ) - - conn.execute( - "CREATE UNIQUE INDEX IF NOT EXISTS idx_users_user_uuid ON users(user_uuid)" - ) - - # migration for existing installs - add and backfill uuid ids for sites. - site_columns = { - row["name"] - for row in conn.execute("PRAGMA table_info(sites)").fetchall() - } - if "site_uuid" not in site_columns: - conn.execute("ALTER TABLE sites ADD COLUMN site_uuid TEXT") - - missing_site_ids = conn.execute( - """ - SELECT id FROM sites - WHERE site_uuid IS NULL OR site_uuid = '' - """ - ).fetchall() - for row in missing_site_ids: - conn.execute( - "UPDATE sites SET site_uuid = ? WHERE id = ?", - (str(uuid.uuid4()), row["id"]), - ) - - conn.execute( - "CREATE UNIQUE INDEX IF NOT EXISTS idx_sites_site_uuid ON sites(site_uuid)" - ) - # auth provider is injected as a single backend to keep swap-over simple. auth_backend: AuthBackend = LocalMojicryptAuthBackend( connect_db=connect_db, @@ -764,24 +816,43 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: def render_page(title: str, body_template: str, **context: object) -> str: body = render_template_string(body_template, **context) + csrf_token = "" + script_nonce = secrets.token_urlsafe(16) + g.script_nonce = script_nonce + if g.current_user is not None: + csrf_token = g.current_user["csrf_token"] return render_template_string( SHELL_TEMPLATE, title=title, app_name=cfg.app_name, current_user=g.current_user, + csrf_token=csrf_token, + script_nonce=script_nonce, body=body, ) + def cookie_secure_enabled() -> bool: + if cookie_secure_mode == "true": + return True + if cookie_secure_mode == "false": + return False + if request.is_secure: + return True + xfp = request.headers.get("X-Forwarded-Proto", "") + forwarded_proto = xfp.split(",", 1)[0].strip().lower() + return forwarded_proto == "https" + def create_session(user_id: int) -> str: token = secrets.token_urlsafe(48) + csrf_token = secrets.token_urlsafe(32) now = int(time.time()) with connect_db() as conn: conn.execute( """ - INSERT INTO sessions (token, user_id, expires_at, created_at) - VALUES (?, ?, ?, ?) + INSERT INTO sessions (token, user_id, csrf_token, expires_at, created_at) + VALUES (?, ?, ?, ?, ?) """, - (token, user_id, now + SESSION_TTL_SECONDS, now), + (token, user_id, csrf_token, now + SESSION_TTL_SECONDS, now), ) return token @@ -793,15 +864,78 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: now = int(time.time()) with connect_db() as conn: conn.execute("DELETE FROM sessions WHERE expires_at <= ?", (now,)) - return conn.execute( + row = conn.execute( """ - SELECT u.id AS internal_id, u.user_uuid AS id, u.username, u.role + SELECT u.id AS internal_id, u.user_uuid AS id, u.username, u.role, s.csrf_token FROM sessions s JOIN users u ON u.id = s.user_id WHERE s.token = ? AND s.expires_at > ? """, (token, now), ).fetchone() + if row is None: + return None + if not row["csrf_token"]: + conn.execute( + "UPDATE sessions SET csrf_token = ? WHERE token = ?", + (secrets.token_urlsafe(32), token), + ) + row = conn.execute( + """ + SELECT u.id AS internal_id, u.user_uuid AS id, u.username, u.role, s.csrf_token + FROM sessions s + JOIN users u ON u.id = s.user_id + WHERE s.token = ? AND s.expires_at > ? + """, + (token, now), + ).fetchone() + return row + + def is_same_origin_request() -> bool: + forwarded_proto = request.headers.get("X-Forwarded-Proto", "") + external_scheme = forwarded_proto.split(",", 1)[0].strip().lower() + if external_scheme not in {"http", "https"}: + external_scheme = request.scheme + external_netloc = request.host + + origin = request.headers.get("Origin") + if origin: + try: + parsed_origin = urlparse(origin) + except ValueError: + return False + return ( + parsed_origin.scheme == external_scheme + and parsed_origin.netloc == external_netloc + ) + + referer = request.headers.get("Referer") + if referer: + try: + parsed_referer = urlparse(referer) + except ValueError: + return False + return ( + parsed_referer.scheme == external_scheme + and parsed_referer.netloc == external_netloc + ) + + return True + + def is_valid_csrf_for_request() -> bool: + if g.current_user is None: + return True + expected = g.current_user["csrf_token"] or "" + if not expected: + return False + + supplied = request.form.get("_csrf_token", "") + if not supplied: + supplied = request.headers.get("X-CSRF-Token", "") + + if not supplied: + return False + return secrets.compare_digest(expected, supplied) def require_auth() -> Optional[Response]: if g.current_user is not None: @@ -864,6 +998,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: return candidate def serve_site_resource(site: sqlite3.Row, subpath: str, allow_spa: bool = True) -> Response: + g.is_hosted_site_response = True site_root = (cfg.sites_dir / site["storage_key"]).resolve() if not site_root.is_dir(): abort(404) @@ -889,12 +1024,61 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: @app.before_request def load_current_user() -> None: g.current_user = None + g.is_hosted_site_response = False token = request.cookies.get(SESSION_COOKIE) - if not token: - return - user = user_from_token(token) - if user is not None: - g.current_user = user + if token: + user = user_from_token(token) + if user is not None: + g.current_user = user + + if request.method in MUTATING_METHODS and request.path.startswith("/app/"): + if not is_same_origin_request(): + abort(403) + if not is_valid_csrf_for_request(): + abort(403) + + @app.after_request + def add_security_headers(response: Response) -> Response: + response.headers.setdefault("X-Content-Type-Options", "nosniff") + response.headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin") + response.headers.setdefault("X-Permitted-Cross-Domain-Policies", "none") + + if cookie_secure_enabled(): + response.headers.setdefault( + "Strict-Transport-Security", + "max-age=31536000; includeSubDomains", + ) + + if not getattr(g, "is_hosted_site_response", False): + nonce = getattr(g, "script_nonce", "") + script_src = "script-src 'self'" + if nonce: + script_src += f" 'nonce-{nonce}'" + response.headers.setdefault("X-Frame-Options", "DENY") + response.headers.setdefault( + "Permissions-Policy", + "camera=(), microphone=(), geolocation=(), payment=()", + ) + response.headers.setdefault("Cross-Origin-Opener-Policy", "same-origin") + response.headers.setdefault("Cross-Origin-Resource-Policy", "same-origin") + response.headers.setdefault( + "Content-Security-Policy", + "default-src 'self'; " + + script_src + + "; " + "style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; " + "img-src 'self' data: blob: https:; " + "font-src 'self' data:; " + "connect-src 'self'; " + "object-src 'none'; " + "base-uri 'self'; " + "frame-ancestors 'none'; " + "form-action 'self'", + ) + if request.path.startswith("/app") and response.mimetype == "text/html": + response.headers["Cache-Control"] = "no-store" + + return response @app.errorhandler(413) def too_large(_: Exception) -> tuple[str, int]: @@ -1008,6 +1192,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: max_age=SESSION_TTL_SECONDS, httponly=True, samesite="Lax", + secure=cookie_secure_enabled(), path="/", ) flash("admin account created", "success") @@ -1042,6 +1227,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: max_age=SESSION_TTL_SECONDS, httponly=True, samesite="Lax", + secure=cookie_secure_enabled(), path="/", ) flash("signed in", "success") @@ -1053,7 +1239,15 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: if token: destroy_session(token) response = redirect(url_for("login")) - response.set_cookie(SESSION_COOKIE, "", max_age=0, httponly=True, samesite="Lax", path="/") + response.set_cookie( + SESSION_COOKIE, + "", + max_age=0, + httponly=True, + samesite="Lax", + secure=cookie_secure_enabled(), + path="/", + ) return response @app.post("/app/upload") @@ -1131,6 +1325,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: if auth_redirect is not None: return auth_redirect + if not uuid_is_valid(site_id): + flash("invalid site id", "error") + return redirect(url_for("dashboard")) + with connect_db() as conn: site = conn.execute( "SELECT site_uuid, owner_user_id, slug, storage_key FROM sites WHERE site_uuid = ?", @@ -1171,6 +1369,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: if admin_redirect is not None: return admin_redirect + if not uuid_is_valid(user_id): + flash("invalid user id", "error") + return redirect(url_for("dashboard")) + username = request.form.get("username", "") ok, message = auth_backend.update_username(user_uuid=user_id, new_username=username) flash(message, "success" if ok else "error") @@ -1182,6 +1384,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: if admin_redirect is not None: return admin_redirect + if not uuid_is_valid(user_id): + flash("invalid user id", "error") + return redirect(url_for("dashboard")) + password = request.form.get("password", "") ok, message = auth_backend.update_password(user_uuid=user_id, new_password=password) flash(message, "success" if ok else "error") @@ -1193,6 +1399,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: if admin_redirect is not None: return admin_redirect + if not uuid_is_valid(user_id): + flash("invalid user id", "error") + return redirect(url_for("dashboard")) + if g.current_user["id"] == user_id: flash("you cannot delete your own account", "error") return redirect(url_for("dashboard")) @@ -1225,6 +1435,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: if admin_redirect is not None: return admin_redirect + if not uuid_is_valid(site_id): + flash("invalid site id", "error") + return redirect(url_for("dashboard")) + new_slug = request.form.get("slug", "").strip().lower() if not slug_is_valid(new_slug): flash("invalid slug format", "error") @@ -1280,7 +1494,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask: def site_root_relative_fallback(subpath: str) -> Response: # handle absolute root paths from hosted apps by resolving the slug from referer/cookie first = subpath.split("/", 1)[0].lower() - if first in {"app", "api", "s", "_site", "healthz"}: + if first in {"app", "api", "s", "_site", "healthz", "favicon.svg", "robots.txt"}: abort(404) slug = extract_slug_from_referer(request.headers.get("Referer")) |
