aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkj_sh6042026-04-03 00:46:18 -0400
committerkj_sh6042026-04-03 00:46:18 -0400
commitb5b1a685bf9c2dd172545091c049717360a23648 (patch)
tree34db8bbad826d49ca85172b19326317741923d7d
initial: shim static site server
-rw-r--r--.gitignore9
-rw-r--r--LICENSE21
-rw-r--r--auth_backend.py168
-rw-r--r--data/sites/.gitkeep0
-rw-r--r--requirements.txt8
-rw-r--r--server.py10
-rw-r--r--shim_app.py1085
-rwxr-xr-xvendor/mojicrypt351
8 files changed, 1652 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..9524d87
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,9 @@
+__pycache__/
+*.pyc
+*.pyo
+*.pyd
+.venv/
+
+data/*.db
+data/sites/*
+!data/sites/.gitkeep \ No newline at end of file
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..fdb19c0
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2026 kj_sh604/kj-sh604/kjsh604
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE. \ No newline at end of file
diff --git a/auth_backend.py b/auth_backend.py
new file mode 100644
index 0000000..e1ac01c
--- /dev/null
+++ b/auth_backend.py
@@ -0,0 +1,168 @@
+#!/usr/bin/env python3
+
+import os
+import re
+import sqlite3
+import subprocess
+import sys
+from pathlib import Path
+from typing import Callable, Optional, Protocol
+
+
+AUTH_CHALLENGE = "SHIM_AUTH_VALID"
+USERNAME_RE = re.compile(r"^[a-z0-9_.-]{2,64}$")
+
+ConnectFn = Callable[[], sqlite3.Connection]
+
+
+class AuthBackend(Protocol):
+ def bootstrap_required(self) -> bool:
+ ...
+
+ def create_user(self, username: str, password: str, role: str = "user") -> tuple[bool, str]:
+ ...
+
+ def authenticate(self, username: str, password: str) -> Optional[sqlite3.Row]:
+ ...
+
+
+def normalize_username(username: str) -> str:
+ return username.strip().lower()
+
+
+def looks_like_python_script(path: Path) -> bool:
+ try:
+ with open(path, "r", encoding="utf-8", errors="ignore") as f:
+ first_line = f.readline(200).lower()
+ except OSError:
+ return False
+ return first_line.startswith("#!") and "python" in first_line
+
+
+class LocalMojicryptAuthBackend:
+ """default auth backend using sqlite and local mojicrypt challenge encryption"""
+
+ def __init__(self, connect_db: ConnectFn, mojicrypt_bin: Path):
+ self.connect_db = connect_db
+ self.mojicrypt_bin = mojicrypt_bin
+ self.last_error = ""
+
+ def bootstrap_required(self) -> bool:
+ with self.connect_db() as conn:
+ row = conn.execute("SELECT COUNT(*) AS count FROM users").fetchone()
+ return int(row["count"]) == 0
+
+ def create_user(self, username: str, password: str, role: str = "user") -> tuple[bool, str]:
+ username = normalize_username(username)
+ if not USERNAME_RE.fullmatch(username):
+ return False, "username must be lowercase and use [a-z0-9_.-], 2-64 chars"
+ if len(password) < 2:
+ return False, "password must be at least 2 characters"
+ if role not in {"admin", "user"}:
+ return False, "invalid role"
+
+ encrypted = self.encrypt_password(password)
+ if not encrypted:
+ return False, "mojicrypt encryption failed"
+
+ try:
+ with self.connect_db() as conn:
+ conn.execute(
+ """
+ INSERT INTO users (username, role, encrypted_challenge, created_at)
+ VALUES (?, ?, ?, CURRENT_TIMESTAMP)
+ """,
+ (username, role, encrypted),
+ )
+ return True, "user created"
+ except sqlite3.IntegrityError:
+ return False, "username already exists"
+
+ def authenticate(self, username: str, password: str) -> Optional[sqlite3.Row]:
+ username = normalize_username(username)
+ with self.connect_db() as conn:
+ user = conn.execute(
+ """
+ SELECT id, username, role, encrypted_challenge
+ FROM users
+ WHERE username = ?
+ """,
+ (username,),
+ ).fetchone()
+ if not user:
+ return None
+ if not self.verify_password(user["encrypted_challenge"], password):
+ return None
+ return user
+
+ def encrypt_password(self, password: str) -> Optional[str]:
+ return self._run_mojicrypt("encrypt", AUTH_CHALLENGE, password)
+
+ def verify_password(self, encrypted_blob: str, password: str) -> bool:
+ decrypted = self._run_mojicrypt("decrypt", encrypted_blob, password)
+ return decrypted == AUTH_CHALLENGE
+
+ def _run_mojicrypt(self, command: str, payload: str, passphrase: str) -> Optional[str]:
+ self.last_error = ""
+
+ if not self.mojicrypt_bin.exists():
+ self.last_error = f"binary not found at {self.mojicrypt_bin}"
+ return None
+
+ is_python_script = looks_like_python_script(self.mojicrypt_bin)
+ direct_exec_ok = os.access(self.mojicrypt_bin, os.X_OK)
+ python_exec_ok = bool(sys.executable) and is_python_script
+
+ if not direct_exec_ok and not python_exec_ok:
+ self.last_error = "binary is not executable"
+ return None
+
+ commands_to_try = []
+ if direct_exec_ok:
+ commands_to_try.append([
+ str(self.mojicrypt_bin),
+ "-p",
+ passphrase,
+ command,
+ payload,
+ ])
+
+ # some systems do not expose a plain `python` binary for shebang execution.
+ # retrying with the active interpreter avoids false failures for vendored scripts.
+ if python_exec_ok:
+ fallback_cmd = [
+ sys.executable,
+ str(self.mojicrypt_bin),
+ "-p",
+ passphrase,
+ command,
+ payload,
+ ]
+ if fallback_cmd not in commands_to_try:
+ commands_to_try.append(fallback_cmd)
+
+ for cmd in commands_to_try:
+ try:
+ result = subprocess.run(
+ cmd,
+ capture_output=True,
+ text=True,
+ timeout=90,
+ )
+ except FileNotFoundError as exc:
+ self.last_error = str(exc)
+ continue
+ except subprocess.TimeoutExpired:
+ self.last_error = "command timed out"
+ continue
+
+ if result.returncode == 0:
+ return result.stdout.strip()
+
+ stderr = (result.stderr or "").strip()
+ if stderr:
+ self.last_error = stderr
+ else:
+ self.last_error = f"command exited with status {result.returncode}"
+
+ return None \ No newline at end of file
diff --git a/data/sites/.gitkeep b/data/sites/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/data/sites/.gitkeep
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..cc83c7b
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,8 @@
+blinker==1.9.0
+click==8.3.1
+Flask==3.1.3
+itsdangerous==2.2.0
+Jinja2==3.1.6
+MarkupSafe==3.0.3
+pycryptodome==3.23.0
+Werkzeug==3.1.8
diff --git a/server.py b/server.py
new file mode 100644
index 0000000..e735a36
--- /dev/null
+++ b/server.py
@@ -0,0 +1,10 @@
+#!/usr/bin/env python3
+
+from shim_app import create_app
+
+
+app = create_app()
+
+
+if __name__ == "__main__":
+ app.run(host=app.config["SHIM_BIND"], port=app.config["SHIM_PORT"]) \ No newline at end of file
diff --git a/shim_app.py b/shim_app.py
new file mode 100644
index 0000000..7e6891a
--- /dev/null
+++ b/shim_app.py
@@ -0,0 +1,1085 @@
+#!/usr/bin/env python3
+
+import mimetypes
+import os
+import re
+import secrets
+import shutil
+import sqlite3
+import stat
+import tarfile
+import tempfile
+import time
+import zipfile
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Callable, Optional
+from urllib.parse import urlparse
+
+from flask import (
+ Flask,
+ Response,
+ abort,
+ flash,
+ g,
+ make_response,
+ redirect,
+ render_template_string,
+ request,
+ send_file,
+ url_for,
+)
+
+from auth_backend import AuthBackend, LocalMojicryptAuthBackend
+
+
+SESSION_TTL_SECONDS = int(os.getenv("SHIM_SESSION_TTL_SECONDS", "86400"))
+MAX_UPLOAD_BYTES = int(os.getenv("SHIM_MAX_UPLOAD_BYTES", str(1024 * 1024 * 1024)))
+MAX_EXTRACTED_BYTES = int(
+ os.getenv("SHIM_MAX_EXTRACTED_BYTES", str(2 * 1024 * 1024 * 1024))
+)
+MAX_EXTRACTED_FILES = int(os.getenv("SHIM_MAX_EXTRACTED_FILES", "20000"))
+SESSION_COOKIE = "shim_session"
+ACTIVE_SITE_COOKIE = "shim_active_site"
+SLUG_RE = re.compile(r"^[a-z0-9](?:[a-z0-9-]{0,126}[a-z0-9])?$")
+ARCHIVE_SUFFIXES = (
+ ".tar.gz",
+ ".tgz",
+ ".tar.bz2",
+ ".tbz2",
+ ".tar.xz",
+ ".txz",
+ ".tar",
+ ".zip",
+)
+ROOT_ATTR_RE = re.compile(r"(?i)\b(href|src|action|poster)=([\"'])/([^\"']*)\2")
+CSS_URL_RE = re.compile(r"(?i)url\(\s*([\"']?)/([^\)'\"\s]+)\1\s*\)")
+
+
+SHELL_TEMPLATE = """<!doctype html>
+<html lang="en">
+<head>
+ <meta charset="utf-8">
+ <meta name="viewport" content="width=device-width, initial-scale=1">
+ <meta name="color-scheme" content="light dark">
+ <meta http-equiv="cache-control" content="no-cache">
+ <meta http-equiv="expires" content="0">
+ <meta http-equiv="pragma" content="no-cache">
+ <link rel="stylesheet" href="https://cdn.jsdelivr.net/gh/kj-sh604/noir.css@latest/out/noir.min.css">
+ <title>{{ title }} - {{ app_name }}</title>
+ <style>
+ main { max-width: 1120px; margin: 0 auto; }
+ .topbar { display: flex; flex-wrap: wrap; align-items: center; gap: 0.75rem; justify-content: space-between; }
+ .topbar nav { display: flex; gap: 0.75rem; align-items: center; }
+ .inline { display: inline; }
+ .quiet { opacity: 0.8; }
+ .stack { display: grid; gap: 1rem; }
+ .grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(320px, 1fr)); gap: 1rem; }
+ .flash { padding: 0.75rem 1rem; border-left: 0.35rem solid; }
+ .flash.error { border-color: #b91c1c; }
+ .flash.success { border-color: #0f766e; }
+ table { width: 100%; }
+ td form { margin: 0; }
+ input[type="text"], input[type="password"], input[type="file"], select { width: 100%; }
+ @media (max-width: 760px) {
+ .topbar { align-items: flex-start; }
+ .topbar nav { flex-wrap: wrap; }
+ }
+ </style>
+</head>
+<body>
+<main>
+ <header class="topbar">
+ <h1>{{ app_name }}</h1>
+ <nav>
+ <a href="{{ url_for('dashboard') }}">dashboard</a>
+ {% if current_user %}
+ <span class="quiet">signed in as {{ current_user['username'] }} ({{ current_user['role'] }})</span>
+ <form method="post" action="{{ url_for('logout') }}" class="inline">
+ <button type="submit">logout</button>
+ </form>
+ {% else %}
+ <a href="{{ url_for('login') }}">login</a>
+ {% endif %}
+ </nav>
+ </header>
+ {% with messages = get_flashed_messages(with_categories=true) %}
+ {% if messages %}
+ <section class="stack">
+ {% for category, message in messages %}
+ <article class="flash {{ category }}">{{ message }}</article>
+ {% endfor %}
+ </section>
+ {% endif %}
+ {% endwith %}
+ {{ body | safe }}
+</main>
+</body>
+</html>
+"""
+
+
+SETUP_BODY_TEMPLATE = """
+<section class="stack">
+ <p>first startup detected. create the initial admin account to continue.</p>
+ <form method="post" action="{{ url_for('setup_submit') }}" class="stack" autocomplete="off">
+ <label>
+ username
+ <input name="username" type="text" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" placeholder="admin" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');">
+ </label>
+ <label>
+ password
+ <input name="password" type="password" required minlength="2" autocomplete="new-password">
+ </label>
+ <label>
+ confirm password
+ <input name="confirm" type="password" required minlength="2" autocomplete="new-password">
+ </label>
+ <button type="submit">create admin account</button>
+ </form>
+</section>
+"""
+
+
+LOGIN_BODY_TEMPLATE = """
+<section class="stack">
+ <p>sign in to manage uploads and slugs.</p>
+ <form method="post" action="{{ url_for('login_submit') }}" class="stack" autocomplete="off">
+ <label>
+ username
+ <input name="username" type="text" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');">
+ </label>
+ <label>
+ password
+ <input name="password" type="password" required minlength="2" autocomplete="current-password">
+ </label>
+ <button type="submit">sign in</button>
+ </form>
+</section>
+"""
+
+
+DASHBOARD_BODY_TEMPLATE = """
+<section class="stack">
+ <h2>upload static site</h2>
+ <p>upload one archive containing your built static files. users always get a random 40 character slug.</p>
+ <p class="quiet">accepted formats: {{ allowed_suffixes | join(', ') }}</p>
+ <form method="post" action="{{ url_for('upload_site') }}" enctype="multipart/form-data" class="stack">
+ <label>
+ archive file
+ <input type="file" name="archive" required>
+ </label>
+ <button type="submit">upload and publish</button>
+ </form>
+</section>
+
+<section class="stack">
+ <h2>{% if is_admin %}all uploaded sites{% else %}your uploaded sites{% endif %}</h2>
+ {% if sites %}
+ <table>
+ <thead>
+ <tr>
+ <th>id</th>
+ <th>slug</th>
+ {% if is_admin %}<th>owner</th>{% endif %}
+ <th>archive</th>
+ <th>created</th>
+ <th>actions</th>
+ </tr>
+ </thead>
+ <tbody>
+ {% for site in sites %}
+ <tr>
+ <td>{{ site['id'] }}</td>
+ <td>
+ {% if is_admin %}
+ <form method="post" action="{{ url_for('admin_update_slug', site_id=site['id']) }}" class="stack">
+ <input type="text" name="slug" value="{{ site['slug'] }}" required minlength="1" maxlength="128" pattern="[a-z0-9][a-z0-9-]*">
+ <button type="submit">rename slug</button>
+ </form>
+ {% else %}
+ {{ site['slug'] }}
+ {% endif %}
+ </td>
+ {% if is_admin %}<td>{{ site['owner_username'] }}</td>{% endif %}
+ <td>{{ site['original_filename'] }}</td>
+ <td>{{ site['created_at'] }}</td>
+ <td>
+ <a href="/s/{{ site['slug'] }}/" target="_blank" rel="noopener">open</a>
+ <form method="post" action="{{ url_for('delete_site', site_id=site['id']) }}" class="inline" onsubmit="return confirm('delete this site?');">
+ <button type="submit">delete</button>
+ </form>
+ </td>
+ </tr>
+ {% endfor %}
+ </tbody>
+ </table>
+ {% else %}
+ <p>no sites uploaded yet.</p>
+ {% endif %}
+</section>
+
+{% if is_admin %}
+<section class="grid">
+ <article class="stack">
+ <h2>create user</h2>
+ <form method="post" action="{{ url_for('admin_create_user') }}" class="stack" autocomplete="off">
+ <label>
+ username
+ <input type="text" name="username" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');">
+ </label>
+ <label>
+ password
+ <input type="password" name="password" required minlength="2" autocomplete="new-password">
+ </label>
+ <label>
+ role
+ <select name="role" required>
+ <option value="user" selected>user</option>
+ <option value="admin">admin</option>
+ </select>
+ </label>
+ <button type="submit">create user</button>
+ </form>
+ </article>
+ <article class="stack">
+ <h2>users</h2>
+ {% if users %}
+ <table>
+ <thead>
+ <tr>
+ <th>id</th>
+ <th>username</th>
+ <th>role</th>
+ <th>created</th>
+ <th>actions</th>
+ </tr>
+ </thead>
+ <tbody>
+ {% for user in users %}
+ <tr>
+ <td>{{ user['id'] }}</td>
+ <td>{{ user['username'] }}</td>
+ <td>{{ user['role'] }}</td>
+ <td>{{ user['created_at'] }}</td>
+ <td>
+ {% if user['id'] != current_user['id'] %}
+ <form method="post" action="{{ url_for('admin_delete_user', user_id=user['id']) }}" class="inline" onsubmit="return confirm('delete this user and all owned sites?');">
+ <button type="submit">delete</button>
+ </form>
+ {% else %}
+ current account
+ {% endif %}
+ </td>
+ </tr>
+ {% endfor %}
+ </tbody>
+ </table>
+ {% else %}
+ <p>no users found.</p>
+ {% endif %}
+ </article>
+</section>
+{% endif %}
+"""
+
+
+@dataclass(frozen=True)
+class AppConfig:
+ base_dir: Path
+ app_name: str
+ db_path: Path
+ sites_dir: Path
+ mojicrypt_bin: Path
+ bind: str
+ port: int
+
+
+ConnectFn = Callable[[], sqlite3.Connection]
+
+
+def slug_is_valid(slug: str) -> bool:
+ return bool(slug) and bool(SLUG_RE.fullmatch(slug))
+
+
+def detect_archive_suffix(filename: str) -> Optional[str]:
+ lower = filename.lower()
+ for suffix in ARCHIVE_SUFFIXES:
+ if lower.endswith(suffix):
+ return suffix
+ return None
+
+
+def normalize_archive_member_path(raw_name: str) -> Optional[Path]:
+ name = raw_name.replace("\\", "/").strip()
+ if not name:
+ return None
+ while name.startswith("./"):
+ name = name[2:]
+ if not name:
+ return None
+ if name.startswith("/"):
+ raise ValueError("archive contains absolute paths")
+ parts = [part for part in name.split("/") if part not in ("", ".")]
+ if not parts:
+ return None
+ if any(part == ".." for part in parts):
+ raise ValueError("archive contains parent path traversal")
+ if ":" in parts[0]:
+ raise ValueError("archive contains invalid drive path")
+ return Path(*parts)
+
+
+def ensure_path_under(root: Path, candidate: Path) -> None:
+ root_real = root.resolve()
+ candidate_real = candidate.resolve()
+ candidate_real.relative_to(root_real)
+
+
+def rewrite_root_path(path_without_leading_slash: str, slug: str) -> str:
+ value = path_without_leading_slash
+ lowered = value.lower()
+ if value.startswith("/"):
+ return "/" + value
+ if lowered.startswith("api/"):
+ return "/" + value
+ if lowered.startswith("app/"):
+ return "/" + value
+ if lowered.startswith("s/"):
+ return "/" + value
+ if lowered.startswith("_site/"):
+ return "/" + value
+ if not value:
+ return f"/s/{slug}/"
+ return f"/s/{slug}/{value}"
+
+
+def rewrite_html_for_slug(html_text: str, slug: str) -> str:
+ def repl(match: re.Match[str]) -> str:
+ attr = match.group(1)
+ quote = match.group(2)
+ path_value = match.group(3)
+ rewritten = rewrite_root_path(path_value, slug)
+ return f"{attr}={quote}{rewritten}{quote}"
+
+ updated = ROOT_ATTR_RE.sub(repl, html_text)
+ if "<base " not in updated.lower():
+ head_match = re.search(r"(?i)<head[^>]*>", updated)
+ if head_match:
+ base_tag = f"\n <base href=\"/s/{slug}/\">"
+ insert_at = head_match.end()
+ updated = updated[:insert_at] + base_tag + updated[insert_at:]
+ return updated
+
+
+def rewrite_css_for_slug(css_text: str, slug: str) -> str:
+ def repl(match: re.Match[str]) -> str:
+ quote = match.group(1)
+ path_value = match.group(2)
+ rewritten = rewrite_root_path(path_value, slug)
+ return f"url({quote}{rewritten}{quote})"
+
+ return CSS_URL_RE.sub(repl, css_text)
+
+
+def looks_like_spa_route(subpath: str) -> bool:
+ if not subpath:
+ return True
+ name = Path(subpath).name
+ return "." not in name
+
+
+def random_slug(length: int = 40) -> str:
+ alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"
+ return "".join(secrets.choice(alphabet) for _ in range(length))
+
+
+def reserve_random_slug(connect_db: ConnectFn) -> str:
+ for _ in range(30):
+ slug = random_slug(40)
+ with connect_db() as conn:
+ row = conn.execute("SELECT 1 FROM sites WHERE slug = ?", (slug,)).fetchone()
+ if not row:
+ return slug
+ raise RuntimeError("failed to reserve a unique slug")
+
+
+def extract_slug_from_referer(referer: Optional[str]) -> Optional[str]:
+ if not referer:
+ return None
+ try:
+ path = urlparse(referer).path
+ except ValueError:
+ return None
+ for prefix in ("/s/", "/_site/"):
+ if path.startswith(prefix):
+ remainder = path[len(prefix) :]
+ slug = remainder.split("/", 1)[0]
+ if slug_is_valid(slug):
+ return slug
+ return None
+
+
+def extract_zip_secure(archive_path: Path, destination: Path) -> None:
+ total_size = 0
+ total_files = 0
+ with zipfile.ZipFile(archive_path, "r") as zf:
+ for info in zf.infolist():
+ member_path = normalize_archive_member_path(info.filename)
+ if member_path is None:
+ continue
+
+ # reject symlink entries
+ mode = (info.external_attr >> 16) & 0o170000
+ if mode == stat.S_IFLNK:
+ raise ValueError("zip symlinks are not allowed")
+
+ target = destination / member_path
+ ensure_path_under(destination, target)
+ if info.is_dir():
+ target.mkdir(parents=True, exist_ok=True)
+ continue
+
+ total_files += 1
+ if total_files > MAX_EXTRACTED_FILES:
+ raise ValueError("archive has too many files")
+
+ total_size += int(info.file_size)
+ if total_size > MAX_EXTRACTED_BYTES:
+ raise ValueError("extracted archive size exceeds limit")
+
+ target.parent.mkdir(parents=True, exist_ok=True)
+ with zf.open(info, "r") as src, open(target, "wb") as dst:
+ shutil.copyfileobj(src, dst, length=1024 * 1024)
+
+
+def extract_tar_secure(archive_path: Path, destination: Path) -> None:
+ total_size = 0
+ total_files = 0
+ with tarfile.open(archive_path, "r:*") as tf:
+ for member in tf.getmembers():
+ member_path = normalize_archive_member_path(member.name)
+ if member_path is None:
+ continue
+ if member.issym() or member.islnk() or member.isdev():
+ raise ValueError("tar links and device files are not allowed")
+
+ target = destination / member_path
+ ensure_path_under(destination, target)
+
+ if member.isdir():
+ target.mkdir(parents=True, exist_ok=True)
+ continue
+
+ if not member.isfile():
+ raise ValueError("unsupported tar member type")
+
+ total_files += 1
+ if total_files > MAX_EXTRACTED_FILES:
+ raise ValueError("archive has too many files")
+
+ total_size += int(member.size)
+ if total_size > MAX_EXTRACTED_BYTES:
+ raise ValueError("extracted archive size exceeds limit")
+
+ source = tf.extractfile(member)
+ if source is None:
+ raise ValueError("failed to extract tar file member")
+
+ target.parent.mkdir(parents=True, exist_ok=True)
+ with source, open(target, "wb") as dst:
+ shutil.copyfileobj(source, dst, length=1024 * 1024)
+
+
+def extract_archive_secure(archive_path: Path, suffix: str, destination: Path) -> None:
+ if suffix == ".zip":
+ extract_zip_secure(archive_path, destination)
+ return
+ extract_tar_secure(archive_path, destination)
+
+
+def find_site_root(extracted_dir: Path) -> Path:
+ root_index = extracted_dir / "index.html"
+ if root_index.is_file():
+ return extracted_dir
+
+ candidates = sorted(
+ extracted_dir.rglob("index.html"),
+ key=lambda path: len(path.relative_to(extracted_dir).parts),
+ )
+ if not candidates:
+ raise ValueError("archive must include an index.html file")
+ return candidates[0].parent
+
+
+def create_app(base_dir: Optional[Path] = None) -> Flask:
+ project_dir = Path(base_dir or Path(__file__).parent).resolve()
+ app_name = os.getenv("SHIM_APP_NAME", "shim").strip() or "shim"
+ db_path = project_dir / "data" / "shim.db"
+ sites_dir = project_dir / "data" / "sites"
+ default_mojicrypt = project_dir / "vendor" / "mojicrypt"
+ mojicrypt_env = os.getenv("SHIM_MOJICRYPT_BIN", str(default_mojicrypt))
+ mojicrypt_bin = Path(mojicrypt_env).expanduser()
+ if not mojicrypt_bin.is_absolute():
+ mojicrypt_bin = (project_dir / mojicrypt_bin).resolve()
+
+ cfg = AppConfig(
+ base_dir=project_dir,
+ app_name=app_name,
+ db_path=db_path,
+ sites_dir=sites_dir,
+ mojicrypt_bin=mojicrypt_bin,
+ bind=os.getenv("SHIM_BIND", "0.0.0.0"),
+ port=int(os.getenv("SHIM_PORT", "6767")),
+ )
+
+ cfg.db_path.parent.mkdir(parents=True, exist_ok=True)
+ cfg.sites_dir.mkdir(parents=True, exist_ok=True)
+
+ app = Flask(__name__, static_folder=None)
+ app.config["MAX_CONTENT_LENGTH"] = MAX_UPLOAD_BYTES
+ app.config["SECRET_KEY"] = os.getenv("SHIM_SECRET_KEY", secrets.token_hex(32))
+ app.config["SHIM_PORT"] = cfg.port
+ app.config["SHIM_BIND"] = cfg.bind
+ app.config["SHIM_APP_NAME"] = cfg.app_name
+ app.config["SHIM_MOJICRYPT_BIN"] = str(cfg.mojicrypt_bin)
+
+ def connect_db() -> sqlite3.Connection:
+ conn = sqlite3.connect(str(cfg.db_path))
+ conn.row_factory = sqlite3.Row
+ conn.execute("PRAGMA foreign_keys = ON")
+ return conn
+
+ with connect_db() as conn:
+ conn.executescript(
+ """
+ CREATE TABLE IF NOT EXISTS users (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ username TEXT UNIQUE NOT NULL,
+ role TEXT NOT NULL CHECK (role IN ('admin', 'user')),
+ encrypted_challenge TEXT NOT NULL,
+ created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
+ );
+
+ CREATE TABLE IF NOT EXISTS sessions (
+ token TEXT PRIMARY KEY,
+ user_id INTEGER NOT NULL,
+ expires_at INTEGER NOT NULL,
+ created_at INTEGER NOT NULL,
+ FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
+ );
+
+ CREATE TABLE IF NOT EXISTS sites (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ owner_user_id INTEGER NOT NULL,
+ slug TEXT UNIQUE NOT NULL,
+ storage_key TEXT UNIQUE NOT NULL,
+ original_filename TEXT NOT NULL,
+ created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ FOREIGN KEY (owner_user_id) REFERENCES users(id) ON DELETE CASCADE
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id);
+ CREATE INDEX IF NOT EXISTS idx_sessions_expiry ON sessions(expires_at);
+ CREATE INDEX IF NOT EXISTS idx_sites_owner ON sites(owner_user_id);
+ """
+ )
+
+ # auth provider is injected as a single backend to keep swap-over simple.
+ auth_backend: AuthBackend = LocalMojicryptAuthBackend(
+ connect_db=connect_db,
+ mojicrypt_bin=cfg.mojicrypt_bin,
+ )
+
+ def render_page(title: str, body_template: str, **context: object) -> str:
+ body = render_template_string(body_template, **context)
+ return render_template_string(
+ SHELL_TEMPLATE,
+ title=title,
+ app_name=cfg.app_name,
+ current_user=g.current_user,
+ body=body,
+ )
+
+ def create_session(user_id: int) -> str:
+ token = secrets.token_urlsafe(48)
+ now = int(time.time())
+ with connect_db() as conn:
+ conn.execute(
+ """
+ INSERT INTO sessions (token, user_id, expires_at, created_at)
+ VALUES (?, ?, ?, ?)
+ """,
+ (token, user_id, now + SESSION_TTL_SECONDS, now),
+ )
+ return token
+
+ def destroy_session(token: str) -> None:
+ with connect_db() as conn:
+ conn.execute("DELETE FROM sessions WHERE token = ?", (token,))
+
+ def user_from_token(token: str) -> Optional[sqlite3.Row]:
+ now = int(time.time())
+ with connect_db() as conn:
+ conn.execute("DELETE FROM sessions WHERE expires_at <= ?", (now,))
+ return conn.execute(
+ """
+ SELECT u.id, u.username, u.role
+ FROM sessions s
+ JOIN users u ON u.id = s.user_id
+ WHERE s.token = ? AND s.expires_at > ?
+ """,
+ (token, now),
+ ).fetchone()
+
+ def require_auth() -> Optional[Response]:
+ if g.current_user is not None:
+ return None
+ flash("login required", "error")
+ return redirect(url_for("login"))
+
+ def require_admin() -> Optional[Response]:
+ auth_redirect = require_auth()
+ if auth_redirect is not None:
+ return auth_redirect
+ if g.current_user["role"] != "admin":
+ flash("admin access required", "error")
+ return redirect(url_for("dashboard"))
+ return None
+
+ def get_site_by_slug(slug: str) -> Optional[sqlite3.Row]:
+ with connect_db() as conn:
+ return conn.execute(
+ "SELECT id, owner_user_id, slug, storage_key FROM sites WHERE slug = ?",
+ (slug,),
+ ).fetchone()
+
+ def send_site_file(file_path: Path, slug: str) -> Response:
+ ext = file_path.suffix.lower()
+ mime, _ = mimetypes.guess_type(str(file_path))
+ if ext == ".html":
+ html = file_path.read_text(encoding="utf-8", errors="replace")
+ body = rewrite_html_for_slug(html, slug)
+ response = make_response(body)
+ response.mimetype = "text/html"
+ response.headers["Cache-Control"] = "no-cache"
+ elif ext == ".css":
+ css = file_path.read_text(encoding="utf-8", errors="replace")
+ body = rewrite_css_for_slug(css, slug)
+ response = make_response(body)
+ response.mimetype = "text/css"
+ response.headers["Cache-Control"] = "public, max-age=300"
+ else:
+ response = make_response(
+ send_file(file_path, mimetype=mime or "application/octet-stream", conditional=True)
+ )
+ if ext in {".js", ".mjs"}:
+ response.headers["Cache-Control"] = "public, max-age=300"
+ elif ext in {".png", ".jpg", ".jpeg", ".gif", ".svg", ".webp", ".ico"}:
+ response.headers["Cache-Control"] = "public, max-age=86400, immutable"
+ else:
+ response.headers["Cache-Control"] = "public, max-age=3600"
+ response.headers["X-Content-Type-Options"] = "nosniff"
+ response.set_cookie(ACTIVE_SITE_COOKIE, slug, max_age=1800, samesite="Lax", path="/")
+ return response
+
+ def resolve_site_path(site_root: Path, subpath: str) -> Optional[Path]:
+ cleaned = subpath.lstrip("/")
+ candidate = (site_root / cleaned).resolve()
+ try:
+ candidate.relative_to(site_root.resolve())
+ except ValueError:
+ return None
+ return candidate
+
+ def serve_site_resource(site: sqlite3.Row, subpath: str, allow_spa: bool = True) -> Response:
+ site_root = (cfg.sites_dir / site["storage_key"]).resolve()
+ if not site_root.is_dir():
+ abort(404)
+
+ target = resolve_site_path(site_root, subpath)
+ if target is None:
+ abort(404)
+
+ if target.is_dir():
+ index_path = target / "index.html"
+ if index_path.is_file():
+ return send_site_file(index_path, site["slug"])
+ elif target.is_file():
+ return send_site_file(target, site["slug"])
+
+ if allow_spa and looks_like_spa_route(subpath):
+ index_path = site_root / "index.html"
+ if index_path.is_file():
+ return send_site_file(index_path, site["slug"])
+
+ abort(404)
+
+ @app.before_request
+ def load_current_user() -> None:
+ g.current_user = None
+ token = request.cookies.get(SESSION_COOKIE)
+ if not token:
+ return
+ user = user_from_token(token)
+ if user is not None:
+ g.current_user = user
+
+ @app.errorhandler(413)
+ def too_large(_: Exception) -> tuple[str, int]:
+ return "upload exceeds configured max size", 413
+
+ @app.get("/")
+ def root() -> Response:
+ return redirect(url_for("dashboard"))
+
+ @app.get("/healthz")
+ def healthz() -> tuple[str, int]:
+ return "ok", 200
+
+ @app.get("/app")
+ def dashboard() -> Response:
+ if auth_backend.bootstrap_required():
+ return redirect(url_for("setup"))
+ auth_redirect = require_auth()
+ if auth_redirect is not None:
+ return auth_redirect
+
+ is_admin = g.current_user["role"] == "admin"
+ with connect_db() as conn:
+ if is_admin:
+ sites = conn.execute(
+ """
+ SELECT s.id, s.slug, s.original_filename, s.created_at, u.username AS owner_username
+ FROM sites s
+ JOIN users u ON u.id = s.owner_user_id
+ ORDER BY s.id DESC
+ """
+ ).fetchall()
+ users = conn.execute(
+ "SELECT id, username, role, created_at FROM users ORDER BY id ASC"
+ ).fetchall()
+ else:
+ sites = conn.execute(
+ """
+ SELECT id, slug, original_filename, created_at
+ FROM sites
+ WHERE owner_user_id = ?
+ ORDER BY id DESC
+ """,
+ (g.current_user["id"],),
+ ).fetchall()
+ users = []
+
+ return render_page(
+ title="dashboard",
+ body_template=DASHBOARD_BODY_TEMPLATE,
+ allowed_suffixes=ARCHIVE_SUFFIXES,
+ is_admin=is_admin,
+ sites=sites,
+ users=users,
+ current_user=g.current_user,
+ )
+
+ @app.get("/app/setup")
+ def setup() -> Response:
+ if not auth_backend.bootstrap_required():
+ return redirect(url_for("dashboard"))
+ return render_page(title="setup", body_template=SETUP_BODY_TEMPLATE)
+
+ @app.post("/app/setup")
+ def setup_submit() -> Response:
+ if not auth_backend.bootstrap_required():
+ flash("setup already completed", "error")
+ return redirect(url_for("login"))
+
+ username = request.form.get("username", "")
+ password = request.form.get("password", "")
+ confirm = request.form.get("confirm", "")
+
+ if password != confirm:
+ flash("passwords do not match", "error")
+ return redirect(url_for("setup"))
+
+ ok, message = auth_backend.create_user(username=username, password=password, role="admin")
+ if not ok:
+ flash(message, "error")
+ return redirect(url_for("setup"))
+
+ user = auth_backend.authenticate(username=username, password=password)
+ if user is None:
+ flash("admin created but login failed", "error")
+ return redirect(url_for("login"))
+
+ token = create_session(user["id"])
+ response = redirect(url_for("dashboard"))
+ response.set_cookie(
+ SESSION_COOKIE,
+ token,
+ max_age=SESSION_TTL_SECONDS,
+ httponly=True,
+ samesite="Lax",
+ path="/",
+ )
+ flash("admin account created", "success")
+ return response
+
+ @app.get("/app/login")
+ def login() -> Response:
+ if auth_backend.bootstrap_required():
+ return redirect(url_for("setup"))
+ if g.current_user is not None:
+ return redirect(url_for("dashboard"))
+ return render_page(title="login", body_template=LOGIN_BODY_TEMPLATE)
+
+ @app.post("/app/login")
+ def login_submit() -> Response:
+ if auth_backend.bootstrap_required():
+ flash("run setup first", "error")
+ return redirect(url_for("setup"))
+
+ username = request.form.get("username", "")
+ password = request.form.get("password", "")
+ user = auth_backend.authenticate(username=username, password=password)
+ if user is None:
+ flash("invalid credentials", "error")
+ return redirect(url_for("login"))
+
+ token = create_session(user["id"])
+ response = redirect(url_for("dashboard"))
+ response.set_cookie(
+ SESSION_COOKIE,
+ token,
+ max_age=SESSION_TTL_SECONDS,
+ httponly=True,
+ samesite="Lax",
+ path="/",
+ )
+ flash("signed in", "success")
+ return response
+
+ @app.post("/app/logout")
+ def logout() -> Response:
+ token = request.cookies.get(SESSION_COOKIE)
+ if token:
+ destroy_session(token)
+ response = redirect(url_for("login"))
+ response.set_cookie(SESSION_COOKIE, "", max_age=0, httponly=True, samesite="Lax", path="/")
+ return response
+
+ @app.post("/app/upload")
+ def upload_site() -> Response:
+ auth_redirect = require_auth()
+ if auth_redirect is not None:
+ return auth_redirect
+
+ archive = request.files.get("archive")
+ if archive is None or not archive.filename:
+ flash("select an archive file", "error")
+ return redirect(url_for("dashboard"))
+
+ suffix = detect_archive_suffix(archive.filename)
+ if suffix is None:
+ flash("unsupported archive format", "error")
+ return redirect(url_for("dashboard"))
+
+ temp_dir = Path(tempfile.mkdtemp(prefix="shim-upload-"))
+ archive_path = temp_dir / f"upload{suffix}"
+ extract_dir = temp_dir / "extract"
+ extract_dir.mkdir(parents=True, exist_ok=True)
+
+ final_dir: Optional[Path] = None
+ try:
+ archive.save(archive_path)
+ extract_archive_secure(archive_path, suffix, extract_dir)
+ site_root = find_site_root(extract_dir)
+
+ slug = reserve_random_slug(connect_db)
+ storage_key = secrets.token_hex(24)
+ final_dir = cfg.sites_dir / storage_key
+
+ shutil.copytree(site_root, final_dir)
+
+ with connect_db() as conn:
+ conn.execute(
+ """
+ INSERT INTO sites (owner_user_id, slug, storage_key, original_filename, created_at, updated_at)
+ VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
+ """,
+ (
+ int(g.current_user["id"]),
+ slug,
+ storage_key,
+ Path(archive.filename).name,
+ ),
+ )
+ except ValueError as exc:
+ flash(str(exc), "error")
+ if final_dir is not None:
+ shutil.rmtree(final_dir, ignore_errors=True)
+ return redirect(url_for("dashboard"))
+ except sqlite3.IntegrityError:
+ flash("failed to save site record", "error")
+ if final_dir is not None:
+ shutil.rmtree(final_dir, ignore_errors=True)
+ return redirect(url_for("dashboard"))
+ except (tarfile.TarError, zipfile.BadZipFile):
+ flash("archive is invalid or corrupted", "error")
+ if final_dir is not None:
+ shutil.rmtree(final_dir, ignore_errors=True)
+ return redirect(url_for("dashboard"))
+ finally:
+ shutil.rmtree(temp_dir, ignore_errors=True)
+
+ flash("site uploaded", "success")
+ return redirect(url_for("dashboard"))
+
+ @app.post("/app/sites/<int:site_id>/delete")
+ def delete_site(site_id: int) -> Response:
+ auth_redirect = require_auth()
+ if auth_redirect is not None:
+ return auth_redirect
+
+ with connect_db() as conn:
+ site = conn.execute(
+ "SELECT id, owner_user_id, slug, storage_key FROM sites WHERE id = ?",
+ (site_id,),
+ ).fetchone()
+ if site is None:
+ flash("site not found", "error")
+ return redirect(url_for("dashboard"))
+
+ is_admin = g.current_user["role"] == "admin"
+ if not is_admin and int(site["owner_user_id"]) != int(g.current_user["id"]):
+ flash("not allowed", "error")
+ return redirect(url_for("dashboard"))
+
+ conn.execute("DELETE FROM sites WHERE id = ?", (site_id,))
+
+ shutil.rmtree(cfg.sites_dir / site["storage_key"], ignore_errors=True)
+ flash("site deleted", "success")
+ return redirect(url_for("dashboard"))
+
+ @app.post("/app/admin/users/create")
+ def admin_create_user() -> Response:
+ admin_redirect = require_admin()
+ if admin_redirect is not None:
+ return admin_redirect
+
+ username = request.form.get("username", "")
+ password = request.form.get("password", "")
+ role = request.form.get("role", "user").strip().lower()
+
+ ok, message = auth_backend.create_user(username=username, password=password, role=role)
+ flash(message, "success" if ok else "error")
+ return redirect(url_for("dashboard"))
+
+ @app.post("/app/admin/users/<int:user_id>/delete")
+ def admin_delete_user(user_id: int) -> Response:
+ admin_redirect = require_admin()
+ if admin_redirect is not None:
+ return admin_redirect
+
+ if int(g.current_user["id"]) == int(user_id):
+ flash("you cannot delete your own account", "error")
+ return redirect(url_for("dashboard"))
+
+ with connect_db() as conn:
+ user = conn.execute("SELECT id, username FROM users WHERE id = ?", (user_id,)).fetchone()
+ if user is None:
+ flash("user not found", "error")
+ return redirect(url_for("dashboard"))
+
+ owned_site_keys = conn.execute(
+ "SELECT storage_key FROM sites WHERE owner_user_id = ?",
+ (user_id,),
+ ).fetchall()
+
+ conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
+
+ for row in owned_site_keys:
+ shutil.rmtree(cfg.sites_dir / row["storage_key"], ignore_errors=True)
+
+ flash("user deleted", "success")
+ return redirect(url_for("dashboard"))
+
+ @app.post("/app/admin/sites/<int:site_id>/slug")
+ def admin_update_slug(site_id: int) -> Response:
+ admin_redirect = require_admin()
+ if admin_redirect is not None:
+ return admin_redirect
+
+ new_slug = request.form.get("slug", "").strip().lower()
+ if not slug_is_valid(new_slug):
+ flash("invalid slug format", "error")
+ return redirect(url_for("dashboard"))
+
+ try:
+ with connect_db() as conn:
+ cursor = conn.execute(
+ """
+ UPDATE sites
+ SET slug = ?, updated_at = CURRENT_TIMESTAMP
+ WHERE id = ?
+ """,
+ (new_slug, site_id),
+ )
+ if cursor.rowcount == 0:
+ flash("site not found", "error")
+ return redirect(url_for("dashboard"))
+ except sqlite3.IntegrityError:
+ flash("slug is already in use", "error")
+ return redirect(url_for("dashboard"))
+
+ flash("slug updated", "success")
+ return redirect(url_for("dashboard"))
+
+ @app.get("/s/<slug>/")
+ def serve_site_root(slug: str) -> Response:
+ if not slug_is_valid(slug):
+ abort(404)
+ site = get_site_by_slug(slug)
+ if site is None:
+ abort(404)
+ return serve_site_resource(site, subpath="", allow_spa=True)
+
+ @app.get("/s/<slug>/<path:subpath>")
+ def serve_site_subpath(slug: str, subpath: str) -> Response:
+ if not slug_is_valid(slug):
+ abort(404)
+ site = get_site_by_slug(slug)
+ if site is None:
+ abort(404)
+ return serve_site_resource(site, subpath=subpath, allow_spa=True)
+
+ @app.get("/_site/<slug>/")
+ def serve_site_alias_root(slug: str) -> Response:
+ return serve_site_root(slug)
+
+ @app.get("/_site/<slug>/<path:subpath>")
+ def serve_site_alias_subpath(slug: str, subpath: str) -> Response:
+ return serve_site_subpath(slug, subpath)
+
+ @app.get("/<path:subpath>")
+ def site_root_relative_fallback(subpath: str) -> Response:
+ # handle absolute root paths from hosted apps by resolving the slug from referer/cookie
+ first = subpath.split("/", 1)[0].lower()
+ if first in {"app", "api", "s", "_site", "healthz"}:
+ abort(404)
+
+ slug = extract_slug_from_referer(request.headers.get("Referer"))
+ if slug is None:
+ cookie_slug = request.cookies.get(ACTIVE_SITE_COOKIE)
+ if cookie_slug and slug_is_valid(cookie_slug):
+ slug = cookie_slug
+
+ if slug is None:
+ abort(404)
+
+ site = get_site_by_slug(slug)
+ if site is None:
+ abort(404)
+ return serve_site_resource(site, subpath=subpath, allow_spa=True)
+
+ return app \ No newline at end of file
diff --git a/vendor/mojicrypt b/vendor/mojicrypt
new file mode 100755
index 0000000..4f5c048
--- /dev/null
+++ b/vendor/mojicrypt
@@ -0,0 +1,351 @@
+#!/usr/bin/env python
+
+# mojicrypt — aes-256-gcm encryption with unicode-encoded output
+# turns your secrets into a wall of emojis and symbols
+# works on text, binary files, images, whatever you throw at it
+
+import sys
+import os
+import argparse
+import secrets
+import time
+import uuid
+
+try:
+ from Crypto.Cipher import AES
+ from Crypto.Protocol.KDF import scrypt
+ from Crypto.Random import get_random_bytes
+except ModuleNotFoundError:
+ from Cryptodome.Cipher import AES
+ from Cryptodome.Protocol.KDF import scrypt
+ from Cryptodome.Random import get_random_bytes
+
+APP_NAME = "mojicrypt"
+VERSION = "20260303"
+KEYFILE_LEN = 8192 # characters of hex entropy written to auto-generated keyfiles
+
+# params
+SCRYPT_N = 2**18 # cpu/memory cost
+SCRYPT_R = 8 # block size
+SCRYPT_P = 1 # parallelization
+SALT_LEN = 16 # 128-bit salt
+KEY_LEN = 32 # 256-bit key
+NONCE_LEN = 12 # 96-bit nonce (gcm standard)
+TAG_LEN = 16 # 128-bit auth tag
+HEADER_LEN = SALT_LEN + NONCE_LEN + TAG_LEN # 44 bytes overhead
+
+
+# glyphs
+# 256 unique non-alphanumeric unicode symbols
+# each byte (0x00-0xFF) maps to one glyph — simple 1:1 encoding
+# curated from: emoji faces, nature, animals, food, misc symbols,
+# arrows, geometric shapes, and math operators
+
+def _build_glyph_table():
+ """construct the 256-char glyph table from curated unicode ranges"""
+ codepoints = []
+
+ # emoji faces — smileys and expressions (64)
+ codepoints.extend(range(0x1F600, 0x1F640))
+
+ # weather and nature — cyclone through shooting star (32)
+ codepoints.extend(range(0x1F300, 0x1F320))
+
+ # animals — rat through blowfish (32)
+ codepoints.extend(range(0x1F400, 0x1F420))
+
+ # food — tomato through shrimp (32)
+ codepoints.extend(range(0x1F345, 0x1F365))
+
+ # misc symbols — sun through pointing fingers (32)
+ codepoints.extend(range(0x2600, 0x2620))
+
+ # arrows — left through rightwards paired (16)
+ codepoints.extend(range(0x2190, 0x21A0))
+
+ # geometric shapes — black square through white vertical rectangle (16)
+ codepoints.extend(range(0x25A0, 0x25B0))
+
+ # mathematical operators — for all through right angle (32)
+ codepoints.extend(range(0x2200, 0x2220))
+
+ table = "".join(chr(cp) for cp in codepoints)
+
+ # sanity checks
+ assert len(table) == 256, f"glyph table has {len(table)} entries, expected 256"
+ assert len(set(table)) == 256, "duplicate glyphs detected in table"
+ for g in table:
+ assert not g.isalnum(), f"alphanumeric char found: {g} (U+{ord(g):04X})"
+ assert not g.isspace(), f"whitespace char found: {g} (U+{ord(g):04X})"
+ # no joiners (ZWJ, ZWNJ, BOM)
+ assert ord(g) not in (0x200D, 0x200C, 0xFEFF), \
+ f"joiner/bom found: U+{ord(g):04X}"
+
+ return table
+
+
+GLYPH_TABLE = _build_glyph_table()
+GLYPH_TO_BYTE = {g: i for i, g in enumerate(GLYPH_TABLE)}
+
+
+# encode/decode
+
+def bytes_to_glyphs(data: bytes) -> str:
+ """encode raw bytes as unicode glyphs"""
+ return "".join(GLYPH_TABLE[b] for b in data)
+
+
+def glyphs_to_bytes(text: str) -> bytes:
+ """decode unicode glyphs back to raw bytes"""
+ result = []
+ for g in text:
+ if g not in GLYPH_TO_BYTE:
+ print(
+ f"error: invalid glyph in ciphertext: {g} (U+{ord(g):04X})",
+ file=sys.stderr,
+ )
+ sys.exit(1)
+ result.append(GLYPH_TO_BYTE[g])
+ return bytes(result)
+
+
+# crypto operations
+
+def derive_key(passphrase: str, salt: bytes) -> bytes:
+ """derive a 256-bit key from passphrase using scrypt"""
+ return scrypt(
+ passphrase.encode("utf-8"),
+ salt,
+ key_len=KEY_LEN,
+ N=SCRYPT_N,
+ r=SCRYPT_R,
+ p=SCRYPT_P,
+ )
+
+
+def encrypt_bytes(data: bytes, passphrase: str) -> str:
+ """encrypt raw bytes with aes-256-gcm, return unicode-encoded ciphertext"""
+ salt = get_random_bytes(SALT_LEN)
+ key = derive_key(passphrase, salt)
+
+ nonce = get_random_bytes(NONCE_LEN)
+ cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
+ ciphertext, tag = cipher.encrypt_and_digest(data)
+
+ # wire format: salt(16) || nonce(12) || tag(16) || ciphertext(N)
+ packed = salt + nonce + tag + ciphertext
+ return bytes_to_glyphs(packed)
+
+
+def decrypt_bytes(encoded: str, passphrase: str) -> bytes:
+ """decode unicode glyphs and decrypt with aes-256-gcm, return raw bytes"""
+ raw = glyphs_to_bytes(encoded)
+
+ if len(raw) < HEADER_LEN:
+ print("error: ciphertext too short to be valid", file=sys.stderr)
+ sys.exit(1)
+
+ # unpack: salt(16) || nonce(12) || tag(16) || ciphertext(N)
+ salt = raw[:SALT_LEN]
+ nonce = raw[SALT_LEN:SALT_LEN + NONCE_LEN]
+ tag = raw[SALT_LEN + NONCE_LEN:HEADER_LEN]
+ ciphertext = raw[HEADER_LEN:]
+
+ key = derive_key(passphrase, salt)
+
+ try:
+ cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
+ return cipher.decrypt_and_verify(ciphertext, tag)
+ except (ValueError, KeyError):
+ print(
+ "error: decryption failed — wrong passphrase or corrupted data",
+ file=sys.stderr,
+ )
+ sys.exit(1)
+
+
+# text convenience wrappers
+
+def encrypt_text(plaintext: str, passphrase: str) -> str:
+ """encode text to utf-8, encrypt, return glyph string"""
+ return encrypt_bytes(plaintext.encode("utf-8"), passphrase)
+
+
+def decrypt_text(encoded: str, passphrase: str) -> str:
+ """decrypt and decode result as utf-8 text"""
+ raw = decrypt_bytes(encoded, passphrase)
+ try:
+ return raw.decode("utf-8")
+ except UnicodeDecodeError:
+ print(
+ "error: decrypted data is not valid utf-8 — "
+ "did you mean to use --file/-f to write to a binary output?",
+ file=sys.stderr,
+ )
+ sys.exit(1)
+
+
+# keyfile helpers
+
+def generate_keyfile() -> tuple[str, str]:
+ """generate an 8192-char hex key, write it to a .ukey file, return (key, path)"""
+ key = secrets.token_hex(KEYFILE_LEN // 2) # token_hex(N) gives 2*N hex chars
+ epoch = int(time.time())
+ uid = uuid.uuid4()
+ filename = f"mojicrypt-key{epoch}_{uid}.ukey"
+ with open(filename, "w", encoding="utf-8") as fh:
+ fh.write(key)
+ # lock it down — readable only by the owner
+ os.chmod(filename, 0o600)
+ return key, filename
+
+
+def load_keyfile(path: str) -> str:
+ """read a .ukey file and return its contents as the passphrase"""
+ if not os.path.isfile(path):
+ print(f"error: keyfile not found: {path}", file=sys.stderr)
+ sys.exit(1)
+ with open(path, "r", encoding="utf-8") as fh:
+ key = fh.read().strip()
+ if not key:
+ print(f"error: keyfile is empty: {path}", file=sys.stderr)
+ sys.exit(1)
+ return key
+
+
+# cli
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ prog=APP_NAME,
+ description="aes-256-gcm encryption with unicode-encoded output",
+ )
+ parser.add_argument(
+ "-v", "--version",
+ action="version",
+ version=f"{APP_NAME} {VERSION}",
+ )
+ parser.add_argument(
+ "mode",
+ choices=["encrypt", "decrypt"],
+ help="operation to perform",
+ )
+ parser.add_argument(
+ "text",
+ nargs="?",
+ help="text to encrypt/decrypt (reads stdin if omitted, ignored when -f is used)",
+ )
+ parser.add_argument(
+ "-p", "--passphrase",
+ help="passphrase (prompted securely if omitted)",
+ )
+ parser.add_argument(
+ "-f", "--file",
+ metavar="PATH",
+ help="input file to encrypt/decrypt",
+ )
+ parser.add_argument(
+ "-o", "--output",
+ metavar="PATH",
+ help=(
+ "output file path "
+ "(encrypt default: <input>.uc | decrypt default: strips .uc or appends .dec)"
+ ),
+ )
+ parser.add_argument(
+ "-k", "--keyfile",
+ metavar="PATH",
+ help="load passphrase from a .ukey file instead of -p",
+ )
+
+ args = parser.parse_args()
+
+ # resolve passphrase — priority: -p > -k > auto-generate (encrypt) / error (decrypt)
+ if args.passphrase:
+ passphrase = args.passphrase
+ elif args.keyfile:
+ passphrase = load_keyfile(args.keyfile)
+ print(f"using keyfile: {args.keyfile}", file=sys.stderr)
+ elif args.mode == "encrypt":
+ passphrase, kf_path = generate_keyfile()
+ print(f"no passphrase given - generated keyfile: {kf_path}", file=sys.stderr)
+ print("keep that file safe — you'll need it to decrypt", file=sys.stderr)
+ else:
+ print(
+ "error: no passphrase provided for decryption.\n"
+ " use -p <passphrase> or -k <keyfile.ukey>",
+ file=sys.stderr,
+ )
+ sys.exit(1)
+
+ # file mode
+ if args.file:
+ in_path = args.file
+
+ if not os.path.isfile(in_path):
+ print(f"error: file not found: {in_path}", file=sys.stderr)
+ sys.exit(1)
+
+ if args.mode == "encrypt":
+ # default output: <input>.uc
+ out_path = args.output or (in_path + ".uc")
+
+ with open(in_path, "rb") as fh:
+ data = fh.read()
+
+ glyphs = encrypt_bytes(data, passphrase)
+
+ with open(out_path, "w", encoding="utf-8") as fh:
+ fh.write(glyphs)
+
+ print(f"encrypted → {out_path}", file=sys.stderr)
+
+ else: # decrypt
+ # default output: strip .uc if present, else append .dec
+ if args.output:
+ out_path = args.output
+ elif in_path.endswith(".uc"):
+ out_path = in_path[:-3]
+ else:
+ out_path = in_path + ".dec"
+
+ with open(in_path, "r", encoding="utf-8") as fh:
+ encoded = fh.read().strip()
+
+ raw = decrypt_bytes(encoded, passphrase)
+
+ with open(out_path, "wb") as fh:
+ fh.write(raw)
+
+ print(f"decrypted → {out_path}", file=sys.stderr)
+
+ return
+
+ # ── text / stdin mode ──────────────────────────────────────
+ if args.text:
+ text = args.text
+ elif not sys.stdin.isatty():
+ text = sys.stdin.read()
+ else:
+ if args.mode == "encrypt":
+ print("enter plaintext (ctrl+d to finish):", file=sys.stderr)
+ else:
+ print("paste ciphertext (ctrl+d to finish):", file=sys.stderr)
+ text = sys.stdin.read()
+
+ if not text:
+ print("error: no input provided", file=sys.stderr)
+ sys.exit(1)
+
+ if args.mode == "encrypt":
+ result = encrypt_text(text, passphrase)
+ print(result)
+ else:
+ # strip whitespace from pasted ciphertext (none of our glyphs are whitespace)
+ result = decrypt_text(text.strip(), passphrase)
+ print(result)
+
+
+if __name__ == "__main__":
+ main()