#!/usr/bin/env python3
import json
import mimetypes
import os
import re
import secrets
import shutil
import sqlite3
import stat
import tarfile
import tempfile
import time
import uuid
import zipfile
from contextlib import closing
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Optional
from urllib.parse import urlparse

from flask import (
    Flask,
    Response,
    abort,
    flash,
    g,
    make_response,
    redirect,
    render_template_string,
    request,
    send_file,
    url_for,
)

from auth_backend import AuthBackend, LocalMojicryptAuthBackend
# config
# display name handed to the page templates and stored in app.config["APP_NAME"].
APP_NAME = "shim"
# bind address and port; create_app copies both into AppConfig and app.config.
BIND_HOST = "0.0.0.0"
PORT = 8585
# session lifetime in seconds: added to the creation time for expires_at and used as cookie max_age.
SESSION_TTL_SECONDS = 86400
# request body cap, applied through flask's MAX_CONTENT_LENGTH setting.
MAX_UPLOAD_BYTES = 1024 * 1024 * 1024
# extraction limits checked by the zip/tar extractors (summed declared sizes and file count).
MAX_EXTRACTED_BYTES = 2 * 1024 * 1024 * 1024
MAX_EXTRACTED_FILES = 20000
# applied through flask's MAX_FORM_MEMORY_SIZE setting.
MAX_FORM_MEMORY_SIZE = 2 * 1024 * 1024
# sqlite settings consumed by connect_db: connect timeout plus the values fed to PRAGMA statements.
SQLITE_TIMEOUT_SECONDS = 30.0
SQLITE_BUSY_TIMEOUT_MS = 30000
SQLITE_CACHE_SIZE_KIB = 32768
SQLITE_MMAP_SIZE_BYTES = 256 * 1024 * 1024
SQLITE_WAL_AUTOCHECKPOINT_PAGES = 1000
# cookie names: the login session token and the slug of the hosted site served most recently.
SESSION_COOKIE = "shim_session"
ACTIVE_SITE_COOKIE = "shim_active_site"
# http methods subject to the same-origin/csrf guards on /app/ paths when those guards are enabled.
MUTATING_METHODS = {"POST", "PUT", "PATCH", "DELETE"}
# slug: 1-128 chars of lowercase letters, digits and hyphens; first and last char must not be a hyphen.
SLUG_RE = re.compile(r"^[a-z0-9](?:[a-z0-9-]{0,126}[a-z0-9])?$")
# lowercase hyphenated uuid with a version digit of 1-5 and a variant digit of 8, 9, a or b.
UUID_RE = re.compile(
r"^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"
)
# upload filename suffixes accepted by detect_archive_suffix (compared against the lowercased name).
ARCHIVE_SUFFIXES = (
".zip",
".tar",
".tar.bz2",
".tar.gz",
".tar.xz",
".tbz2",
".tgz",
".txz",
)
# href/src/action/poster attributes whose quoted value starts with "/";
# groups: attribute name, quote character, value after the leading slash.
ROOT_ATTR_RE = re.compile(r"(?i)\b(href|src|action|poster)=([\"'])/([^\"']*)\2")
# css url(...) references whose target starts with "/";
# groups: optional quote character, target after the leading slash.
CSS_URL_RE = re.compile(r"(?i)url\(\s*([\"']?)/([^\)'\"\s]+)\1\s*\)")
# template configs
# NOTE(review): these template strings contain only text and jinja tags in this copy of the
# file; the html markup (forms, tables, script tags) appears to have been stripped - confirm
# against the original before relying on them.
# outer page shell: renders the title, flashed messages and the pre-rendered body (marked safe).
SHELL_TEMPLATE = """
{{ title }} - {{ app_name }}
{{ app_name }}
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
{{ message }}
{% endfor %}
{% endif %}
{% endwith %}
{{ body | safe }}
"""
# body shown by the setup view while the auth backend reports that bootstrap is required.
SETUP_BODY_TEMPLATE = """
first startup detected.
create the initial admin account to continue.
"""
# body shown by the login view.
LOGIN_BODY_TEMPLATE = """
small static site host for archive uploads
"""
# dashboard body: upload section, site listing, and (for admins) slug rename and user management.
# expects allowed_suffixes, is_admin, sites, users and current_user in the render context.
DASHBOARD_BODY_TEMPLATE = """
upload static site
upload one archive containing your built static files.
this will generate a random 40 character slug, you can ask your admin to change it to a custom one.
accepted formats: {{ allowed_suffixes | join(', ') }}
{% if is_admin %}all uploaded sites{% else %}your uploaded sites{% endif %}
{% if sites %}
{% if is_admin %}| id | {% endif %}
link |
{% if is_admin %}owner | {% endif %}
archive |
created |
actions |
{% for site in sites %}
{% if is_admin %}| {{ site['id'] }} | {% endif %}
{{ site['slug'] }}
|
{% if is_admin %}{{ site['owner_username'] }} | {% endif %}
{{ site['original_filename'] }} |
{{ site['created_at'] }} |
|
{% endfor %}
{% else %}
no sites uploaded yet.
{% endif %}
{% if is_admin %}
rename slugs
{% if sites %}
| archive |
current slug |
owner |
rename |
{% for site in sites %}
| {{ site['original_filename'] }} |
{{ site['slug'] }} |
{{ site['owner_username'] }} |
|
{% endfor %}
{% else %}
no sites uploaded yet.
{% endif %}
create user
users
{% if users %}
| id |
username |
role |
created |
actions |
{% for user in users %}
| {{ user['id'] }} |
{{ user['username'] }} |
{{ user['role'] }} |
{{ user['created_at'] }} |
{% if user['id'] != current_user['id'] %}
{% else %}
current account
{% endif %}
|
{% endfor %}
{% else %}
no users found.
{% endif %}
{% endif %}
"""
# code and server logic
@dataclass(frozen=True)
class AppConfig:
    """read-only bundle of the paths and network settings resolved by create_app."""

    # project directory the other paths are built from
    base_dir: Path
    # display name of the application
    app_name: str
    # sqlite database file
    db_path: Path
    # directory holding one sub-directory per stored site
    sites_dir: Path
    # path to the mojicrypt binary handed to the auth backend
    mojicrypt_bin: Path
    # bind address and port
    bind: str
    port: int


# zero-argument callable that returns an sqlite connection.
ConnectFn = Callable[[], sqlite3.Connection]
def slug_is_valid(slug: str) -> bool:
    """return True when *slug* is non-empty and matches SLUG_RE in full."""
    if not slug:
        return False
    return SLUG_RE.fullmatch(slug) is not None
