#!/usr/bin/env python3 import mimetypes import json import os import re import secrets import shutil import sqlite3 import stat import tarfile import tempfile import time import uuid import zipfile from dataclasses import dataclass from pathlib import Path from typing import Callable, Optional from urllib.parse import urlparse from flask import ( Flask, Response, abort, flash, g, make_response, redirect, render_template_string, request, send_file, url_for, ) from auth_backend import AuthBackend, LocalMojicryptAuthBackend # config APP_NAME = "shim" BIND_HOST = "0.0.0.0" PORT = 8585 SESSION_TTL_SECONDS = 86400 MAX_UPLOAD_BYTES = 1024 * 1024 * 1024 MAX_EXTRACTED_BYTES = 2 * 1024 * 1024 * 1024 MAX_EXTRACTED_FILES = 20000 MAX_FORM_MEMORY_SIZE = 2 * 1024 * 1024 SQLITE_TIMEOUT_SECONDS = 30.0 SQLITE_BUSY_TIMEOUT_MS = 30000 SQLITE_CACHE_SIZE_KIB = 32768 SQLITE_MMAP_SIZE_BYTES = 256 * 1024 * 1024 SQLITE_WAL_AUTOCHECKPOINT_PAGES = 1000 SESSION_COOKIE = "shim_session" ACTIVE_SITE_COOKIE = "shim_active_site" MUTATING_METHODS = {"POST", "PUT", "PATCH", "DELETE"} SLUG_RE = re.compile(r"^[a-z0-9](?:[a-z0-9-]{0,126}[a-z0-9])?$") UUID_RE = re.compile( r"^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$" ) ARCHIVE_SUFFIXES = ( ".zip", ".tar", ".tar.bz2", ".tar.gz", ".tar.xz", ".tbz2", ".tgz", ".txz", ) ROOT_ATTR_RE = re.compile(r"(?i)\b(href|src|action|poster)=([\"'])/([^\"']*)\2") CSS_URL_RE = re.compile(r"(?i)url\(\s*([\"']?)/([^\)'\"\s]+)\1\s*\)") # template configs SHELL_TEMPLATE = """ {{ title }} - {{ app_name }}

{{ app_name }}

{% with messages = get_flashed_messages(with_categories=true) %} {% if messages %}
{% for category, message in messages %}
{{ message }}
{% endfor %}
{% endif %} {% endwith %} {{ body | safe }}
""" SETUP_BODY_TEMPLATE = """

first startup detected.

create the initial admin account to continue.

""" LOGIN_BODY_TEMPLATE = """

small static site host for archive uploads

""" DASHBOARD_BODY_TEMPLATE = """

upload static site

upload one archive containing your built static files.

this will generate a random 40 character slug, you can ask your admin to change it to a custom one.

accepted formats: {{ allowed_suffixes | join(', ') }}

{% if is_admin %}all uploaded sites{% else %}your uploaded sites{% endif %}

{% if sites %} {% if is_admin %}{% endif %} {% if is_admin %}{% endif %} {% for site in sites %} {% if is_admin %}{% endif %} {% if is_admin %}{% endif %} {% endfor %}
idlinkownerarchive created actions
{{ site['id'] }} {{ site['slug'] }} {{ site['owner_username'] }}{{ site['original_filename'] }} {{ site['created_at'] }}
{% else %}

no sites uploaded yet.

{% endif %}
{% if is_admin %}

rename slugs

{% if sites %} {% for site in sites %} {% endfor %}
archive current slug owner rename
{{ site['original_filename'] }} {{ site['slug'] }} {{ site['owner_username'] }}
{% else %}

no sites uploaded yet.

{% endif %}

create user

users

{% if users %} {% for user in users %} {% endfor %}
id username role created actions
{{ user['id'] }} {{ user['username'] }} {{ user['role'] }} {{ user['created_at'] }}


{% if user['id'] != current_user['id'] %}
{% else %} current account {% endif %}
{% else %}

no users found.

