aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--README31
-rw-r--r--auth_backend.py57
-rw-r--r--requirements.txt3
-rw-r--r--shim_app.py360
4 files changed, 372 insertions, 79 deletions
diff --git a/README b/README
new file mode 100644
index 0000000..1494a32
--- /dev/null
+++ b/README
@@ -0,0 +1,31 @@
+shim
+----
+
+small static site host for archive uploads.
+
+what it does
+ - users upload one archive, app publishes it under a slug
+ - public routes: /s/<slug>/... and /_site/<slug>/...
+
+quick start (assumes POSIX)
+ - python3 -m venv .venv
+ - source .venv/bin/activate
+ - pip install -r requirements.txt
+ - python3 server.py
+ - open http://127.0.0.1:8585/app
+
+config
+ - SHIM_APP_NAME: ui/app name (default: shim)
+ - SHIM_BIND: bind address (default: 0.0.0.0)
+ - SHIM_PORT: port (default: 8585)
+ - SHIM_MOJICRYPT_BIN: mojicrypt path (default: ./vendor/mojicrypt)
+ - SHIM_COOKIE_SECURE: auto|true|false (default: auto)
+ - SHIM_SQLITE_TIMEOUT_SECONDS (default: 30.0)
+ - SHIM_SQLITE_BUSY_TIMEOUT_MS (default: 30000)
+ - SHIM_SQLITE_CACHE_SIZE_KIB (default: 32768)
+ - SHIM_SQLITE_MMAP_SIZE_BYTES (default: 268435456)
+ - SHIM_SQLITE_WAL_AUTOCHECKPOINT_PAGES (default: 1000)
+
+data paths
+ - db: data/shim.db
+ - site files: data/sites/
diff --git a/auth_backend.py b/auth_backend.py
index a8ad782..817f8d4 100644
--- a/auth_backend.py
+++ b/auth_backend.py
@@ -9,11 +9,16 @@ import uuid
from pathlib import Path
from typing import Callable, Optional, Protocol
+from email_validator import EmailNotValidError, validate_email
+
# fixed challenge text used for passphrase verification.
# we store encrypted challenge output instead of storing passwords.
AUTH_CHALLENGE = "SHIM_AUTH_VALID"
-USERNAME_RE = re.compile(r"^[a-z0-9_.-]{2,64}$")
+HANDLE_RE = re.compile(r"^[a-z0-9_.-]{2,64}$")
+UUID_RE = re.compile(
+ r"^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"
+)
ConnectFn = Callable[[], sqlite3.Connection]
@@ -39,6 +44,30 @@ def normalize_username(username: str) -> str:
return username.strip().lower()
+def normalize_uuid(value: str) -> Optional[str]:
+ candidate = (value or "").strip().lower()
+ if not UUID_RE.fullmatch(candidate):
+ return None
+ return candidate
+
+
+def is_valid_email_username(value: str) -> bool:
+ if len(value) > 254:
+ return False
+ try:
+ validated = validate_email(
+ value,
+ check_deliverability=False,
+ allow_smtputf8=False,
+ allow_display_name=False,
+ )
+ except EmailNotValidError:
+ return False
+
+ # keep stored usernames predictable - only accept already normalized forms.
+ return validated.normalized == value
+
+
def looks_like_python_script(path: Path) -> bool:
try:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
@@ -93,6 +122,10 @@ class LocalMojicryptAuthBackend:
return False, "username already exists"
def update_username(self, user_uuid: str, new_username: str) -> tuple[bool, str]:
+ normalized_user_uuid = normalize_uuid(user_uuid)
+ if normalized_user_uuid is None:
+ return False, "invalid user id"
+
normalized = normalize_username(new_username)
username_error = self._validate_username(normalized)
if username_error:
@@ -102,7 +135,7 @@ class LocalMojicryptAuthBackend:
with self.connect_db() as conn:
cursor = conn.execute(
"UPDATE users SET username = ? WHERE user_uuid = ?",
- (normalized, user_uuid),
+ (normalized, normalized_user_uuid),
)
if cursor.rowcount == 0:
return False, "user not found"
@@ -112,6 +145,10 @@ class LocalMojicryptAuthBackend:
return True, "username updated"
def update_password(self, user_uuid: str, new_password: str) -> tuple[bool, str]:
+ normalized_user_uuid = normalize_uuid(user_uuid)
+ if normalized_user_uuid is None:
+ return False, "invalid user id"
+
password_error = self._validate_password(new_password)
if password_error:
return False, password_error
@@ -124,7 +161,7 @@ class LocalMojicryptAuthBackend:
with self.connect_db() as conn:
cursor = conn.execute(
"UPDATE users SET encrypted_challenge = ? WHERE user_uuid = ?",
- (encrypted, user_uuid),
+ (encrypted, normalized_user_uuid),
)
if cursor.rowcount == 0:
return False, "user not found"
@@ -152,9 +189,17 @@ class LocalMojicryptAuthBackend:
return self._run_mojicrypt("encrypt", AUTH_CHALLENGE, password)
def _validate_username(self, username: str) -> Optional[str]:
- if not USERNAME_RE.fullmatch(username):
- return "username must be lowercase and use [a-z0-9_.-], 2-64 chars"
- return None
+ if len(username) < 2:
+ return "username must be at least 2 characters"
+ if not username.isascii():
+ return "username must use ascii characters only"
+ if HANDLE_RE.fullmatch(username):
+ return None
+ if is_valid_email_username(username):
+ return None
+ return (
+ "username must be a handle [a-z0-9_.-] (2-64 chars) or a valid email"
+ )
def _validate_password(self, password: str) -> Optional[str]:
if len(password) < 2:
diff --git a/requirements.txt b/requirements.txt
index cc83c7b..6a98c4f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,9 @@
blinker==1.9.0
click==8.3.1
+dnspython==2.8.0
+email_validator==2.2.0
Flask==3.1.3
+idna==3.11
itsdangerous==2.2.0
Jinja2==3.1.6
MarkupSafe==3.0.3
diff --git a/shim_app.py b/shim_app.py
index f7e68af..2e9bcdd 100644
--- a/shim_app.py
+++ b/shim_app.py
@@ -43,7 +43,11 @@ MAX_EXTRACTED_BYTES = int(
MAX_EXTRACTED_FILES = int(os.getenv("SHIM_MAX_EXTRACTED_FILES", "20000"))
SESSION_COOKIE = "shim_session"
ACTIVE_SITE_COOKIE = "shim_active_site"
+MUTATING_METHODS = {"POST", "PUT", "PATCH", "DELETE"}
SLUG_RE = re.compile(r"^[a-z0-9](?:[a-z0-9-]{0,126}[a-z0-9])?$")
+UUID_RE = re.compile(
+ r"^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"
+)
ARCHIVE_SUFFIXES = (
".zip",
".tar",
@@ -67,10 +71,11 @@ SHELL_TEMPLATE = """<!doctype html>
<meta http-equiv="cache-control" content="no-cache">
<meta http-equiv="expires" content="0">
<meta http-equiv="pragma" content="no-cache">
+ <meta name="shim-csrf-token" content="{{ csrf_token or '' }}">
<link rel="icon" href="{{ url_for('favicon') }}" type="image/svg+xml">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/gh/kj-sh604/noir.css@latest/out/noir.min.css">
<link rel="stylesheet" href="{{ url_for('noir_overrides_css') }}">
- <script>
+ <script nonce="{{ script_nonce or '' }}">
if ("serviceWorker" in navigator && navigator.serviceWorker.getRegistrations) {
navigator.serviceWorker.getRegistrations().then(function (registrations) {
registrations.forEach(function (registration) {
@@ -87,6 +92,8 @@ SHELL_TEMPLATE = """<!doctype html>
(function () {
var scrollKey = "__shim_form_scroll_restore__";
+ var csrfMeta = document.querySelector("meta[name='shim-csrf-token']");
+ var csrfToken = csrfMeta ? (csrfMeta.getAttribute("content") || "") : "";
try {
var saved = sessionStorage.getItem(scrollKey);
@@ -110,6 +117,24 @@ SHELL_TEMPLATE = """<!doctype html>
if (!(form instanceof HTMLFormElement)) {
return;
}
+
+ var confirmMessage = form.getAttribute("data-confirm");
+ if (confirmMessage && !window.confirm(confirmMessage)) {
+ event.preventDefault();
+ return;
+ }
+
+ if (csrfToken && form.method && form.method.toUpperCase() === "POST") {
+ var hasToken = form.querySelector("input[name='_csrf_token']");
+ if (!hasToken) {
+ var hidden = document.createElement("input");
+ hidden.type = "hidden";
+ hidden.name = "_csrf_token";
+ hidden.value = csrfToken;
+ form.appendChild(hidden);
+ }
+ }
+
try {
sessionStorage.setItem(
scrollKey,
@@ -121,6 +146,16 @@ SHELL_TEMPLATE = """<!doctype html>
} catch (_err) {
}
}, true);
+
+ document.addEventListener("focusin", function (event) {
+ var input = event.target;
+ if (!(input instanceof HTMLInputElement)) {
+ return;
+ }
+ if (input.hasAttribute("data-unlock-readonly")) {
+ input.removeAttribute("readonly");
+ }
+ });
})();
</script>
<title>{{ title }} - {{ app_name }}</title>
@@ -130,14 +165,13 @@ SHELL_TEMPLATE = """<!doctype html>
<header class="topbar">
<h1>{{ app_name }}</h1>
<nav>
- <a href="{{ url_for('dashboard') }}">dashboard</a>
{% if current_user %}
- <span class="quiet">signed in as {{ current_user['username'] }} ({{ current_user['role'] }})</span>
+ <span class="quiet">{{ current_user['username'] }}{% if current_user['role'] == 'admin' %} ({{ current_user['role'] }}){% endif %}</span>
<form method="post" action="{{ url_for('logout') }}" class="inline">
<button type="submit">logout</button>
</form>
{% else %}
- <a href="{{ url_for('login') }}">login</a>
+ <a href="https://youtu.be/XGxIE1hr0w4" target="_blank">🧷</a>
{% endif %}
</nav>
</header>
@@ -162,8 +196,8 @@ SETUP_BODY_TEMPLATE = """
<p>first startup detected. <br><br> create the initial admin account to continue.</p>
<form method="post" action="{{ url_for('setup_submit') }}" class="stack" autocomplete="off">
<label>
- username
- <input name="username" type="text" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" placeholder="admin" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');">
+ username <strong>(admin)</strong>
+ <input name="username" type="text" required minlength="2" maxlength="254" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" data-unlock-readonly="1" readonly>
</label>
<label>
password
@@ -185,7 +219,7 @@ LOGIN_BODY_TEMPLATE = """
<form method="post" action="{{ url_for('login_submit') }}" class="stack" autocomplete="off">
<label>
username
- <input name="username" type="text" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');">
+ <input name="username" type="text" required minlength="2" maxlength="254" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" data-unlock-readonly="1" readonly>
</label>
<label>
password
@@ -236,7 +270,7 @@ DASHBOARD_BODY_TEMPLATE = """
<td>{{ site['original_filename'] }}</td>
<td>{{ site['created_at'] }}</td>
<td>
- <form method="post" action="{{ url_for('delete_site', site_id=site['id']) }}" class="inline" onsubmit="return confirm('delete this site?');">
+ <form method="post" action="{{ url_for('delete_site', site_id=site['id']) }}" class="inline" data-confirm="delete this site?">
<button type="submit" style="all: revert;">delete</button>
</form>
</td>
@@ -289,7 +323,7 @@ DASHBOARD_BODY_TEMPLATE = """
<form method="post" action="{{ url_for('admin_create_user') }}" class="stack" autocomplete="off">
<label>
username
- <input type="text" name="username" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');">
+ <input type="text" name="username" required minlength="2" maxlength="254" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" data-unlock-readonly="1" readonly>
</label>
<label>
password
@@ -327,7 +361,7 @@ DASHBOARD_BODY_TEMPLATE = """
<td>{{ user['created_at'] }}</td>
<td>
<form method="post" action="{{ url_for('admin_update_user_username', user_id=user['id']) }}" class="stack" autocomplete="off">
- <input type="text" name="username" value="{{ user['username'] }}" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" style="all: revert;">
+ <input type="text" name="username" value="{{ user['username'] }}" required minlength="2" maxlength="254" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" style="all: revert;">
<button type="submit" style="all: revert;">rename user</button>
</form>
<br>
@@ -336,7 +370,7 @@ DASHBOARD_BODY_TEMPLATE = """
<button type="submit" style="all: revert;">set password</button>
</form><br>
{% if user['id'] != current_user['id'] %}
- <form method="post" action="{{ url_for('admin_delete_user', user_id=user['id']) }}" class="inline" onsubmit="return confirm('delete this user and all owned sites?');">
+ <form method="post" action="{{ url_for('admin_delete_user', user_id=user['id']) }}" class="inline" data-confirm="delete this user and all owned sites?">
<button type="submit" style="all: revert;">delete</button>
</form>
{% else %}
@@ -374,6 +408,11 @@ def slug_is_valid(slug: str) -> bool:
return bool(slug) and bool(SLUG_RE.fullmatch(slug))
+def uuid_is_valid(value: str) -> bool:
+ value = (value or "").strip().lower()
+ return bool(UUID_RE.fullmatch(value))
+
+
def detect_archive_suffix(filename: str) -> Optional[str]:
lower = filename.lower()
for suffix in ARCHIVE_SUFFIXES:
@@ -386,6 +425,8 @@ def normalize_archive_member_path(raw_name: str) -> Optional[Path]:
name = raw_name.replace("\\", "/").strip()
if not name:
return None
+ if "\x00" in name:
+ raise ValueError("archive contains invalid null byte path")
while name.startswith("./"):
name = name[2:]
if not name:
@@ -395,6 +436,10 @@ def normalize_archive_member_path(raw_name: str) -> Optional[Path]:
parts = [part for part in name.split("/") if part not in ("", ".")]
if not parts:
return None
+ if any(len(part) > 255 for part in parts):
+ raise ValueError("archive contains overlong path component")
+ if len(parts) > 64:
+ raise ValueError("archive path depth is too large")
if any(part == ".." for part in parts):
raise ValueError("archive contains parent path traversal")
if ":" in parts[0]:
@@ -630,6 +675,24 @@ def find_site_root(extracted_dir: Path) -> Path:
return candidates[0].parent
+def env_int(name: str, default: int, minimum: int) -> int:
+ raw = os.getenv(name, str(default)).strip()
+ try:
+ value = int(raw)
+ except ValueError:
+ value = default
+ return max(value, minimum)
+
+
+def env_float(name: str, default: float, minimum: float) -> float:
+ raw = os.getenv(name, str(default)).strip()
+ try:
+ value = float(raw)
+ except ValueError:
+ value = default
+ return max(value, minimum)
+
+
def create_app(base_dir: Optional[Path] = None) -> Flask:
project_dir = Path(base_dir or Path(__file__).parent).resolve()
app_name = os.getenv("SHIM_APP_NAME", "shim").strip() or "shim"
@@ -648,7 +711,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
sites_dir=sites_dir,
mojicrypt_bin=mojicrypt_bin,
bind=os.getenv("SHIM_BIND", "0.0.0.0"),
- port=int(os.getenv("SHIM_PORT", "6767")),
+ port=int(os.getenv("SHIM_PORT", "8585")),
)
cfg.db_path.parent.mkdir(parents=True, exist_ok=True)
@@ -656,18 +719,54 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
app = Flask(__name__, static_folder=None)
app.config["MAX_CONTENT_LENGTH"] = MAX_UPLOAD_BYTES
+ app.config["MAX_FORM_MEMORY_SIZE"] = int(
+ os.getenv("SHIM_MAX_FORM_MEMORY_SIZE", str(2 * 1024 * 1024))
+ )
app.config["SECRET_KEY"] = os.getenv("SHIM_SECRET_KEY", secrets.token_hex(32))
app.config["SHIM_PORT"] = cfg.port
app.config["SHIM_BIND"] = cfg.bind
app.config["SHIM_APP_NAME"] = cfg.app_name
app.config["SHIM_MOJICRYPT_BIN"] = str(cfg.mojicrypt_bin)
+ sqlite_timeout_seconds = env_float("SHIM_SQLITE_TIMEOUT_SECONDS", 30.0, 1.0)
+ sqlite_busy_timeout_ms = env_int("SHIM_SQLITE_BUSY_TIMEOUT_MS", 30000, 1000)
+ sqlite_cache_size_kib = env_int("SHIM_SQLITE_CACHE_SIZE_KIB", 32768, 4096)
+ sqlite_mmap_size_bytes = env_int(
+ "SHIM_SQLITE_MMAP_SIZE_BYTES", 256 * 1024 * 1024, 0
+ )
+ sqlite_wal_autocheckpoint_pages = env_int(
+ "SHIM_SQLITE_WAL_AUTOCHECKPOINT_PAGES", 1000, 100
+ )
+
+ cookie_secure_mode = os.getenv("SHIM_COOKIE_SECURE", "auto").strip().lower()
+ if cookie_secure_mode not in {"auto", "true", "false"}:
+ cookie_secure_mode = "auto"
+
def connect_db() -> sqlite3.Connection:
- conn = sqlite3.connect(str(cfg.db_path))
+ conn = sqlite3.connect(str(cfg.db_path), timeout=sqlite_timeout_seconds)
# row objects keep query call sites readable and less index-fragile.
conn.row_factory = sqlite3.Row
+ # wal + busy timeout gives sqlite much better mixed read/write concurrency.
+ conn.execute("PRAGMA journal_mode = WAL")
+ conn.execute("PRAGMA synchronous = NORMAL")
+ conn.execute(f"PRAGMA busy_timeout = {sqlite_busy_timeout_ms}")
+ conn.execute("PRAGMA temp_store = MEMORY")
+ conn.execute(f"PRAGMA cache_size = {-sqlite_cache_size_kib}")
+ conn.execute(f"PRAGMA wal_autocheckpoint = {sqlite_wal_autocheckpoint_pages}")
+ try:
+ conn.execute(f"PRAGMA mmap_size = {sqlite_mmap_size_bytes}")
+ except sqlite3.DatabaseError:
+ pass
# keep fk checks enabled even on sqlite defaults that disable them.
conn.execute("PRAGMA foreign_keys = ON")
+ try:
+ conn.enable_load_extension(False)
+ except (AttributeError, sqlite3.OperationalError):
+ pass
+ try:
+ conn.execute("PRAGMA trusted_schema = OFF")
+ except sqlite3.DatabaseError:
+ pass
return conn
with connect_db() as conn:
@@ -685,6 +784,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
CREATE TABLE IF NOT EXISTS sessions (
token TEXT PRIMARY KEY,
user_id INTEGER NOT NULL,
+ csrf_token TEXT NOT NULL,
expires_at INTEGER NOT NULL,
created_at INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
@@ -708,54 +808,6 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
"""
)
- # migration for existing installs - add and backfill uuid ids for users.
- user_columns = {
- row["name"]
- for row in conn.execute("PRAGMA table_info(users)").fetchall()
- }
- if "user_uuid" not in user_columns:
- conn.execute("ALTER TABLE users ADD COLUMN user_uuid TEXT")
-
- missing_user_ids = conn.execute(
- """
- SELECT id FROM users
- WHERE user_uuid IS NULL OR user_uuid = ''
- """
- ).fetchall()
- for row in missing_user_ids:
- conn.execute(
- "UPDATE users SET user_uuid = ? WHERE id = ?",
- (str(uuid.uuid4()), row["id"]),
- )
-
- conn.execute(
- "CREATE UNIQUE INDEX IF NOT EXISTS idx_users_user_uuid ON users(user_uuid)"
- )
-
- # migration for existing installs - add and backfill uuid ids for sites.
- site_columns = {
- row["name"]
- for row in conn.execute("PRAGMA table_info(sites)").fetchall()
- }
- if "site_uuid" not in site_columns:
- conn.execute("ALTER TABLE sites ADD COLUMN site_uuid TEXT")
-
- missing_site_ids = conn.execute(
- """
- SELECT id FROM sites
- WHERE site_uuid IS NULL OR site_uuid = ''
- """
- ).fetchall()
- for row in missing_site_ids:
- conn.execute(
- "UPDATE sites SET site_uuid = ? WHERE id = ?",
- (str(uuid.uuid4()), row["id"]),
- )
-
- conn.execute(
- "CREATE UNIQUE INDEX IF NOT EXISTS idx_sites_site_uuid ON sites(site_uuid)"
- )
-
# auth provider is injected as a single backend to keep swap-over simple.
auth_backend: AuthBackend = LocalMojicryptAuthBackend(
connect_db=connect_db,
@@ -764,24 +816,43 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
def render_page(title: str, body_template: str, **context: object) -> str:
body = render_template_string(body_template, **context)
+ csrf_token = ""
+ script_nonce = secrets.token_urlsafe(16)
+ g.script_nonce = script_nonce
+ if g.current_user is not None:
+ csrf_token = g.current_user["csrf_token"]
return render_template_string(
SHELL_TEMPLATE,
title=title,
app_name=cfg.app_name,
current_user=g.current_user,
+ csrf_token=csrf_token,
+ script_nonce=script_nonce,
body=body,
)
+ def cookie_secure_enabled() -> bool:
+ if cookie_secure_mode == "true":
+ return True
+ if cookie_secure_mode == "false":
+ return False
+ if request.is_secure:
+ return True
+ xfp = request.headers.get("X-Forwarded-Proto", "")
+ forwarded_proto = xfp.split(",", 1)[0].strip().lower()
+ return forwarded_proto == "https"
+
def create_session(user_id: int) -> str:
token = secrets.token_urlsafe(48)
+ csrf_token = secrets.token_urlsafe(32)
now = int(time.time())
with connect_db() as conn:
conn.execute(
"""
- INSERT INTO sessions (token, user_id, expires_at, created_at)
- VALUES (?, ?, ?, ?)
+ INSERT INTO sessions (token, user_id, csrf_token, expires_at, created_at)
+ VALUES (?, ?, ?, ?, ?)
""",
- (token, user_id, now + SESSION_TTL_SECONDS, now),
+ (token, user_id, csrf_token, now + SESSION_TTL_SECONDS, now),
)
return token
@@ -793,15 +864,78 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
now = int(time.time())
with connect_db() as conn:
conn.execute("DELETE FROM sessions WHERE expires_at <= ?", (now,))
- return conn.execute(
+ row = conn.execute(
"""
- SELECT u.id AS internal_id, u.user_uuid AS id, u.username, u.role
+ SELECT u.id AS internal_id, u.user_uuid AS id, u.username, u.role, s.csrf_token
FROM sessions s
JOIN users u ON u.id = s.user_id
WHERE s.token = ? AND s.expires_at > ?
""",
(token, now),
).fetchone()
+ if row is None:
+ return None
+ if not row["csrf_token"]:
+ conn.execute(
+ "UPDATE sessions SET csrf_token = ? WHERE token = ?",
+ (secrets.token_urlsafe(32), token),
+ )
+ row = conn.execute(
+ """
+ SELECT u.id AS internal_id, u.user_uuid AS id, u.username, u.role, s.csrf_token
+ FROM sessions s
+ JOIN users u ON u.id = s.user_id
+ WHERE s.token = ? AND s.expires_at > ?
+ """,
+ (token, now),
+ ).fetchone()
+ return row
+
+ def is_same_origin_request() -> bool:
+ forwarded_proto = request.headers.get("X-Forwarded-Proto", "")
+ external_scheme = forwarded_proto.split(",", 1)[0].strip().lower()
+ if external_scheme not in {"http", "https"}:
+ external_scheme = request.scheme
+ external_netloc = request.host
+
+ origin = request.headers.get("Origin")
+ if origin:
+ try:
+ parsed_origin = urlparse(origin)
+ except ValueError:
+ return False
+ return (
+ parsed_origin.scheme == external_scheme
+ and parsed_origin.netloc == external_netloc
+ )
+
+ referer = request.headers.get("Referer")
+ if referer:
+ try:
+ parsed_referer = urlparse(referer)
+ except ValueError:
+ return False
+ return (
+ parsed_referer.scheme == external_scheme
+ and parsed_referer.netloc == external_netloc
+ )
+
+ return True
+
+ def is_valid_csrf_for_request() -> bool:
+ if g.current_user is None:
+ return True
+ expected = g.current_user["csrf_token"] or ""
+ if not expected:
+ return False
+
+ supplied = request.form.get("_csrf_token", "")
+ if not supplied:
+ supplied = request.headers.get("X-CSRF-Token", "")
+
+ if not supplied:
+ return False
+ return secrets.compare_digest(expected, supplied)
def require_auth() -> Optional[Response]:
if g.current_user is not None:
@@ -864,6 +998,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
return candidate
def serve_site_resource(site: sqlite3.Row, subpath: str, allow_spa: bool = True) -> Response:
+ g.is_hosted_site_response = True
site_root = (cfg.sites_dir / site["storage_key"]).resolve()
if not site_root.is_dir():
abort(404)
@@ -889,12 +1024,61 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
@app.before_request
def load_current_user() -> None:
g.current_user = None
+ g.is_hosted_site_response = False
token = request.cookies.get(SESSION_COOKIE)
- if not token:
- return
- user = user_from_token(token)
- if user is not None:
- g.current_user = user
+ if token:
+ user = user_from_token(token)
+ if user is not None:
+ g.current_user = user
+
+ if request.method in MUTATING_METHODS and request.path.startswith("/app/"):
+ if not is_same_origin_request():
+ abort(403)
+ if not is_valid_csrf_for_request():
+ abort(403)
+
+ @app.after_request
+ def add_security_headers(response: Response) -> Response:
+ response.headers.setdefault("X-Content-Type-Options", "nosniff")
+ response.headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin")
+ response.headers.setdefault("X-Permitted-Cross-Domain-Policies", "none")
+
+ if cookie_secure_enabled():
+ response.headers.setdefault(
+ "Strict-Transport-Security",
+ "max-age=31536000; includeSubDomains",
+ )
+
+ if not getattr(g, "is_hosted_site_response", False):
+ nonce = getattr(g, "script_nonce", "")
+ script_src = "script-src 'self'"
+ if nonce:
+ script_src += f" 'nonce-{nonce}'"
+ response.headers.setdefault("X-Frame-Options", "DENY")
+ response.headers.setdefault(
+ "Permissions-Policy",
+ "camera=(), microphone=(), geolocation=(), payment=()",
+ )
+ response.headers.setdefault("Cross-Origin-Opener-Policy", "same-origin")
+ response.headers.setdefault("Cross-Origin-Resource-Policy", "same-origin")
+ response.headers.setdefault(
+ "Content-Security-Policy",
+ "default-src 'self'; "
+ + script_src
+ + "; "
+ "style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
+ "img-src 'self' data: blob: https:; "
+ "font-src 'self' data:; "
+ "connect-src 'self'; "
+ "object-src 'none'; "
+ "base-uri 'self'; "
+ "frame-ancestors 'none'; "
+ "form-action 'self'",
+ )
+ if request.path.startswith("/app") and response.mimetype == "text/html":
+ response.headers["Cache-Control"] = "no-store"
+
+ return response
@app.errorhandler(413)
def too_large(_: Exception) -> tuple[str, int]:
@@ -1008,6 +1192,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
max_age=SESSION_TTL_SECONDS,
httponly=True,
samesite="Lax",
+ secure=cookie_secure_enabled(),
path="/",
)
flash("admin account created", "success")
@@ -1042,6 +1227,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
max_age=SESSION_TTL_SECONDS,
httponly=True,
samesite="Lax",
+ secure=cookie_secure_enabled(),
path="/",
)
flash("signed in", "success")
@@ -1053,7 +1239,15 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
if token:
destroy_session(token)
response = redirect(url_for("login"))
- response.set_cookie(SESSION_COOKIE, "", max_age=0, httponly=True, samesite="Lax", path="/")
+ response.set_cookie(
+ SESSION_COOKIE,
+ "",
+ max_age=0,
+ httponly=True,
+ samesite="Lax",
+ secure=cookie_secure_enabled(),
+ path="/",
+ )
return response
@app.post("/app/upload")
@@ -1131,6 +1325,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
if auth_redirect is not None:
return auth_redirect
+ if not uuid_is_valid(site_id):
+ flash("invalid site id", "error")
+ return redirect(url_for("dashboard"))
+
with connect_db() as conn:
site = conn.execute(
"SELECT site_uuid, owner_user_id, slug, storage_key FROM sites WHERE site_uuid = ?",
@@ -1171,6 +1369,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
if admin_redirect is not None:
return admin_redirect
+ if not uuid_is_valid(user_id):
+ flash("invalid user id", "error")
+ return redirect(url_for("dashboard"))
+
username = request.form.get("username", "")
ok, message = auth_backend.update_username(user_uuid=user_id, new_username=username)
flash(message, "success" if ok else "error")
@@ -1182,6 +1384,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
if admin_redirect is not None:
return admin_redirect
+ if not uuid_is_valid(user_id):
+ flash("invalid user id", "error")
+ return redirect(url_for("dashboard"))
+
password = request.form.get("password", "")
ok, message = auth_backend.update_password(user_uuid=user_id, new_password=password)
flash(message, "success" if ok else "error")
@@ -1193,6 +1399,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
if admin_redirect is not None:
return admin_redirect
+ if not uuid_is_valid(user_id):
+ flash("invalid user id", "error")
+ return redirect(url_for("dashboard"))
+
if g.current_user["id"] == user_id:
flash("you cannot delete your own account", "error")
return redirect(url_for("dashboard"))
@@ -1225,6 +1435,10 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
if admin_redirect is not None:
return admin_redirect
+ if not uuid_is_valid(site_id):
+ flash("invalid site id", "error")
+ return redirect(url_for("dashboard"))
+
new_slug = request.form.get("slug", "").strip().lower()
if not slug_is_valid(new_slug):
flash("invalid slug format", "error")
@@ -1280,7 +1494,7 @@ def create_app(base_dir: Optional[Path] = None) -> Flask:
def site_root_relative_fallback(subpath: str) -> Response:
# handle absolute root paths from hosted apps by resolving the slug from referer/cookie
first = subpath.split("/", 1)[0].lower()
- if first in {"app", "api", "s", "_site", "healthz"}:
+ if first in {"app", "api", "s", "_site", "healthz", "favicon.svg", "robots.txt"}:
abort(404)
slug = extract_slug_from_referer(request.headers.get("Referer"))