def uuid_is_valid(value: str) -> bool:
    """return True when *value*, stripped and lowercased, matches UUID_RE in full.

    a falsy value (None, empty string) is treated as the empty string and is rejected.
    """
    normalized = (value or "").strip().lower()
    return UUID_RE.fullmatch(normalized) is not None
def request_path_is_suspicious(path: str) -> bool:
    """return True when *path* holds a null byte or a "." / ".." segment."""
    if "\x00" in path:
        return True
    for segment in path.split("/"):
        if segment == "." or segment == "..":
            return True
    return False
def detect_archive_suffix(filename: str) -> Optional[str]:
    """return the first entry of ARCHIVE_SUFFIXES that the lowercased *filename* ends with.

    returns None when no suffix matches.
    """
    lowered_name = filename.lower()
    matches = (candidate for candidate in ARCHIVE_SUFFIXES if lowered_name.endswith(candidate))
    return next(matches, None)
def normalize_archive_member_path(raw_name: str) -> Optional[Path]:
    """turn an archive member name into a safe relative path.

    backslashes are treated as separators, leading "./" prefixes and empty or
    "." segments are dropped.

    returns:
        the relative path, or None when nothing is left after cleaning.

    raises:
        ValueError: for null bytes, absolute paths, segments longer than 255
            characters, more than 64 segments, ".." segments, or a ":" in the
            first segment.
    """
    cleaned = raw_name.replace("\\", "/").strip()
    if not cleaned:
        return None
    if "\x00" in cleaned:
        raise ValueError("archive contains invalid null byte path")

    while cleaned[:2] == "./":
        cleaned = cleaned[2:]
    if not cleaned:
        return None
    if cleaned[0] == "/":
        raise ValueError("archive contains absolute paths")

    segments = [piece for piece in cleaned.split("/") if piece and piece != "."]
    if not segments:
        return None
    if max(map(len, segments)) > 255:
        raise ValueError("archive contains overlong path component")
    if len(segments) > 64:
        raise ValueError("archive path depth is too large")
    if ".." in segments:
        raise ValueError("archive contains parent path traversal")
    # a colon in the leading segment is rejected as a drive-style path
    if ":" in segments[0]:
        raise ValueError("archive contains invalid drive path")
    return Path(*segments)
def ensure_path_under(root: Path, candidate: Path) -> None:
    """raise ValueError unless the resolved *candidate* lies inside the resolved *root*.

    both paths are resolved first, so "..", and symlinks that exist on disk,
    are taken into account.
    """
    resolved_root = root.resolve()
    candidate.resolve().relative_to(resolved_root)
def rewrite_root_path(path_without_leading_slash: str, slug: str) -> str:
    """map a root-absolute path (given without its first slash) into the slug namespace.

    values that begin with another slash, or with api/, app/, s/ or _site/
    (compared case-insensitively), get their leading slash back and are
    otherwise untouched. everything else is prefixed with /s/<slug>/.
    """
    remainder = path_without_leading_slash
    reserved_prefixes = ("api/", "app/", "s/", "_site/")
    keeps_root = remainder.startswith("/") or remainder.lower().startswith(reserved_prefixes)
    if keeps_root:
        return "/" + remainder
    # an empty remainder yields the site root, /s/<slug>/
    return f"/s/{slug}/{remainder}"
# builds the snippet that rewrite_html_for_slug injects after the head tag for a given slug.
# NOTE(review): the returned string literal is cut off in this copy of the file, and
# slug_json is presumably interpolated into the missing part - restore from the original.
def build_slug_runtime_guard(slug: str) -> str:
# browser storage is origin-scoped, so this injects a best-effort slug namespace shim.
slug_json = json.dumps(slug)
return (
'"
)
# rewrites root-absolute href/src/action/poster attributes through rewrite_root_path and,
# when a head tag is found, inserts extra markup right after it. returns the updated html.
def rewrite_html_for_slug(html_text: str, slug: str) -> str:
# substitution callback: groups are attribute name, quote character, path after the slash.
def repl(match: re.Match[str]) -> str:
attr = match.group(1)
quote = match.group(2)
path_value = match.group(3)
rewritten = rewrite_root_path(path_value, slug)
return f"{attr}={quote}{rewritten}{quote}"
# rewrite absolute-root links so hosted apps stay inside /s//.
updated = ROOT_ATTR_RE.sub(repl, html_text)
# NOTE(review): this pattern looks truncated (presumably it matched the opening head tag) - confirm.
head_match = re.search(r"(?i)]*>", updated)
if head_match:
inject_parts = []
# the runtime guard is injected only once, detected by its element id.
if "id=\"shim-slug-runtime-guard\"" not in updated:
inject_parts.append("\n " + build_slug_runtime_guard(slug))
# NOTE(review): the next line is cut off in this copy of the file; its condition and body are unknown.
if "")
if inject_parts:
insert_at = head_match.end()
updated = updated[:insert_at] + "".join(inject_parts) + updated[insert_at:]
return updated
def rewrite_css_for_slug(css_text: str, slug: str) -> str:
    """rewrite every root-absolute url(...) in *css_text* through rewrite_root_path."""

    def _rewrite_url(found: re.Match[str]) -> str:
        # group 1 is the optional quote character, group 2 the path after the slash
        quote_char, target = found.group(1, 2)
        return "url(" + quote_char + rewrite_root_path(target, slug) + quote_char + ")"

    return CSS_URL_RE.sub(_rewrite_url, css_text)
def looks_like_spa_route(subpath: str) -> bool:
    """return True for an empty subpath or one whose final component has no dot."""
    if not subpath:
        return True
    last_component = Path(subpath).name
    return last_component.find(".") == -1
def random_slug(length: int = 40) -> str:
    """return *length* characters drawn with secrets.choice from a-z and 0-9."""
    alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"
    picked = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(picked)
def reserve_random_slug(connect_db: ConnectFn) -> str:
    """pick a random 40-character slug that is not present in the sites table.

    args:
        connect_db: factory returning an sqlite connection.

    returns:
        a slug that had no matching row when it was checked. nothing is
        written, so the caller still has to handle a uniqueness conflict on
        insert.

    raises:
        RuntimeError: when 30 attempts all hit an existing slug.
    """
    # one connection serves every attempt and is closed on exit; sqlite3's own
    # context manager only ends the transaction and leaves the connection open.
    with closing(connect_db()) as conn:
        for _ in range(30):
            slug = random_slug(40)
            taken = conn.execute("SELECT 1 FROM sites WHERE slug = ?", (slug,)).fetchone()
            if not taken:
                return slug
    raise RuntimeError("failed to reserve a unique slug")