{% endif %}
{% endif %} """ # code and server logic @dataclass(frozen=True) class AppConfig: base_dir: Path app_name: str db_path: Path sites_dir: Path mojicrypt_bin: Path bind: str port: int ConnectFn = Callable[[], sqlite3.Connection] def slug_is_valid(slug: str) -> bool: return bool(slug) and bool(SLUG_RE.fullmatch(slug)) def uuid_is_valid(value: str) -> bool: value = (value or "").strip().lower() return bool(UUID_RE.fullmatch(value)) def request_path_is_suspicious(path: str) -> bool: if "\x00" in path: return True return any(part in {".", ".."} for part in path.split("/")) def detect_archive_suffix(filename: str) -> Optional[str]: lower = filename.lower() for suffix in ARCHIVE_SUFFIXES: if lower.endswith(suffix): return suffix return None def normalize_archive_member_path(raw_name: str) -> Optional[Path]: name = raw_name.replace("\\", "/").strip() if not name: return None if "\x00" in name: raise ValueError("archive contains invalid null byte path") while name.startswith("./"): name = name[2:] if not name: return None if name.startswith("/"): raise ValueError("archive contains absolute paths") parts = [part for part in name.split("/") if part not in ("", ".")] if not parts: return None if any(len(part) > 255 for part in parts): raise ValueError("archive contains overlong path component") if len(parts) > 64: raise ValueError("archive path depth is too large") if any(part == ".." for part in parts): raise ValueError("archive contains parent path traversal") if ":" in parts[0]: raise ValueError("archive contains invalid drive path") return Path(*parts) def ensure_path_under(root: Path, candidate: Path) -> None: # raises when candidate escapes root via traversal or symlink tricks. root_real = root.resolve() candidate_real = candidate.resolve() candidate_real.relative_to(root_real) def rewrite_root_path(path_without_leading_slash: str, slug: str) -> str: value = path_without_leading_slash lowered = value.lower() if value.startswith("/"): return "/" + value if lowered.startswith("api/"): return "/" + value if lowered.startswith("app/"): return "/" + value if lowered.startswith("s/"): return "/" + value if lowered.startswith("_site/"): return "/" + value if not value: return f"/s/{slug}/" return f"/s/{slug}/{value}" def build_slug_runtime_guard(slug: str) -> str: # browser storage is origin-scoped, so this injects a best-effort slug namespace shim. slug_json = json.dumps(slug) return ( '" ) def rewrite_html_for_slug(html_text: str, slug: str) -> str: def repl(match: re.Match[str]) -> str: attr = match.group(1) quote = match.group(2) path_value = match.group(3) rewritten = rewrite_root_path(path_value, slug) return f"{attr}={quote}{rewritten}{quote}" # rewrite absolute-root links so hosted apps stay inside /s//. updated = ROOT_ATTR_RE.sub(repl, html_text) head_match = re.search(r"(?i)]*>", updated) if head_match: inject_parts = [] if "id=\"shim-slug-runtime-guard\"" not in updated: inject_parts.append("\n " + build_slug_runtime_guard(slug)) if "") if inject_parts: insert_at = head_match.end() updated = updated[:insert_at] + "".join(inject_parts) + updated[insert_at:] return updated def rewrite_css_for_slug(css_text: str, slug: str) -> str: def repl(match: re.Match[str]) -> str: quote = match.group(1) path_value = match.group(2) rewritten = rewrite_root_path(path_value, slug) return f"url({quote}{rewritten}{quote})" return CSS_URL_RE.sub(repl, css_text) def looks_like_spa_route(subpath: str) -> bool: if not subpath: return True name = Path(subpath).name return "." not in name def random_slug(length: int = 40) -> str: alphabet = "abcdefghijklmnopqrstuvwxyz0123456789" return "".join(secrets.choice(alphabet) for _ in range(length)) def reserve_random_slug(connect_db: ConnectFn) -> str: for _ in range(30): slug = random_slug(40) with connect_db() as conn: row = conn.execute("SELECT 1 FROM sites WHERE slug = ?", (slug,)).fetchone() if not row: return slug raise RuntimeError("failed to reserve a unique slug") def extract_slug_from_referer(referer: Optional[str]) -> Optional[str]: if not referer: return None try: path = urlparse(referer).path except ValueError: return None for prefix in ("/s/", "/_site/"): if path.startswith(prefix): remainder = path[len(prefix) :] slug = remainder.split("/", 1)[0] if slug_is_valid(slug): return slug return None def extract_zip_secure(archive_path: Path, destination: Path) -> None: total_size = 0 total_files = 0 with zipfile.ZipFile(archive_path, "r") as zf: for info in zf.infolist(): member_path = normalize_archive_member_path(info.filename) if member_path is None: continue # reject symlink entries mode = (info.external_attr >> 16) & 0o170000 if mode == stat.S_IFLNK: raise ValueError("zip symlinks are not allowed") target = destination / member_path ensure_path_under(destination, target) if info.is_dir(): target.mkdir(parents=True, exist_ok=True) continue total_files += 1 if total_files > MAX_EXTRACTED_FILES: raise ValueError("archive has too many files") total_size += int(info.file_size) if total_size > MAX_EXTRACTED_BYTES: raise ValueError("extracted archive size exceeds limit") target.parent.mkdir(parents=True, exist_ok=True) with zf.open(info, "r") as src, open(target, "wb") as dst: shutil.copyfileobj(src, dst, length=1024 * 1024) def extract_tar_secure(archive_path: Path, destination: Path) -> None: total_size = 0 total_files = 0 with tarfile.open(archive_path, "r:*") as tf: for member in tf.getmembers(): member_path = normalize_archive_member_path(member.name) if member_path is None: continue if member.issym() or member.islnk() or member.isdev(): raise ValueError("tar links and device files are not allowed") target = destination / member_path ensure_path_under(destination, target) if member.isdir(): target.mkdir(parents=True, exist_ok=True) continue if not member.isfile(): raise ValueError("unsupported tar member type") total_files += 1 if total_files > MAX_EXTRACTED_FILES: raise ValueError("archive has too many files") total_size += int(member.size) if total_size > MAX_EXTRACTED_BYTES: raise ValueError("extracted archive size exceeds limit") source = tf.extractfile(member) if source is None: raise ValueError("failed to extract tar file member") target.parent.mkdir(parents=True, exist_ok=True) with source, open(target, "wb") as dst: shutil.copyfileobj(source, dst, length=1024 * 1024) def extract_archive_secure(archive_path: Path, suffix: str, destination: Path) -> None: if suffix == ".zip": extract_zip_secure(archive_path, destination) return extract_tar_secure(archive_path, destination) def find_site_root(extracted_dir: Path) -> Path: root_index = extracted_dir / "index.html" if root_index.is_file(): return extracted_dir candidates = sorted( extracted_dir.rglob("index.html"), key=lambda path: len(path.relative_to(extracted_dir).parts), ) if not candidates: raise ValueError("archive must include an index.html file") return candidates[0].parent def env_bool(name: str, default: bool) -> bool: raw = os.getenv(name, "true" if default else "false").strip().lower() if raw in {"1", "true", "yes", "on"}: return True if raw in {"0", "false", "no", "off"}: return False return default def create_app(base_dir: Optional[Path] = None) -> Flask: project_dir = Path(base_dir or Path(__file__).parent).resolve() app_name = APP_NAME db_path = project_dir / "data" / "shim.db" sites_dir = project_dir / "data" / "sites" mojicrypt_bin = (project_dir / "vendor" / "mojicrypt").resolve() cfg = AppConfig( base_dir=project_dir, app_name=app_name, db_path=db_path, sites_dir=sites_dir, mojicrypt_bin=mojicrypt_bin, bind=BIND_HOST, port=PORT, ) cfg.db_path.parent.mkdir(parents=True, exist_ok=True) cfg.sites_dir.mkdir(parents=True, exist_ok=True) app = Flask(__name__, static_folder=None) app.config["MAX_CONTENT_LENGTH"] = MAX_UPLOAD_BYTES app.config["MAX_FORM_MEMORY_SIZE"] = MAX_FORM_MEMORY_SIZE app.config["SECRET_KEY"] = os.getenv("SECRET_KEY", secrets.token_hex(32)) app.config["PORT"] = cfg.port app.config["BIND"] = cfg.bind app.config["APP_NAME"] = cfg.app_name app.config["MOJICRYPT_BIN"] = str(cfg.mojicrypt_bin) sqlite_timeout_seconds = SQLITE_TIMEOUT_SECONDS sqlite_busy_timeout_ms = SQLITE_BUSY_TIMEOUT_MS sqlite_cache_size_kib = SQLITE_CACHE_SIZE_KIB sqlite_mmap_size_bytes = SQLITE_MMAP_SIZE_BYTES sqlite_wal_autocheckpoint_pages = SQLITE_WAL_AUTOCHECKPOINT_PAGES enforce_app_request_guards = env_bool("ENFORCE_APP_REQUEST_GUARDS", False) def connect_db() -> sqlite3.Connection: conn = sqlite3.connect(str(cfg.db_path), timeout=sqlite_timeout_seconds) # row objects keep query call sites readable and less index-fragile. conn.row_factory = sqlite3.Row # wal + busy timeout gives sqlite much better mixed read/write concurrency. conn.execute("PRAGMA journal_mode = WAL") conn.execute("PRAGMA synchronous = NORMAL") conn.execute(f"PRAGMA busy_timeout = {sqlite_busy_timeout_ms}") conn.execute("PRAGMA temp_store = MEMORY") conn.execute(f"PRAGMA cache_size = {-sqlite_cache_size_kib}") conn.execute(f"PRAGMA wal_autocheckpoint = {sqlite_wal_autocheckpoint_pages}") try: conn.execute(f"PRAGMA mmap_size = {sqlite_mmap_size_bytes}") except sqlite3.DatabaseError: pass # keep fk checks enabled even on sqlite defaults that disable them. conn.execute("PRAGMA foreign_keys = ON") try: conn.enable_load_extension(False) except (AttributeError, sqlite3.OperationalError): pass try: conn.execute("PRAGMA trusted_schema = OFF") except sqlite3.DatabaseError: pass return conn with connect_db() as conn: conn.executescript( """ CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_uuid TEXT UNIQUE NOT NULL, username TEXT UNIQUE NOT NULL, role TEXT NOT NULL CHECK (role IN ('admin', 'user')), encrypted_challenge TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS sessions ( token TEXT PRIMARY KEY, user_id INTEGER NOT NULL, csrf_token TEXT NOT NULL, expires_at INTEGER NOT NULL, created_at INTEGER NOT NULL, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS sites ( id INTEGER PRIMARY KEY AUTOINCREMENT, site_uuid TEXT UNIQUE NOT NULL, owner_user_id INTEGER NOT NULL, slug TEXT UNIQUE NOT NULL, storage_key TEXT UNIQUE NOT NULL, original_filename TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (owner_user_id) REFERENCES users(id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id); CREATE INDEX IF NOT EXISTS idx_sessions_expiry ON sessions(expires_at); CREATE INDEX IF NOT EXISTS idx_sites_owner ON sites(owner_user_id); """ ) # auth provider is injected as a single backend to keep swap-over simple. auth_backend: AuthBackend = LocalMojicryptAuthBackend( connect_db=connect_db, mojicrypt_bin=cfg.mojicrypt_bin, ) def render_page(title: str, body_template: str, **context: object) -> str: body = render_template_string(body_template, **context) csrf_token = "" script_nonce = secrets.token_urlsafe(16) g.script_nonce = script_nonce if g.current_user is not None: csrf_token = g.current_user["csrf_token"] return render_template_string( SHELL_TEMPLATE, title=title, app_name=cfg.app_name, current_user=g.current_user, csrf_token=csrf_token, script_nonce=script_nonce, body=body, ) def cookie_secure_enabled() -> bool: if request.is_secure: return True xfp = request.headers.get("X-Forwarded-Proto", "") forwarded_proto = xfp.split(",", 1)[0].strip().lower() return forwarded_proto == "https" def create_session(user_id: int) -> str: token = secrets.token_urlsafe(48) csrf_token = secrets.token_urlsafe(32) now = int(time.time()) with connect_db() as conn: conn.execute( """ INSERT INTO sessions (token, user_id, csrf_token, expires_at, created_at) VALUES (?, ?, ?, ?, ?) """, (token, user_id, csrf_token, now + SESSION_TTL_SECONDS, now), ) return token def destroy_session(token: str) -> None: with connect_db() as conn: conn.execute("DELETE FROM sessions WHERE token = ?", (token,)) def user_from_token(token: str) -> Optional[sqlite3.Row]: now = int(time.time()) with connect_db() as conn: conn.execute("DELETE FROM sessions WHERE expires_at <= ?", (now,)) row = conn.execute( """ SELECT u.id AS internal_id, u.user_uuid AS id, u.username, u.role, s.csrf_token FROM sessions s JOIN users u ON u.id = s.user_id WHERE s.token = ? AND s.expires_at > ? """, (token, now), ).fetchone() if row is None: return None if not row["csrf_token"]: conn.execute( "UPDATE sessions SET csrf_token = ? WHERE token = ?", (secrets.token_urlsafe(32), token), ) row = conn.execute( """ SELECT u.id AS internal_id, u.user_uuid AS id, u.username, u.role, s.csrf_token FROM sessions s JOIN users u ON u.id = s.user_id WHERE s.token = ? AND s.expires_at > ? """, (token, now), ).fetchone() return row def is_same_origin_request() -> bool: forwarded_proto = request.headers.get("X-Forwarded-Proto", "") external_scheme = forwarded_proto.split(",", 1)[0].strip().lower() if external_scheme not in {"http", "https"}: external_scheme = request.scheme external_netloc = request.host origin = request.headers.get("Origin") if origin: try: parsed_origin = urlparse(origin) except ValueError: return False return ( parsed_origin.scheme == external_scheme and parsed_origin.netloc == external_netloc ) referer = request.headers.get("Referer") if referer: try: parsed_referer = urlparse(referer) except ValueError: return False return ( parsed_referer.scheme == external_scheme and parsed_referer.netloc == external_netloc ) return True def is_valid_csrf_for_request() -> bool: if g.current_user is None: return True expected = g.current_user["csrf_token"] or "" if not expected: return False supplied = request.form.get("_csrf_token", "") if not supplied: supplied = request.headers.get("X-CSRF-Token", "") if not supplied: return False return secrets.compare_digest(expected, supplied) def require_auth() -> Optional[Response]: if g.current_user is not None: return None flash("login required", "error") return redirect(url_for("login")) def require_admin() -> Optional[Response]: auth_redirect = require_auth() if auth_redirect is not None: return auth_redirect if g.current_user["role"] != "admin": flash("admin access required", "error") return redirect(url_for("dashboard")) return None def get_site_by_slug(slug: str) -> Optional[sqlite3.Row]: with connect_db() as conn: return conn.execute( "SELECT id, owner_user_id, slug, storage_key FROM sites WHERE slug = ?", (slug,), ).fetchone() def send_site_file(file_path: Path, slug: str) -> Response: ext = file_path.suffix.lower() mime, _ = mimetypes.guess_type(str(file_path)) if ext == ".html": html = file_path.read_text(encoding="utf-8", errors="replace") body = rewrite_html_for_slug(html, slug) response = make_response(body) response.mimetype = "text/html" response.headers["Cache-Control"] = "no-cache" elif ext == ".css": css = file_path.read_text(encoding="utf-8", errors="replace") body = rewrite_css_for_slug(css, slug) response = make_response(body) response.mimetype = "text/css" response.headers["Cache-Control"] = "public, max-age=300" else: response = make_response( send_file(file_path, mimetype=mime or "application/octet-stream", conditional=True) ) if ext in {".js", ".mjs"}: response.headers["Cache-Control"] = "public, max-age=300" elif ext in {".png", ".jpg", ".jpeg", ".gif", ".svg", ".webp", ".ico"}: response.headers["Cache-Control"] = "public, max-age=86400, immutable" else: response.headers["Cache-Control"] = "public, max-age=3600" response.headers["X-Content-Type-Options"] = "nosniff" response.set_cookie( ACTIVE_SITE_COOKIE, slug, max_age=1800, httponly=True, samesite="Lax", secure=cookie_secure_enabled(), path="/", ) return response def resolve_site_path(site_root: Path, subpath: str) -> Optional[Path]: cleaned = subpath.lstrip("/") candidate = (site_root / cleaned).resolve() try: candidate.relative_to(site_root.resolve()) except ValueError: return None return candidate def serve_site_resource(site: sqlite3.Row, subpath: str, allow_spa: bool = True) -> Response: g.is_hosted_site_response = True site_root = (cfg.sites_dir / site["storage_key"]).resolve() if not site_root.is_dir(): abort(404) target = resolve_site_path(site_root, subpath) if target is None: abort(404) if target.is_dir(): index_path = target / "index.html" if index_path.is_file(): return send_site_file(index_path, site["slug"]) elif target.is_file(): return send_site_file(target, site["slug"]) if allow_spa and looks_like_spa_route(subpath): index_path = site_root / "index.html" if index_path.is_file(): return send_site_file(index_path, site["slug"]) abort(404) @app.before_request def load_current_user() -> None: if request_path_is_suspicious(request.path or "/"): abort(400) g.current_user = None g.is_hosted_site_response = False token = request.cookies.get(SESSION_COOKIE) if token: user = user_from_token(token) if user is not None: g.current_user = user if ( enforce_app_request_guards and g.current_user is not None and request.method in MUTATING_METHODS and request.path.startswith("/app/") ): if not is_same_origin_request(): abort(403) if not is_valid_csrf_for_request(): abort(403) @app.after_request def add_security_headers(response: Response) -> Response: response.headers.setdefault("X-Content-Type-Options", "nosniff") response.headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin") response.headers.setdefault("X-Permitted-Cross-Domain-Policies", "none") if cookie_secure_enabled(): response.headers.setdefault( "Strict-Transport-Security", "max-age=31536000; includeSubDomains", ) if not getattr(g, "is_hosted_site_response", False): nonce = getattr(g, "script_nonce", "") script_src = "script-src 'self'" if nonce: script_src += f" 'nonce-{nonce}'" response.headers.setdefault("X-Frame-Options", "DENY") response.headers.setdefault( "Permissions-Policy", "camera=(), microphone=(), geolocation=(), payment=()", ) response.headers.setdefault("Cross-Origin-Opener-Policy", "same-origin") response.headers.setdefault("Cross-Origin-Resource-Policy", "same-origin") response.headers.setdefault( "Content-Security-Policy", "default-src 'self'; " + script_src + "; " "style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; " "img-src 'self' data: blob: https:; " "font-src 'self' data:; " "connect-src 'self'; " "object-src 'none'; " "base-uri 'self'; " "frame-ancestors 'none'; " "form-action 'self'", ) if request.path.startswith("/app") and response.mimetype == "text/html": response.headers["Cache-Control"] = "no-store" return response @app.errorhandler(413) def too_large(_: Exception) -> tuple[str, int]: return "upload exceeds configured max size", 413 @app.get("/") def root() -> Response: return redirect(url_for("dashboard")) @app.get("/healthz") def healthz() -> tuple[str, int]: return "ok", 200 @app.get("/favicon.svg") def favicon() -> Response: icon_path = cfg.base_dir / "favicon.svg" if not icon_path.is_file(): abort(404) response = make_response(send_file(icon_path, mimetype="image/svg+xml", conditional=True)) response.headers["Cache-Control"] = "public, max-age=3600" return response @app.get("/app/noir-overrides.css") def noir_overrides_css() -> Response: css_path = cfg.base_dir / "noir-overrides.css" if not css_path.is_file(): abort(404) response = make_response(send_file(css_path, mimetype="text/css", conditional=True)) response.headers["Cache-Control"] = "no-cache" return response @app.get("/app") def dashboard() -> Response: if auth_backend.bootstrap_required(): return redirect(url_for("setup")) auth_redirect = require_auth() if auth_redirect is not None: return auth_redirect is_admin = g.current_user["role"] == "admin" with connect_db() as conn: if is_admin: sites = conn.execute( """ SELECT s.site_uuid AS id, s.slug, s.original_filename, s.created_at, u.username AS owner_username FROM sites s JOIN users u ON u.id = s.owner_user_id ORDER BY s.id DESC """ ).fetchall() users = conn.execute( "SELECT user_uuid AS id, username, role, created_at FROM users ORDER BY id ASC" ).fetchall() else: sites = conn.execute( """ SELECT site_uuid AS id, slug, original_filename, created_at FROM sites WHERE owner_user_id = ? ORDER BY id DESC """, (g.current_user["internal_id"],), ).fetchall() users = [] return render_page( title="dashboard", body_template=DASHBOARD_BODY_TEMPLATE, allowed_suffixes=ARCHIVE_SUFFIXES, is_admin=is_admin, sites=sites, users=users, current_user=g.current_user, ) @app.get("/app/setup") def setup() -> Response: if not auth_backend.bootstrap_required(): return redirect(url_for("dashboard")) return render_page(title="setup", body_template=SETUP_BODY_TEMPLATE) @app.post("/app/setup") def setup_submit() -> Response: if not auth_backend.bootstrap_required(): flash("setup already completed", "error") return redirect(url_for("login")) username = request.form.get("username", "") password = request.form.get("password", "") confirm = request.form.get("confirm", "") if password != confirm: flash("passwords do not match", "error") return redirect(url_for("setup")) ok, message = auth_backend.create_user(username=username, password=password, role="admin") if not ok: flash(message, "error") return redirect(url_for("setup")) user = auth_backend.authenticate(username=username, password=password) if user is None: flash("admin created but login failed", "error") return redirect(url_for("login")) existing_token = request.cookies.get(SESSION_COOKIE) if existing_token: destroy_session(existing_token) token = create_session(user["id"]) response = redirect(url_for("dashboard")) response.set_cookie( SESSION_COOKIE, token, max_age=SESSION_TTL_SECONDS, httponly=True, samesite="Lax", secure=cookie_secure_enabled(), path="/", ) flash("admin account created", "success") return response @app.get("/app/login") def login() -> Response: if auth_backend.bootstrap_required(): return redirect(url_for("setup")) if g.current_user is not None: return redirect(url_for("dashboard")) return render_page(title="login", body_template=LOGIN_BODY_TEMPLATE) @app.post("/app/login") def login_submit() -> Response: if auth_backend.bootstrap_required(): flash("run setup first", "error") return redirect(url_for("setup")) username = request.form.get("username", "") password = request.form.get("password", "") user = auth_backend.authenticate(username=username, password=password) if user is None: flash("invalid credentials", "error") return redirect(url_for("login")) existing_token = request.cookies.get(SESSION_COOKIE) if existing_token: destroy_session(existing_token) token = create_session(user["id"]) response = redirect(url_for("dashboard")) response.set_cookie( SESSION_COOKIE, token, max_age=SESSION_TTL_SECONDS, httponly=True, samesite="Lax", secure=cookie_secure_enabled(), path="/", ) flash("signed in", "success") return response @app.post("/app/logout") def logout() -> Response: token = request.cookies.get(SESSION_COOKIE) if token: destroy_session(token) response = redirect(url_for("login")) response.set_cookie( SESSION_COOKIE, "", max_age=0, httponly=True, samesite="Lax", secure=cookie_secure_enabled(), path="/", ) response.set_cookie( ACTIVE_SITE_COOKIE, "", max_age=0, httponly=True, samesite="Lax", secure=cookie_secure_enabled(), path="/", ) return response @app.post("/app/upload") def upload_site() -> Response: auth_redirect = require_auth() if auth_redirect is not None: return auth_redirect archive = request.files.get("archive") if archive is None or not archive.filename: flash("select an archive file", "error") return redirect(url_for("dashboard")) suffix = detect_archive_suffix(archive.filename) if suffix is None: flash("unsupported archive format", "error") return redirect(url_for("dashboard")) original_filename = Path(archive.filename).name.strip() if not original_filename: flash("invalid archive filename", "error") return redirect(url_for("dashboard")) if len(original_filename) > 255: flash("archive filename is too long", "error") return redirect(url_for("dashboard")) if any(ord(ch) < 32 for ch in original_filename): flash("archive filename contains invalid characters", "error") return redirect(url_for("dashboard")) temp_dir = Path(tempfile.mkdtemp(prefix="shim-upload-")) archive_path = temp_dir / f"upload{suffix}" extract_dir = temp_dir / "extract" extract_dir.mkdir(parents=True, exist_ok=True) final_dir: Optional[Path] = None try: archive.save(archive_path) extract_archive_secure(archive_path, suffix, extract_dir) site_root = find_site_root(extract_dir) slug = reserve_random_slug(connect_db) site_uuid = str(uuid.uuid4()) storage_key = secrets.token_hex(24) final_dir = cfg.sites_dir / storage_key shutil.copytree(site_root, final_dir) with connect_db() as conn: conn.execute( """ INSERT INTO sites (site_uuid, owner_user_id, slug, storage_key, original_filename, created_at, updated_at) VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) """, ( site_uuid, int(g.current_user["internal_id"]), slug, storage_key, original_filename, ), ) except ValueError as exc: flash(str(exc), "error") if final_dir is not None: shutil.rmtree(final_dir, ignore_errors=True) return redirect(url_for("dashboard")) except sqlite3.IntegrityError: flash("failed to save site record", "error") if final_dir is not None: shutil.rmtree(final_dir, ignore_errors=True) return redirect(url_for("dashboard")) except (tarfile.TarError, zipfile.BadZipFile): flash("archive is invalid or corrupted", "error") if final_dir is not None: shutil.rmtree(final_dir, ignore_errors=True) return redirect(url_for("dashboard")) finally: shutil.rmtree(temp_dir, ignore_errors=True) flash("site uploaded", "success") return redirect(url_for("dashboard")) @app.post("/app/sites//delete") def delete_site(site_id: str) -> Response: auth_redirect = require_auth() if auth_redirect is not None: return auth_redirect if not uuid_is_valid(site_id): flash("invalid site id", "error") return redirect(url_for("dashboard")) with connect_db() as conn: site = conn.execute( "SELECT site_uuid, owner_user_id, slug, storage_key FROM sites WHERE site_uuid = ?", (site_id,), ).fetchone() if site is None: flash("site not found", "error") return redirect(url_for("dashboard")) is_admin = g.current_user["role"] == "admin" if not is_admin and int(site["owner_user_id"]) != int(g.current_user["internal_id"]): flash("not allowed", "error") return redirect(url_for("dashboard")) conn.execute("DELETE FROM sites WHERE site_uuid = ?", (site_id,)) shutil.rmtree(cfg.sites_dir / site["storage_key"], ignore_errors=True) flash("site deleted", "success") return redirect(url_for("dashboard")) @app.post("/app/admin/users/create") def admin_create_user() -> Response: admin_redirect = require_admin() if admin_redirect is not None: return admin_redirect username = request.form.get("username", "") password = request.form.get("password", "") role = request.form.get("role", "user").strip().lower() ok, message = auth_backend.create_user(username=username, password=password, role=role) flash(message, "success" if ok else "error") return redirect(url_for("dashboard")) @app.post("/app/admin/users//username") def admin_update_user_username(user_id: str) -> Response: admin_redirect = require_admin() if admin_redirect is not None: return admin_redirect if not uuid_is_valid(user_id): flash("invalid user id", "error") return redirect(url_for("dashboard")) username = request.form.get("username", "") ok, message = auth_backend.update_username(user_uuid=user_id, new_username=username) flash(message, "success" if ok else "error") return redirect(url_for("dashboard")) @app.post("/app/admin/users//password") def admin_update_user_password(user_id: str) -> Response: admin_redirect = require_admin() if admin_redirect is not None: return admin_redirect if not uuid_is_valid(user_id): flash("invalid user id", "error") return redirect(url_for("dashboard")) password = request.form.get("password", "") ok, message = auth_backend.update_password(user_uuid=user_id, new_password=password) flash(message, "success" if ok else "error") return redirect(url_for("dashboard")) @app.post("/app/admin/users//delete") def admin_delete_user(user_id: str) -> Response: admin_redirect = require_admin() if admin_redirect is not None: return admin_redirect if not uuid_is_valid(user_id): flash("invalid user id", "error") return redirect(url_for("dashboard")) if g.current_user["id"] == user_id: flash("you cannot delete your own account", "error") return redirect(url_for("dashboard")) with connect_db() as conn: user = conn.execute( "SELECT id AS internal_id, user_uuid, username FROM users WHERE user_uuid = ?", (user_id,), ).fetchone() if user is None: flash("user not found", "error") return redirect(url_for("dashboard")) owned_site_keys = conn.execute( "SELECT storage_key FROM sites WHERE owner_user_id = ?", (user["internal_id"],), ).fetchall() conn.execute("DELETE FROM users WHERE id = ?", (user["internal_id"],)) for row in owned_site_keys: shutil.rmtree(cfg.sites_dir / row["storage_key"], ignore_errors=True) flash("user deleted", "success") return redirect(url_for("dashboard")) @app.post("/app/admin/sites//slug") def admin_update_slug(site_id: str) -> Response: admin_redirect = require_admin() if admin_redirect is not None: return admin_redirect if not uuid_is_valid(site_id): flash("invalid site id", "error") return redirect(url_for("dashboard")) new_slug = request.form.get("slug", "").strip().lower() if not slug_is_valid(new_slug): flash("invalid slug format", "error") return redirect(url_for("dashboard")) try: with connect_db() as conn: cursor = conn.execute( """ UPDATE sites SET slug = ?, updated_at = CURRENT_TIMESTAMP WHERE site_uuid = ? """, (new_slug, site_id), ) if cursor.rowcount == 0: flash("site not found", "error") return redirect(url_for("dashboard")) except sqlite3.IntegrityError: flash("slug is already in use", "error") return redirect(url_for("dashboard")) flash("slug updated", "success") return redirect(url_for("dashboard")) @app.get("/s//") def serve_site_root(slug: str) -> Response: if not slug_is_valid(slug): abort(404) site = get_site_by_slug(slug) if site is None: abort(404) return serve_site_resource(site, subpath="", allow_spa=True) @app.get("/s//") def serve_site_subpath(slug: str, subpath: str) -> Response: if not slug_is_valid(slug): abort(404) site = get_site_by_slug(slug) if site is None: abort(404) return serve_site_resource(site, subpath=subpath, allow_spa=True) @app.get("/_site//") def serve_site_alias_root(slug: str) -> Response: return serve_site_root(slug) @app.get("/_site//") def serve_site_alias_subpath(slug: str, subpath: str) -> Response: return serve_site_subpath(slug, subpath) @app.get("/") def site_root_relative_fallback(subpath: str) -> Response: # handle absolute root paths from hosted apps by resolving the slug from referer/cookie first = subpath.split("/", 1)[0].lower() if first in {"app", "api", "s", "_site", "healthz", "favicon.svg", "robots.txt"}: abort(404) slug = extract_slug_from_referer(request.headers.get("Referer")) if slug is None: cookie_slug = request.cookies.get(ACTIVE_SITE_COOKIE) if cookie_slug and slug_is_valid(cookie_slug): slug = cookie_slug if slug is None: abort(404) site = get_site_by_slug(slug) if site is None: abort(404) return serve_site_resource(site, subpath=subpath, allow_spa=True) return app