def extract_slug_from_referer(referer: Optional[str]) -> Optional[str]:
    """pull a valid slug out of a referer whose path starts with /s/ or /_site/.

    returns None for a missing or unparsable referer, or when the segment
    after the prefix is not a valid slug.
    """
    if not referer:
        return None
    try:
        referer_path = urlparse(referer).path
    except ValueError:
        return None

    for mount in ("/s/", "/_site/"):
        if not referer_path.startswith(mount):
            continue
        candidate = referer_path[len(mount):].partition("/")[0]
        if slug_is_valid(candidate):
            return candidate
    return None
def extract_zip_secure(archive_path: Path, destination: Path) -> None:
    """extract a zip archive into *destination* with path, type and size checks.

    raises:
        ValueError: for unsafe member names, symlink entries, paths leaving
            *destination*, more than MAX_EXTRACTED_FILES files, or declared
            sizes summing past MAX_EXTRACTED_BYTES.
    """
    bytes_seen = 0
    files_seen = 0
    with zipfile.ZipFile(archive_path, "r") as bundle:
        for entry in bundle.infolist():
            relative = normalize_archive_member_path(entry.filename)
            if relative is None:
                continue

            # reject symlink entries (file-type bits taken from the high half of external_attr)
            file_type = (entry.external_attr >> 16) & 0o170000
            if file_type == stat.S_IFLNK:
                raise ValueError("zip symlinks are not allowed")

            out_path = destination / relative
            ensure_path_under(destination, out_path)

            if entry.is_dir():
                out_path.mkdir(parents=True, exist_ok=True)
                continue

            files_seen += 1
            if files_seen > MAX_EXTRACTED_FILES:
                raise ValueError("archive has too many files")
            bytes_seen += int(entry.file_size)
            if bytes_seen > MAX_EXTRACTED_BYTES:
                raise ValueError("extracted archive size exceeds limit")

            out_path.parent.mkdir(parents=True, exist_ok=True)
            with bundle.open(entry, "r") as reader, open(out_path, "wb") as writer:
                shutil.copyfileobj(reader, writer, length=1024 * 1024)
def extract_tar_secure(archive_path: Path, destination: Path) -> None:
    """extract a (possibly compressed) tar archive into *destination* with safety checks.

    only directories and regular files are accepted.

    raises:
        ValueError: for unsafe member names, links, device files, other
            member types, paths leaving *destination*, more than
            MAX_EXTRACTED_FILES files, declared sizes summing past
            MAX_EXTRACTED_BYTES, or a member that cannot be opened.
    """
    bytes_seen = 0
    files_seen = 0
    with tarfile.open(archive_path, "r:*") as bundle:
        for entry in bundle.getmembers():
            relative = normalize_archive_member_path(entry.name)
            if relative is None:
                continue

            forbidden_type = entry.issym() or entry.islnk() or entry.isdev()
            if forbidden_type:
                raise ValueError("tar links and device files are not allowed")

            out_path = destination / relative
            ensure_path_under(destination, out_path)

            if entry.isdir():
                out_path.mkdir(parents=True, exist_ok=True)
                continue
            if not entry.isfile():
                raise ValueError("unsupported tar member type")

            files_seen += 1
            if files_seen > MAX_EXTRACTED_FILES:
                raise ValueError("archive has too many files")
            bytes_seen += int(entry.size)
            if bytes_seen > MAX_EXTRACTED_BYTES:
                raise ValueError("extracted archive size exceeds limit")

            reader = bundle.extractfile(entry)
            if reader is None:
                raise ValueError("failed to extract tar file member")
            out_path.parent.mkdir(parents=True, exist_ok=True)
            with reader, open(out_path, "wb") as writer:
                shutil.copyfileobj(reader, writer, length=1024 * 1024)
def extract_archive_secure(archive_path: Path, suffix: str, destination: Path) -> None:
    """dispatch to the zip extractor for ".zip" and to the tar extractor for any other suffix."""
    extractor = extract_zip_secure if suffix == ".zip" else extract_tar_secure
    extractor(archive_path, destination)
def find_site_root(extracted_dir: Path) -> Path:
    """locate the directory that should be served as the site root.

    *extracted_dir* itself wins when it holds an index.html file. otherwise
    the shallowest index.html file below it decides; candidates at the same
    depth are ordered by their relative posix path so the choice does not
    depend on directory listing order.

    raises:
        ValueError: when no index.html file exists anywhere below *extracted_dir*.
    """
    if (extracted_dir / "index.html").is_file():
        return extracted_dir

    def _rank(path: Path) -> tuple:
        relative = path.relative_to(extracted_dir)
        return (len(relative.parts), relative.as_posix())

    # rglob also yields directories named index.html; only real files count
    index_files = [path for path in extracted_dir.rglob("index.html") if path.is_file()]
    if not index_files:
        raise ValueError("archive must include an index.html file")
    return min(index_files, key=_rank).parent
def env_bool(name: str, default: bool) -> bool:
    """read environment variable *name* as a boolean.

    1/true/yes/on give True and 0/false/no/off give False, after stripping
    and lowercasing. an unset variable or any other text gives *default*.
    """
    fallback_text = "true" if default else "false"
    normalized = os.getenv(name, fallback_text).strip().lower()
    truthy = {"1", "true", "yes", "on"}
    falsy = {"0", "false", "no", "off"}
    if normalized in truthy:
        return True
    if normalized in falsy:
        return False
    return default
# application factory: resolves paths, configures flask and sqlite, registers the views
# and returns the flask app. base_dir defaults to the directory containing this file.
def create_app(base_dir: Optional[Path] = None) -> Flask:
project_dir = Path(base_dir or Path(__file__).parent).resolve()
app_name = APP_NAME
# the database and the per-site storage both live under <project_dir>/data.
db_path = project_dir / "data" / "shim.db"
sites_dir = project_dir / "data" / "sites"
mojicrypt_bin = (project_dir / "vendor" / "mojicrypt").resolve()
cfg = AppConfig(
base_dir=project_dir,
app_name=app_name,
db_path=db_path,
sites_dir=sites_dir,
mojicrypt_bin=mojicrypt_bin,
bind=BIND_HOST,
port=PORT,
)
# make sure the data directories exist before sqlite or uploads touch them.
cfg.db_path.parent.mkdir(parents=True, exist_ok=True)
cfg.sites_dir.mkdir(parents=True, exist_ok=True)
# static_folder=None: flask's built-in static route is not registered.
app = Flask(__name__, static_folder=None)
app.config["MAX_CONTENT_LENGTH"] = MAX_UPLOAD_BYTES
app.config["MAX_FORM_MEMORY_SIZE"] = MAX_FORM_MEMORY_SIZE
# when SECRET_KEY is unset a fresh random key is generated on every create_app call.
app.config["SECRET_KEY"] = os.getenv("SECRET_KEY", secrets.token_hex(32))
app.config["PORT"] = cfg.port
app.config["BIND"] = cfg.bind
app.config["APP_NAME"] = cfg.app_name
app.config["MOJICRYPT_BIN"] = str(cfg.mojicrypt_bin)
# local copies of the sqlite settings, captured by connect_db below.
sqlite_timeout_seconds = SQLITE_TIMEOUT_SECONDS
sqlite_busy_timeout_ms = SQLITE_BUSY_TIMEOUT_MS
sqlite_cache_size_kib = SQLITE_CACHE_SIZE_KIB
sqlite_mmap_size_bytes = SQLITE_MMAP_SIZE_BYTES
sqlite_wal_autocheckpoint_pages = SQLITE_WAL_AUTOCHECKPOINT_PAGES
# the same-origin/csrf guards in load_current_user are off unless this env var enables them.
enforce_app_request_guards = env_bool("ENFORCE_APP_REQUEST_GUARDS", False)
def connect_db() -> sqlite3.Connection:
    """open a connection to the app database and apply the per-connection pragmas.

    the caller owns the returned connection and is responsible for closing it.
    """
    connection = sqlite3.connect(str(cfg.db_path), timeout=sqlite_timeout_seconds)
    # row objects keep query call sites readable and less index-fragile.
    connection.row_factory = sqlite3.Row

    # wal + busy timeout gives sqlite much better mixed read/write concurrency.
    required_pragmas = (
        "PRAGMA journal_mode = WAL",
        "PRAGMA synchronous = NORMAL",
        f"PRAGMA busy_timeout = {sqlite_busy_timeout_ms}",
        "PRAGMA temp_store = MEMORY",
        f"PRAGMA cache_size = {-sqlite_cache_size_kib}",
        f"PRAGMA wal_autocheckpoint = {sqlite_wal_autocheckpoint_pages}",
    )
    for statement in required_pragmas:
        connection.execute(statement)

    # a failure to set mmap_size is ignored on purpose.
    try:
        connection.execute(f"PRAGMA mmap_size = {sqlite_mmap_size_bytes}")
    except sqlite3.DatabaseError:
        pass

    # keep fk checks enabled even on sqlite defaults that disable them.
    connection.execute("PRAGMA foreign_keys = ON")

    # hardening steps that are skipped when the build does not support them.
    try:
        connection.enable_load_extension(False)
    except (AttributeError, sqlite3.OperationalError):
        pass
    try:
        connection.execute("PRAGMA trusted_schema = OFF")
    except sqlite3.DatabaseError:
        pass
    return connection
# create the users, sessions and sites tables plus their indexes if they do not exist yet.
# sessions and sites reference users with ON DELETE CASCADE, so deleting a user removes both.
# NOTE(review): sqlite3's connection context manager ends the transaction but does not
# close the connection; this one is left open until it is garbage collected.
with connect_db() as conn:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_uuid TEXT UNIQUE NOT NULL,
username TEXT UNIQUE NOT NULL,
role TEXT NOT NULL CHECK (role IN ('admin', 'user')),
encrypted_challenge TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS sessions (
token TEXT PRIMARY KEY,
user_id INTEGER NOT NULL,
csrf_token TEXT NOT NULL,
expires_at INTEGER NOT NULL,
created_at INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS sites (
id INTEGER PRIMARY KEY AUTOINCREMENT,
site_uuid TEXT UNIQUE NOT NULL,
owner_user_id INTEGER NOT NULL,
slug TEXT UNIQUE NOT NULL,
storage_key TEXT UNIQUE NOT NULL,
original_filename TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (owner_user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id);
CREATE INDEX IF NOT EXISTS idx_sessions_expiry ON sessions(expires_at);
CREATE INDEX IF NOT EXISTS idx_sites_owner ON sites(owner_user_id);
"""
)
# auth provider is injected as a single backend to keep swap-over simple.
# the backend gets the same connection factory and the resolved mojicrypt binary path.
auth_backend: AuthBackend = LocalMojicryptAuthBackend(
connect_db=connect_db,
mojicrypt_bin=cfg.mojicrypt_bin,
)
def render_page(title: str, body_template: str, **context: object) -> str:
    """render *body_template* with *context* and wrap the result in SHELL_TEMPLATE.

    a fresh script nonce is generated per call and stored on g.script_nonce.
    the csrf token passed to the shell is the current user's, or "" when
    nobody is signed in.
    """
    rendered_body = render_template_string(body_template, **context)
    nonce = secrets.token_urlsafe(16)
    g.script_nonce = nonce
    viewer = g.current_user
    shell_context = {
        "title": title,
        "app_name": cfg.app_name,
        "current_user": viewer,
        "csrf_token": viewer["csrf_token"] if viewer is not None else "",
        "script_nonce": nonce,
        "body": rendered_body,
    }
    return render_template_string(SHELL_TEMPLATE, **shell_context)
def cookie_secure_enabled() -> bool:
    """return True when the request is secure or its first X-Forwarded-Proto value is "https"."""
    if request.is_secure:
        return True
    header_value = request.headers.get("X-Forwarded-Proto", "")
    first_hop = header_value.partition(",")[0]
    return first_hop.strip().lower() == "https"
def create_session(user_id: int) -> str:
    """insert a new session row for *user_id* and return its token.

    the row gets a random session token, a random csrf token and an expiry
    of now + SESSION_TTL_SECONDS.
    """
    token = secrets.token_urlsafe(48)
    csrf_token = secrets.token_urlsafe(32)
    now = int(time.time())
    # closing() releases the connection; the inner "conn" context commits the insert.
    with closing(connect_db()) as conn, conn:
        conn.execute(
            """
            INSERT INTO sessions (token, user_id, csrf_token, expires_at, created_at)
            VALUES (?, ?, ?, ?, ?)
            """,
            (token, user_id, csrf_token, now + SESSION_TTL_SECONDS, now),
        )
    return token
def destroy_session(token: str) -> None:
    """delete the session row identified by *token*, if there is one."""
    # closing() releases the connection; the inner "conn" context commits the delete.
    with closing(connect_db()) as conn, conn:
        conn.execute("DELETE FROM sessions WHERE token = ?", (token,))
def user_from_token(token: str) -> Optional[sqlite3.Row]:
    """resolve a session token to its user row.

    expired sessions are purged first. a session whose csrf token is empty
    gets a new one before the row is read again.

    returns:
        a row with internal_id, id (the user uuid), username, role and
        csrf_token, or None when the token has no live session.
    """
    now = int(time.time())
    lookup_sql = """
        SELECT u.id AS internal_id, u.user_uuid AS id, u.username, u.role, s.csrf_token
        FROM sessions s
        JOIN users u ON u.id = s.user_id
        WHERE s.token = ? AND s.expires_at > ?
    """
    # closing() releases the connection; the inner "conn" context commits the writes.
    with closing(connect_db()) as conn, conn:
        conn.execute("DELETE FROM sessions WHERE expires_at <= ?", (now,))
        row = conn.execute(lookup_sql, (token, now)).fetchone()
        if row is None:
            return None
        if not row["csrf_token"]:
            conn.execute(
                "UPDATE sessions SET csrf_token = ? WHERE token = ?",
                (secrets.token_urlsafe(32), token),
            )
            row = conn.execute(lookup_sql, (token, now)).fetchone()
    return row
def is_same_origin_request() -> bool:
    """compare the Origin header (or, failing that, the Referer) with this request's origin.

    the expected scheme is the first X-Forwarded-Proto value when it is http
    or https, otherwise request.scheme; the expected host is request.host.
    a request carrying neither header is accepted.
    """
    forwarded = request.headers.get("X-Forwarded-Proto", "").partition(",")[0].strip().lower()
    expected_scheme = forwarded if forwarded in {"http", "https"} else request.scheme
    expected_netloc = request.host

    # Origin takes precedence; Referer is consulted only when Origin is absent or empty.
    for header_name in ("Origin", "Referer"):
        header_value = request.headers.get(header_name)
        if not header_value:
            continue
        try:
            parsed = urlparse(header_value)
        except ValueError:
            return False
        return parsed.scheme == expected_scheme and parsed.netloc == expected_netloc
    return True
def is_valid_csrf_for_request() -> bool:
    """check the submitted csrf token against the current session's token.

    the token is read from the _csrf_token form field, then from the
    X-CSRF-Token header. requests without a signed-in user pass.
    """
    viewer = g.current_user
    if viewer is None:
        return True
    expected = viewer["csrf_token"] or ""
    if not expected:
        return False
    supplied = request.form.get("_csrf_token", "") or request.headers.get("X-CSRF-Token", "")
    if not supplied:
        return False
    return secrets.compare_digest(expected, supplied)
def require_auth() -> Optional[Response]:
    """return None for a signed-in user, otherwise flash an error and redirect to login."""
    if g.current_user is None:
        flash("login required", "error")
        return redirect(url_for("login"))
    return None
def require_admin() -> Optional[Response]:
    """return None for a signed-in admin, otherwise the redirect the view should return."""
    blocked = require_auth()
    if blocked is not None:
        return blocked
    if g.current_user["role"] == "admin":
        return None
    flash("admin access required", "error")
    return redirect(url_for("dashboard"))
def get_site_by_slug(slug: str) -> Optional[sqlite3.Row]:
    """fetch id, owner_user_id, slug and storage_key of the site with *slug*, or None."""
    # closing() releases the connection; the fetched row stays usable afterwards.
    with closing(connect_db()) as conn:
        return conn.execute(
            "SELECT id, owner_user_id, slug, storage_key FROM sites WHERE slug = ?",
            (slug,),
        ).fetchone()
def send_site_file(file_path: Path, slug: str) -> Response:
    """build the response for one file of a hosted site.

    html and css are read as text and rewritten for *slug*; everything else
    is sent as-is with a guessed mime type. every response gets a
    Cache-Control header chosen by extension, nosniff, and the active-site
    cookie set to *slug*.
    """
    extension = file_path.suffix.lower()
    guessed_type, _ = mimetypes.guess_type(str(file_path))

    if extension in {".html", ".css"}:
        source_text = file_path.read_text(encoding="utf-8", errors="replace")
        if extension == ".html":
            response = make_response(rewrite_html_for_slug(source_text, slug))
            response.mimetype = "text/html"
            cache_policy = "no-cache"
        else:
            response = make_response(rewrite_css_for_slug(source_text, slug))
            response.mimetype = "text/css"
            cache_policy = "public, max-age=300"
    else:
        response = make_response(
            send_file(
                file_path,
                mimetype=guessed_type or "application/octet-stream",
                conditional=True,
            )
        )
        if extension in {".js", ".mjs"}:
            cache_policy = "public, max-age=300"
        elif extension in {".png", ".jpg", ".jpeg", ".gif", ".svg", ".webp", ".ico"}:
            cache_policy = "public, max-age=86400, immutable"
        else:
            cache_policy = "public, max-age=3600"

    response.headers["Cache-Control"] = cache_policy
    response.headers["X-Content-Type-Options"] = "nosniff"
    # the fallback route reads this cookie when the referer does not identify a site
    response.set_cookie(
        ACTIVE_SITE_COOKIE,
        slug,
        max_age=1800,
        httponly=True,
        samesite="Lax",
        secure=cookie_secure_enabled(),
        path="/",
    )
    return response
def resolve_site_path(site_root: Path, subpath: str) -> Optional[Path]:
    """resolve *subpath* below *site_root*; return None when the result leaves the root."""
    relative = subpath.lstrip("/")
    resolved = (site_root / relative).resolve()
    try:
        resolved.relative_to(site_root.resolve())
    except ValueError:
        return None
    else:
        return resolved
def serve_site_resource(site: sqlite3.Row, subpath: str, allow_spa: bool = True) -> Response:
    """serve *subpath* from the stored files of *site*.

    lookup order: the file itself, index.html of a matching directory, then
    (when *allow_spa* is set and the subpath looks like a route) the site's
    root index.html. anything else aborts with 404. marks the response as a
    hosted-site response on g.
    """
    g.is_hosted_site_response = True
    root_dir = (cfg.sites_dir / site["storage_key"]).resolve()
    if not root_dir.is_dir():
        abort(404)

    resolved = resolve_site_path(root_dir, subpath)
    if resolved is None:
        abort(404)

    if resolved.is_file():
        return send_site_file(resolved, site["slug"])

    index_candidates = []
    if resolved.is_dir():
        index_candidates.append(resolved / "index.html")
    if allow_spa and looks_like_spa_route(subpath):
        index_candidates.append(root_dir / "index.html")

    for index_file in index_candidates:
        if index_file.is_file():
            return send_site_file(index_file, site["slug"])
    abort(404)
@app.before_request
def load_current_user() -> None:
    """per-request setup: reject suspicious paths, load the session user, apply guards.

    aborts with 400 for a suspicious path. when the guards are enabled, a
    signed-in user's mutating request under /app/ must pass both the
    same-origin check and the csrf check, or it is aborted with 403.
    """
    if request_path_is_suspicious(request.path or "/"):
        abort(400)

    g.current_user = None
    g.is_hosted_site_response = False

    session_token = request.cookies.get(SESSION_COOKIE)
    if session_token:
        # stays None when the token has no live session
        g.current_user = user_from_token(session_token)

    guarded = (
        enforce_app_request_guards
        and g.current_user is not None
        and request.method in MUTATING_METHODS
        and request.path.startswith("/app/")
    )
    if not guarded:
        return
    if not (is_same_origin_request() and is_valid_csrf_for_request()):
        abort(403)
# after-request hook that adds security headers without overriding ones a view already set.
# app pages (anything not marked as a hosted-site response) also get framing, permissions,
# cross-origin and content-security-policy headers.
@app.after_request
def add_security_headers(response: Response) -> Response:
response.headers.setdefault("X-Content-Type-Options", "nosniff")
response.headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin")
response.headers.setdefault("X-Permitted-Cross-Domain-Policies", "none")
# hsts is only sent when the request is considered https (directly or via X-Forwarded-Proto).
if cookie_secure_enabled():
response.headers.setdefault(
"Strict-Transport-Security",
"max-age=31536000; includeSubDomains",
)
if not getattr(g, "is_hosted_site_response", False):
# render_page stores a per-response nonce on g; it is added to script-src when present.
nonce = getattr(g, "script_nonce", "")
script_src = "script-src 'self'"
if nonce:
script_src += f" 'nonce-{nonce}'"
response.headers.setdefault("X-Frame-Options", "DENY")
response.headers.setdefault(
"Permissions-Policy",
"camera=(), microphone=(), geolocation=(), payment=()",
)
response.headers.setdefault("Cross-Origin-Opener-Policy", "same-origin")
response.headers.setdefault("Cross-Origin-Resource-Policy", "same-origin")
response.headers.setdefault(
"Content-Security-Policy",
"default-src 'self'; "
+ script_src
+ "; "
"style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
"img-src 'self' data: blob: https:; "
"font-src 'self' data:; "
"connect-src 'self'; "
"object-src 'none'; "
"base-uri 'self'; "
"frame-ancestors 'none'; "
"form-action 'self'",
)
# html responses for paths starting with /app are never stored by caches.
# NOTE(review): indentation is not visible in this copy, so it is unclear whether this rule
# sits inside the non-hosted branch above; note the prefix test also matches e.g. /apple.
if request.path.startswith("/app") and response.mimetype == "text/html":
response.headers["Cache-Control"] = "no-store"
return response
@app.errorhandler(413)
def too_large(_: Exception) -> tuple[str, int]:
    """plain-text reply for requests rejected with 413."""
    message = "upload exceeds configured max size"
    return (message, 413)
@app.get("/")
def root() -> Response:
    """send visitors of the bare root to the dashboard."""
    dashboard_url = url_for("dashboard")
    return redirect(dashboard_url)
@app.get("/healthz")
def healthz() -> tuple[str, int]:
    """health endpoint: always answers "ok" with status 200."""
    return ("ok", 200)
@app.get("/favicon.svg")
def favicon() -> Response:
    """serve favicon.svg from the project directory, or 404 when it is missing."""
    svg_file = cfg.base_dir / "favicon.svg"
    if not svg_file.is_file():
        abort(404)
    sent = send_file(svg_file, mimetype="image/svg+xml", conditional=True)
    response = make_response(sent)
    response.headers["Cache-Control"] = "public, max-age=3600"
    return response
@app.get("/app/noir-overrides.css")
def noir_overrides_css() -> Response:
    """serve noir-overrides.css from the project directory, or 404 when it is missing."""
    stylesheet = cfg.base_dir / "noir-overrides.css"
    if not stylesheet.is_file():
        abort(404)
    sent = send_file(stylesheet, mimetype="text/css", conditional=True)
    response = make_response(sent)
    response.headers["Cache-Control"] = "no-cache"
    return response
@app.get("/app")
def dashboard() -> Response:
    """render the dashboard.

    redirects to setup while bootstrap is required and to login for
    anonymous visitors. admins see every site plus the user list; other
    users see only their own sites.
    """
    if auth_backend.bootstrap_required():
        return redirect(url_for("setup"))
    auth_redirect = require_auth()
    if auth_redirect is not None:
        return auth_redirect

    is_admin = g.current_user["role"] == "admin"
    # closing() releases the connection; the fetched rows stay usable afterwards.
    with closing(connect_db()) as conn:
        if is_admin:
            sites = conn.execute(
                """
                SELECT s.site_uuid AS id, s.slug, s.original_filename, s.created_at, u.username AS owner_username
                FROM sites s
                JOIN users u ON u.id = s.owner_user_id
                ORDER BY s.id DESC
                """
            ).fetchall()
            users = conn.execute(
                "SELECT user_uuid AS id, username, role, created_at FROM users ORDER BY id ASC"
            ).fetchall()
        else:
            sites = conn.execute(
                """
                SELECT site_uuid AS id, slug, original_filename, created_at
                FROM sites
                WHERE owner_user_id = ?
                ORDER BY id DESC
                """,
                (g.current_user["internal_id"],),
            ).fetchall()
            users = []

    return render_page(
        title="dashboard",
        body_template=DASHBOARD_BODY_TEMPLATE,
        allowed_suffixes=ARCHIVE_SUFFIXES,
        is_admin=is_admin,
        sites=sites,
        users=users,
        current_user=g.current_user,
    )
@app.get("/app/setup")
def setup() -> Response:
    """show the first-run setup page, or go to the dashboard once setup is done."""
    if auth_backend.bootstrap_required():
        return render_page(title="setup", body_template=SETUP_BODY_TEMPLATE)
    return redirect(url_for("dashboard"))
@app.post("/app/setup")
def setup_submit() -> Response:
    """create the initial admin account from the setup form and sign it in.

    refused once bootstrap is no longer required. on success any session
    named by the incoming cookie is destroyed and a new session cookie is set.
    """
    if not auth_backend.bootstrap_required():
        flash("setup already completed", "error")
        return redirect(url_for("login"))

    form = request.form
    username = form.get("username", "")
    password = form.get("password", "")
    if password != form.get("confirm", ""):
        flash("passwords do not match", "error")
        return redirect(url_for("setup"))

    created, message = auth_backend.create_user(username=username, password=password, role="admin")
    if not created:
        flash(message, "error")
        return redirect(url_for("setup"))

    account = auth_backend.authenticate(username=username, password=password)
    if account is None:
        flash("admin created but login failed", "error")
        return redirect(url_for("login"))

    # replace whatever session the browser presented with a new one
    stale_token = request.cookies.get(SESSION_COOKIE)
    if stale_token:
        destroy_session(stale_token)
    fresh_token = create_session(account["id"])

    response = redirect(url_for("dashboard"))
    cookie_options = {
        "max_age": SESSION_TTL_SECONDS,
        "httponly": True,
        "samesite": "Lax",
        "secure": cookie_secure_enabled(),
        "path": "/",
    }
    response.set_cookie(SESSION_COOKIE, fresh_token, **cookie_options)
    flash("admin account created", "success")
    return response
@app.get("/app/login")
def login() -> Response:
    """show the login page; redirect to setup or the dashboard when login does not apply."""
    if auth_backend.bootstrap_required():
        destination = "setup"
    elif g.current_user is not None:
        destination = "dashboard"
    else:
        return render_page(title="login", body_template=LOGIN_BODY_TEMPLATE)
    return redirect(url_for(destination))
@app.post("/app/login")
def login_submit() -> Response:
    """authenticate the login form and start a session.

    on success any session named by the incoming cookie is destroyed and a
    new session cookie is set before redirecting to the dashboard.
    """
    if auth_backend.bootstrap_required():
        flash("run setup first", "error")
        return redirect(url_for("setup"))

    account = auth_backend.authenticate(
        username=request.form.get("username", ""),
        password=request.form.get("password", ""),
    )
    if account is None:
        flash("invalid credentials", "error")
        return redirect(url_for("login"))

    # replace whatever session the browser presented with a new one
    stale_token = request.cookies.get(SESSION_COOKIE)
    if stale_token:
        destroy_session(stale_token)
    fresh_token = create_session(account["id"])

    response = redirect(url_for("dashboard"))
    cookie_options = {
        "max_age": SESSION_TTL_SECONDS,
        "httponly": True,
        "samesite": "Lax",
        "secure": cookie_secure_enabled(),
        "path": "/",
    }
    response.set_cookie(SESSION_COOKIE, fresh_token, **cookie_options)
    flash("signed in", "success")
    return response
@app.post("/app/logout")
def logout() -> Response:
    """destroy the current session, expire both cookies and redirect to login."""
    session_token = request.cookies.get(SESSION_COOKIE)
    if session_token:
        destroy_session(session_token)

    response = redirect(url_for("login"))
    # max_age=0 with an empty value tells the browser to drop each cookie
    for cookie_name in (SESSION_COOKIE, ACTIVE_SITE_COOKIE):
        response.set_cookie(
            cookie_name,
            "",
            max_age=0,
            httponly=True,
            samesite="Lax",
            secure=cookie_secure_enabled(),
            path="/",
        )
    return response
@app.post("/app/upload")
def upload_site() -> Response:
    """accept an archive upload, extract it safely and register it as a new site.

    the archive is saved and extracted in a temporary directory, the
    directory holding index.html is copied under sites_dir with a random
    storage key, and a sites row with a random slug is inserted. validation
    and archive errors are flashed and redirect to the dashboard; other
    exceptions propagate. the temporary directory is always removed, and the
    copied site directory is removed whenever the upload does not complete.
    """
    auth_redirect = require_auth()
    if auth_redirect is not None:
        return auth_redirect

    archive = request.files.get("archive")
    if archive is None or not archive.filename:
        flash("select an archive file", "error")
        return redirect(url_for("dashboard"))
    suffix = detect_archive_suffix(archive.filename)
    if suffix is None:
        flash("unsupported archive format", "error")
        return redirect(url_for("dashboard"))

    # only the final path component of the client-supplied name is stored
    original_filename = Path(archive.filename).name.strip()
    if not original_filename:
        flash("invalid archive filename", "error")
        return redirect(url_for("dashboard"))
    if len(original_filename) > 255:
        flash("archive filename is too long", "error")
        return redirect(url_for("dashboard"))
    if any(ord(ch) < 32 for ch in original_filename):
        flash("archive filename contains invalid characters", "error")
        return redirect(url_for("dashboard"))

    temp_dir = Path(tempfile.mkdtemp(prefix="shim-upload-"))
    final_dir: Optional[Path] = None
    stored = False
    error_message: Optional[str] = None
    try:
        archive_path = temp_dir / f"upload{suffix}"
        extract_dir = temp_dir / "extract"
        extract_dir.mkdir(parents=True, exist_ok=True)

        archive.save(archive_path)
        extract_archive_secure(archive_path, suffix, extract_dir)
        site_root = find_site_root(extract_dir)

        slug = reserve_random_slug(connect_db)
        site_uuid = str(uuid.uuid4())
        storage_key = secrets.token_hex(24)
        final_dir = cfg.sites_dir / storage_key
        shutil.copytree(site_root, final_dir)

        # closing() releases the connection; the inner "conn" context commits the insert.
        with closing(connect_db()) as conn, conn:
            conn.execute(
                """
                INSERT INTO sites (site_uuid, owner_user_id, slug, storage_key, original_filename, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
                """,
                (
                    site_uuid,
                    int(g.current_user["internal_id"]),
                    slug,
                    storage_key,
                    original_filename,
                ),
            )
        stored = True
    except ValueError as exc:
        error_message = str(exc)
    except sqlite3.IntegrityError:
        error_message = "failed to save site record"
    except (tarfile.TarError, zipfile.BadZipFile):
        error_message = "archive is invalid or corrupted"
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
        # runs for unexpected exceptions too, so a half-finished upload never
        # leaves a site directory without a matching database row
        if not stored and final_dir is not None:
            shutil.rmtree(final_dir, ignore_errors=True)

    if error_message is not None:
        flash(error_message, "error")
        return redirect(url_for("dashboard"))
    flash("site uploaded", "success")
    return redirect(url_for("dashboard"))
@app.post("/app/sites/<site_id>/delete")
def delete_site(site_id: str) -> Response:
    """delete a site row and its stored files.

    *site_id* is the site uuid taken from the url. admins may delete any
    site, other users only their own. every outcome redirects to the
    dashboard with a flash message.
    """
    auth_redirect = require_auth()
    if auth_redirect is not None:
        return auth_redirect
    if not uuid_is_valid(site_id):
        flash("invalid site id", "error")
        return redirect(url_for("dashboard"))

    # closing() releases the connection; the inner "conn" context commits the delete.
    with closing(connect_db()) as conn, conn:
        site = conn.execute(
            "SELECT site_uuid, owner_user_id, slug, storage_key FROM sites WHERE site_uuid = ?",
            (site_id,),
        ).fetchone()
        if site is None:
            flash("site not found", "error")
            return redirect(url_for("dashboard"))
        is_admin = g.current_user["role"] == "admin"
        if not is_admin and int(site["owner_user_id"]) != int(g.current_user["internal_id"]):
            flash("not allowed", "error")
            return redirect(url_for("dashboard"))
        conn.execute("DELETE FROM sites WHERE site_uuid = ?", (site_id,))

    # files go only after the row deletion is committed
    shutil.rmtree(cfg.sites_dir / site["storage_key"], ignore_errors=True)
    flash("site deleted", "success")
    return redirect(url_for("dashboard"))
@app.post("/app/admin/users/create")
def admin_create_user() -> Response:
    """admin-only: create a user from the submitted form and flash the backend's message."""
    denied = require_admin()
    if denied is not None:
        return denied

    form = request.form
    requested_role = form.get("role", "user").strip().lower()
    created, message = auth_backend.create_user(
        username=form.get("username", ""),
        password=form.get("password", ""),
        role=requested_role,
    )
    category = "success" if created else "error"
    flash(message, category)
    return redirect(url_for("dashboard"))
@app.post("/app/admin/users/<user_id>/username")
def admin_update_user_username(user_id: str) -> Response:
    """admin-only: rename the user whose uuid is *user_id* (taken from the url)."""
    admin_redirect = require_admin()
    if admin_redirect is not None:
        return admin_redirect
    if not uuid_is_valid(user_id):
        flash("invalid user id", "error")
        return redirect(url_for("dashboard"))
    username = request.form.get("username", "")
    ok, message = auth_backend.update_username(user_uuid=user_id, new_username=username)
    flash(message, "success" if ok else "error")
    return redirect(url_for("dashboard"))
@app.post("/app/admin/users/<user_id>/password")
def admin_update_user_password(user_id: str) -> Response:
    """admin-only: set a new password for the user whose uuid is *user_id* (taken from the url)."""
    admin_redirect = require_admin()
    if admin_redirect is not None:
        return admin_redirect
    if not uuid_is_valid(user_id):
        flash("invalid user id", "error")
        return redirect(url_for("dashboard"))
    password = request.form.get("password", "")
    ok, message = auth_backend.update_password(user_uuid=user_id, new_password=password)
    flash(message, "success" if ok else "error")
    return redirect(url_for("dashboard"))
@app.post("/app/admin/users/<user_id>/delete")
def admin_delete_user(user_id: str) -> Response:
    """admin-only: delete a user and the stored files of every site they own.

    *user_id* is the user uuid taken from the url. admins cannot delete
    their own account. the storage keys of the user's sites are collected
    before the user row is deleted, and the directories are removed after
    the deletion is committed.
    """
    admin_redirect = require_admin()
    if admin_redirect is not None:
        return admin_redirect
    if not uuid_is_valid(user_id):
        flash("invalid user id", "error")
        return redirect(url_for("dashboard"))
    if g.current_user["id"] == user_id:
        flash("you cannot delete your own account", "error")
        return redirect(url_for("dashboard"))

    # closing() releases the connection; the inner "conn" context commits the delete.
    with closing(connect_db()) as conn, conn:
        user = conn.execute(
            "SELECT id AS internal_id, user_uuid, username FROM users WHERE user_uuid = ?",
            (user_id,),
        ).fetchone()
        if user is None:
            flash("user not found", "error")
            return redirect(url_for("dashboard"))
        owned_site_keys = conn.execute(
            "SELECT storage_key FROM sites WHERE owner_user_id = ?",
            (user["internal_id"],),
        ).fetchall()
        conn.execute("DELETE FROM users WHERE id = ?", (user["internal_id"],))

    for row in owned_site_keys:
        shutil.rmtree(cfg.sites_dir / row["storage_key"], ignore_errors=True)
    flash("user deleted", "success")
    return redirect(url_for("dashboard"))
@app.post("/app/admin/sites/<site_id>/slug")
def admin_update_slug(site_id: str) -> Response:
    """admin-only: change the slug of the site whose uuid is *site_id* (taken from the url).

    the submitted slug is stripped and lowercased before validation. a slug
    already used by another site is reported through the unique-constraint
    error raised by sqlite.
    """
    admin_redirect = require_admin()
    if admin_redirect is not None:
        return admin_redirect
    if not uuid_is_valid(site_id):
        flash("invalid site id", "error")
        return redirect(url_for("dashboard"))
    new_slug = request.form.get("slug", "").strip().lower()
    if not slug_is_valid(new_slug):
        flash("invalid slug format", "error")
        return redirect(url_for("dashboard"))

    try:
        # closing() releases the connection; the inner "conn" context commits or rolls back.
        with closing(connect_db()) as conn, conn:
            cursor = conn.execute(
                """
                UPDATE sites
                SET slug = ?, updated_at = CURRENT_TIMESTAMP
                WHERE site_uuid = ?
                """,
                (new_slug, site_id),
            )
            if cursor.rowcount == 0:
                flash("site not found", "error")
                return redirect(url_for("dashboard"))
    except sqlite3.IntegrityError:
        flash("slug is already in use", "error")
        return redirect(url_for("dashboard"))

    flash("slug updated", "success")
    return redirect(url_for("dashboard"))
@app.get("/s/<slug>/")
def serve_site_root(slug: str) -> Response:
    """serve the root of the hosted site identified by *slug*; 404 for unknown or invalid slugs."""
    if not slug_is_valid(slug):
        abort(404)
    site = get_site_by_slug(slug)
    if site is None:
        abort(404)
    return serve_site_resource(site, subpath="", allow_spa=True)
@app.get("/s/<slug>/<path:subpath>")
def serve_site_subpath(slug: str, subpath: str) -> Response:
    """serve *subpath* of the hosted site identified by *slug*; 404 for unknown or invalid slugs.

    the path converter lets *subpath* contain slashes.
    """
    if not slug_is_valid(slug):
        abort(404)
    site = get_site_by_slug(slug)
    if site is None:
        abort(404)
    return serve_site_resource(site, subpath=subpath, allow_spa=True)
@app.get("/_site/<slug>/")
def serve_site_alias_root(slug: str) -> Response:
    """alias of the /s/ site root route under the /_site/ prefix."""
    return serve_site_root(slug)
@app.get("/_site/<slug>/<path:subpath>")
def serve_site_alias_subpath(slug: str, subpath: str) -> Response:
    """alias of the /s/ site subpath route under the /_site/ prefix."""
    return serve_site_subpath(slug, subpath)
@app.get("/<path:subpath>")
def site_root_relative_fallback(subpath: str) -> Response:
    """catch-all for root-absolute requests made by hosted sites.

    the target site comes from the referer path, or failing that from the
    active-site cookie. reserved first segments (app, api, s, _site,
    healthz, favicon.svg, robots.txt) and requests without a resolvable
    site get 404.
    """
    # handle absolute root paths from hosted apps by resolving the slug from referer/cookie
    first = subpath.split("/", 1)[0].lower()
    if first in {"app", "api", "s", "_site", "healthz", "favicon.svg", "robots.txt"}:
        abort(404)

    slug = extract_slug_from_referer(request.headers.get("Referer"))
    if slug is None:
        cookie_slug = request.cookies.get(ACTIVE_SITE_COOKIE)
        if cookie_slug and slug_is_valid(cookie_slug):
            slug = cookie_slug
    if slug is None:
        abort(404)

    site = get_site_by_slug(slug)
    if site is None:
        abort(404)
    return serve_site_resource(site, subpath=subpath, allow_spa=True)
return app