diff options
| author | kj_sh604 | 2026-04-03 00:46:18 -0400 |
|---|---|---|
| committer | kj_sh604 | 2026-04-03 00:46:18 -0400 |
| commit | b5b1a685bf9c2dd172545091c049717360a23648 (patch) | |
| tree | 34db8bbad826d49ca85172b19326317741923d7d | |
initial: shim static site server
| -rw-r--r-- | .gitignore | 9 | ||||
| -rw-r--r-- | LICENSE | 21 | ||||
| -rw-r--r-- | auth_backend.py | 168 | ||||
| -rw-r--r-- | data/sites/.gitkeep | 0 | ||||
| -rw-r--r-- | requirements.txt | 8 | ||||
| -rw-r--r-- | server.py | 10 | ||||
| -rw-r--r-- | shim_app.py | 1085 | ||||
| -rwxr-xr-x | vendor/mojicrypt | 351 |
8 files changed, 1652 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9524d87 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +__pycache__/ +*.pyc +*.pyo +*.pyd +.venv/ + +data/*.db +data/sites/* +!data/sites/.gitkeep
\ No newline at end of file @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 kj_sh604/kj-sh604/kjsh604 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.
\ No newline at end of file diff --git a/auth_backend.py b/auth_backend.py new file mode 100644 index 0000000..e1ac01c --- /dev/null +++ b/auth_backend.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python3 + +import os +import re +import sqlite3 +import subprocess +import sys +from pathlib import Path +from typing import Callable, Optional, Protocol + + +AUTH_CHALLENGE = "SHIM_AUTH_VALID" +USERNAME_RE = re.compile(r"^[a-z0-9_.-]{2,64}$") + +ConnectFn = Callable[[], sqlite3.Connection] + + +class AuthBackend(Protocol): + def bootstrap_required(self) -> bool: + ... + + def create_user(self, username: str, password: str, role: str = "user") -> tuple[bool, str]: + ... + + def authenticate(self, username: str, password: str) -> Optional[sqlite3.Row]: + ... + + +def normalize_username(username: str) -> str: + return username.strip().lower() + + +def looks_like_python_script(path: Path) -> bool: + try: + with open(path, "r", encoding="utf-8", errors="ignore") as f: + first_line = f.readline(200).lower() + except OSError: + return False + return first_line.startswith("#!") and "python" in first_line + + +class LocalMojicryptAuthBackend: + """default auth backend using sqlite and local mojicrypt challenge encryption""" + + def __init__(self, connect_db: ConnectFn, mojicrypt_bin: Path): + self.connect_db = connect_db + self.mojicrypt_bin = mojicrypt_bin + self.last_error = "" + + def bootstrap_required(self) -> bool: + with self.connect_db() as conn: + row = conn.execute("SELECT COUNT(*) AS count FROM users").fetchone() + return int(row["count"]) == 0 + + def create_user(self, username: str, password: str, role: str = "user") -> tuple[bool, str]: + username = normalize_username(username) + if not USERNAME_RE.fullmatch(username): + return False, "username must be lowercase and use [a-z0-9_.-], 2-64 chars" + if len(password) < 2: + return False, "password must be at least 2 characters" + if role not in {"admin", "user"}: + return False, "invalid role" + + encrypted = self.encrypt_password(password) + if not encrypted: + return False, "mojicrypt encryption failed" + + try: + with self.connect_db() as conn: + conn.execute( + """ + INSERT INTO users (username, role, encrypted_challenge, created_at) + VALUES (?, ?, ?, CURRENT_TIMESTAMP) + """, + (username, role, encrypted), + ) + return True, "user created" + except sqlite3.IntegrityError: + return False, "username already exists" + + def authenticate(self, username: str, password: str) -> Optional[sqlite3.Row]: + username = normalize_username(username) + with self.connect_db() as conn: + user = conn.execute( + """ + SELECT id, username, role, encrypted_challenge + FROM users + WHERE username = ? + """, + (username,), + ).fetchone() + if not user: + return None + if not self.verify_password(user["encrypted_challenge"], password): + return None + return user + + def encrypt_password(self, password: str) -> Optional[str]: + return self._run_mojicrypt("encrypt", AUTH_CHALLENGE, password) + + def verify_password(self, encrypted_blob: str, password: str) -> bool: + decrypted = self._run_mojicrypt("decrypt", encrypted_blob, password) + return decrypted == AUTH_CHALLENGE + + def _run_mojicrypt(self, command: str, payload: str, passphrase: str) -> Optional[str]: + self.last_error = "" + + if not self.mojicrypt_bin.exists(): + self.last_error = f"binary not found at {self.mojicrypt_bin}" + return None + + is_python_script = looks_like_python_script(self.mojicrypt_bin) + direct_exec_ok = os.access(self.mojicrypt_bin, os.X_OK) + python_exec_ok = bool(sys.executable) and is_python_script + + if not direct_exec_ok and not python_exec_ok: + self.last_error = "binary is not executable" + return None + + commands_to_try = [] + if direct_exec_ok: + commands_to_try.append([ + str(self.mojicrypt_bin), + "-p", + passphrase, + command, + payload, + ]) + + # some systems do not expose a plain `python` binary for shebang execution. + # retrying with the active interpreter avoids false failures for vendored scripts. + if python_exec_ok: + fallback_cmd = [ + sys.executable, + str(self.mojicrypt_bin), + "-p", + passphrase, + command, + payload, + ] + if fallback_cmd not in commands_to_try: + commands_to_try.append(fallback_cmd) + + for cmd in commands_to_try: + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=90, + ) + except FileNotFoundError as exc: + self.last_error = str(exc) + continue + except subprocess.TimeoutExpired: + self.last_error = "command timed out" + continue + + if result.returncode == 0: + return result.stdout.strip() + + stderr = (result.stderr or "").strip() + if stderr: + self.last_error = stderr + else: + self.last_error = f"command exited with status {result.returncode}" + + return None
\ No newline at end of file diff --git a/data/sites/.gitkeep b/data/sites/.gitkeep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/data/sites/.gitkeep diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..cc83c7b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +blinker==1.9.0 +click==8.3.1 +Flask==3.1.3 +itsdangerous==2.2.0 +Jinja2==3.1.6 +MarkupSafe==3.0.3 +pycryptodome==3.23.0 +Werkzeug==3.1.8 diff --git a/server.py b/server.py new file mode 100644 index 0000000..e735a36 --- /dev/null +++ b/server.py @@ -0,0 +1,10 @@ +#!/usr/bin/env python3 + +from shim_app import create_app + + +app = create_app() + + +if __name__ == "__main__": + app.run(host=app.config["SHIM_BIND"], port=app.config["SHIM_PORT"])
\ No newline at end of file diff --git a/shim_app.py b/shim_app.py new file mode 100644 index 0000000..7e6891a --- /dev/null +++ b/shim_app.py @@ -0,0 +1,1085 @@ +#!/usr/bin/env python3 + +import mimetypes +import os +import re +import secrets +import shutil +import sqlite3 +import stat +import tarfile +import tempfile +import time +import zipfile +from dataclasses import dataclass +from pathlib import Path +from typing import Callable, Optional +from urllib.parse import urlparse + +from flask import ( + Flask, + Response, + abort, + flash, + g, + make_response, + redirect, + render_template_string, + request, + send_file, + url_for, +) + +from auth_backend import AuthBackend, LocalMojicryptAuthBackend + + +SESSION_TTL_SECONDS = int(os.getenv("SHIM_SESSION_TTL_SECONDS", "86400")) +MAX_UPLOAD_BYTES = int(os.getenv("SHIM_MAX_UPLOAD_BYTES", str(1024 * 1024 * 1024))) +MAX_EXTRACTED_BYTES = int( + os.getenv("SHIM_MAX_EXTRACTED_BYTES", str(2 * 1024 * 1024 * 1024)) +) +MAX_EXTRACTED_FILES = int(os.getenv("SHIM_MAX_EXTRACTED_FILES", "20000")) +SESSION_COOKIE = "shim_session" +ACTIVE_SITE_COOKIE = "shim_active_site" +SLUG_RE = re.compile(r"^[a-z0-9](?:[a-z0-9-]{0,126}[a-z0-9])?$") +ARCHIVE_SUFFIXES = ( + ".tar.gz", + ".tgz", + ".tar.bz2", + ".tbz2", + ".tar.xz", + ".txz", + ".tar", + ".zip", +) +ROOT_ATTR_RE = re.compile(r"(?i)\b(href|src|action|poster)=([\"'])/([^\"']*)\2") +CSS_URL_RE = re.compile(r"(?i)url\(\s*([\"']?)/([^\)'\"\s]+)\1\s*\)") + + +SHELL_TEMPLATE = """<!doctype html> +<html lang="en"> +<head> + <meta charset="utf-8"> + <meta name="viewport" content="width=device-width, initial-scale=1"> + <meta name="color-scheme" content="light dark"> + <meta http-equiv="cache-control" content="no-cache"> + <meta http-equiv="expires" content="0"> + <meta http-equiv="pragma" content="no-cache"> + <link rel="stylesheet" href="https://cdn.jsdelivr.net/gh/kj-sh604/noir.css@latest/out/noir.min.css"> + <title>{{ title }} - {{ app_name }}</title> + <style> + main { max-width: 1120px; margin: 0 auto; } + .topbar { display: flex; flex-wrap: wrap; align-items: center; gap: 0.75rem; justify-content: space-between; } + .topbar nav { display: flex; gap: 0.75rem; align-items: center; } + .inline { display: inline; } + .quiet { opacity: 0.8; } + .stack { display: grid; gap: 1rem; } + .grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(320px, 1fr)); gap: 1rem; } + .flash { padding: 0.75rem 1rem; border-left: 0.35rem solid; } + .flash.error { border-color: #b91c1c; } + .flash.success { border-color: #0f766e; } + table { width: 100%; } + td form { margin: 0; } + input[type="text"], input[type="password"], input[type="file"], select { width: 100%; } + @media (max-width: 760px) { + .topbar { align-items: flex-start; } + .topbar nav { flex-wrap: wrap; } + } + </style> +</head> +<body> +<main> + <header class="topbar"> + <h1>{{ app_name }}</h1> + <nav> + <a href="{{ url_for('dashboard') }}">dashboard</a> + {% if current_user %} + <span class="quiet">signed in as {{ current_user['username'] }} ({{ current_user['role'] }})</span> + <form method="post" action="{{ url_for('logout') }}" class="inline"> + <button type="submit">logout</button> + </form> + {% else %} + <a href="{{ url_for('login') }}">login</a> + {% endif %} + </nav> + </header> + {% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} + <section class="stack"> + {% for category, message in messages %} + <article class="flash {{ category }}">{{ message }}</article> + {% endfor %} + </section> + {% endif %} + {% endwith %} + {{ body | safe }} +</main> +</body> +</html> +""" + + +SETUP_BODY_TEMPLATE = """ +<section class="stack"> + <p>first startup detected. create the initial admin account to continue.</p> + <form method="post" action="{{ url_for('setup_submit') }}" class="stack" autocomplete="off"> + <label> + username + <input name="username" type="text" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" placeholder="admin" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');"> + </label> + <label> + password + <input name="password" type="password" required minlength="2" autocomplete="new-password"> + </label> + <label> + confirm password + <input name="confirm" type="password" required minlength="2" autocomplete="new-password"> + </label> + <button type="submit">create admin account</button> + </form> +</section> +""" + + +LOGIN_BODY_TEMPLATE = """ +<section class="stack"> + <p>sign in to manage uploads and slugs.</p> + <form method="post" action="{{ url_for('login_submit') }}" class="stack" autocomplete="off"> + <label> + username + <input name="username" type="text" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');"> + </label> + <label> + password + <input name="password" type="password" required minlength="2" autocomplete="current-password"> + </label> + <button type="submit">sign in</button> + </form> +</section> +""" + + +DASHBOARD_BODY_TEMPLATE = """ +<section class="stack"> + <h2>upload static site</h2> + <p>upload one archive containing your built static files. users always get a random 40 character slug.</p> + <p class="quiet">accepted formats: {{ allowed_suffixes | join(', ') }}</p> + <form method="post" action="{{ url_for('upload_site') }}" enctype="multipart/form-data" class="stack"> + <label> + archive file + <input type="file" name="archive" required> + </label> + <button type="submit">upload and publish</button> + </form> +</section> + +<section class="stack"> + <h2>{% if is_admin %}all uploaded sites{% else %}your uploaded sites{% endif %}</h2> + {% if sites %} + <table> + <thead> + <tr> + <th>id</th> + <th>slug</th> + {% if is_admin %}<th>owner</th>{% endif %} + <th>archive</th> + <th>created</th> + <th>actions</th> + </tr> + </thead> + <tbody> + {% for site in sites %} + <tr> + <td>{{ site['id'] }}</td> + <td> + {% if is_admin %} + <form method="post" action="{{ url_for('admin_update_slug', site_id=site['id']) }}" class="stack"> + <input type="text" name="slug" value="{{ site['slug'] }}" required minlength="1" maxlength="128" pattern="[a-z0-9][a-z0-9-]*"> + <button type="submit">rename slug</button> + </form> + {% else %} + {{ site['slug'] }} + {% endif %} + </td> + {% if is_admin %}<td>{{ site['owner_username'] }}</td>{% endif %} + <td>{{ site['original_filename'] }}</td> + <td>{{ site['created_at'] }}</td> + <td> + <a href="/s/{{ site['slug'] }}/" target="_blank" rel="noopener">open</a> + <form method="post" action="{{ url_for('delete_site', site_id=site['id']) }}" class="inline" onsubmit="return confirm('delete this site?');"> + <button type="submit">delete</button> + </form> + </td> + </tr> + {% endfor %} + </tbody> + </table> + {% else %} + <p>no sites uploaded yet.</p> + {% endif %} +</section> + +{% if is_admin %} +<section class="grid"> + <article class="stack"> + <h2>create user</h2> + <form method="post" action="{{ url_for('admin_create_user') }}" class="stack" autocomplete="off"> + <label> + username + <input type="text" name="username" required minlength="2" maxlength="64" pattern="[a-z0-9_.-]+" autocomplete="new-password" autocapitalize="none" autocorrect="off" spellcheck="false" data-lpignore="true" readonly onfocus="this.removeAttribute('readonly');"> + </label> + <label> + password + <input type="password" name="password" required minlength="2" autocomplete="new-password"> + </label> + <label> + role + <select name="role" required> + <option value="user" selected>user</option> + <option value="admin">admin</option> + </select> + </label> + <button type="submit">create user</button> + </form> + </article> + <article class="stack"> + <h2>users</h2> + {% if users %} + <table> + <thead> + <tr> + <th>id</th> + <th>username</th> + <th>role</th> + <th>created</th> + <th>actions</th> + </tr> + </thead> + <tbody> + {% for user in users %} + <tr> + <td>{{ user['id'] }}</td> + <td>{{ user['username'] }}</td> + <td>{{ user['role'] }}</td> + <td>{{ user['created_at'] }}</td> + <td> + {% if user['id'] != current_user['id'] %} + <form method="post" action="{{ url_for('admin_delete_user', user_id=user['id']) }}" class="inline" onsubmit="return confirm('delete this user and all owned sites?');"> + <button type="submit">delete</button> + </form> + {% else %} + current account + {% endif %} + </td> + </tr> + {% endfor %} + </tbody> + </table> + {% else %} + <p>no users found.</p> + {% endif %} + </article> +</section> +{% endif %} +""" + + +@dataclass(frozen=True) +class AppConfig: + base_dir: Path + app_name: str + db_path: Path + sites_dir: Path + mojicrypt_bin: Path + bind: str + port: int + + +ConnectFn = Callable[[], sqlite3.Connection] + + +def slug_is_valid(slug: str) -> bool: + return bool(slug) and bool(SLUG_RE.fullmatch(slug)) + + +def detect_archive_suffix(filename: str) -> Optional[str]: + lower = filename.lower() + for suffix in ARCHIVE_SUFFIXES: + if lower.endswith(suffix): + return suffix + return None + + +def normalize_archive_member_path(raw_name: str) -> Optional[Path]: + name = raw_name.replace("\\", "/").strip() + if not name: + return None + while name.startswith("./"): + name = name[2:] + if not name: + return None + if name.startswith("/"): + raise ValueError("archive contains absolute paths") + parts = [part for part in name.split("/") if part not in ("", ".")] + if not parts: + return None + if any(part == ".." for part in parts): + raise ValueError("archive contains parent path traversal") + if ":" in parts[0]: + raise ValueError("archive contains invalid drive path") + return Path(*parts) + + +def ensure_path_under(root: Path, candidate: Path) -> None: + root_real = root.resolve() + candidate_real = candidate.resolve() + candidate_real.relative_to(root_real) + + +def rewrite_root_path(path_without_leading_slash: str, slug: str) -> str: + value = path_without_leading_slash + lowered = value.lower() + if value.startswith("/"): + return "/" + value + if lowered.startswith("api/"): + return "/" + value + if lowered.startswith("app/"): + return "/" + value + if lowered.startswith("s/"): + return "/" + value + if lowered.startswith("_site/"): + return "/" + value + if not value: + return f"/s/{slug}/" + return f"/s/{slug}/{value}" + + +def rewrite_html_for_slug(html_text: str, slug: str) -> str: + def repl(match: re.Match[str]) -> str: + attr = match.group(1) + quote = match.group(2) + path_value = match.group(3) + rewritten = rewrite_root_path(path_value, slug) + return f"{attr}={quote}{rewritten}{quote}" + + updated = ROOT_ATTR_RE.sub(repl, html_text) + if "<base " not in updated.lower(): + head_match = re.search(r"(?i)<head[^>]*>", updated) + if head_match: + base_tag = f"\n <base href=\"/s/{slug}/\">" + insert_at = head_match.end() + updated = updated[:insert_at] + base_tag + updated[insert_at:] + return updated + + +def rewrite_css_for_slug(css_text: str, slug: str) -> str: + def repl(match: re.Match[str]) -> str: + quote = match.group(1) + path_value = match.group(2) + rewritten = rewrite_root_path(path_value, slug) + return f"url({quote}{rewritten}{quote})" + + return CSS_URL_RE.sub(repl, css_text) + + +def looks_like_spa_route(subpath: str) -> bool: + if not subpath: + return True + name = Path(subpath).name + return "." not in name + + +def random_slug(length: int = 40) -> str: + alphabet = "abcdefghijklmnopqrstuvwxyz0123456789" + return "".join(secrets.choice(alphabet) for _ in range(length)) + + +def reserve_random_slug(connect_db: ConnectFn) -> str: + for _ in range(30): + slug = random_slug(40) + with connect_db() as conn: + row = conn.execute("SELECT 1 FROM sites WHERE slug = ?", (slug,)).fetchone() + if not row: + return slug + raise RuntimeError("failed to reserve a unique slug") + + +def extract_slug_from_referer(referer: Optional[str]) -> Optional[str]: + if not referer: + return None + try: + path = urlparse(referer).path + except ValueError: + return None + for prefix in ("/s/", "/_site/"): + if path.startswith(prefix): + remainder = path[len(prefix) :] + slug = remainder.split("/", 1)[0] + if slug_is_valid(slug): + return slug + return None + + +def extract_zip_secure(archive_path: Path, destination: Path) -> None: + total_size = 0 + total_files = 0 + with zipfile.ZipFile(archive_path, "r") as zf: + for info in zf.infolist(): + member_path = normalize_archive_member_path(info.filename) + if member_path is None: + continue + + # reject symlink entries + mode = (info.external_attr >> 16) & 0o170000 + if mode == stat.S_IFLNK: + raise ValueError("zip symlinks are not allowed") + + target = destination / member_path + ensure_path_under(destination, target) + if info.is_dir(): + target.mkdir(parents=True, exist_ok=True) + continue + + total_files += 1 + if total_files > MAX_EXTRACTED_FILES: + raise ValueError("archive has too many files") + + total_size += int(info.file_size) + if total_size > MAX_EXTRACTED_BYTES: + raise ValueError("extracted archive size exceeds limit") + + target.parent.mkdir(parents=True, exist_ok=True) + with zf.open(info, "r") as src, open(target, "wb") as dst: + shutil.copyfileobj(src, dst, length=1024 * 1024) + + +def extract_tar_secure(archive_path: Path, destination: Path) -> None: + total_size = 0 + total_files = 0 + with tarfile.open(archive_path, "r:*") as tf: + for member in tf.getmembers(): + member_path = normalize_archive_member_path(member.name) + if member_path is None: + continue + if member.issym() or member.islnk() or member.isdev(): + raise ValueError("tar links and device files are not allowed") + + target = destination / member_path + ensure_path_under(destination, target) + + if member.isdir(): + target.mkdir(parents=True, exist_ok=True) + continue + + if not member.isfile(): + raise ValueError("unsupported tar member type") + + total_files += 1 + if total_files > MAX_EXTRACTED_FILES: + raise ValueError("archive has too many files") + + total_size += int(member.size) + if total_size > MAX_EXTRACTED_BYTES: + raise ValueError("extracted archive size exceeds limit") + + source = tf.extractfile(member) + if source is None: + raise ValueError("failed to extract tar file member") + + target.parent.mkdir(parents=True, exist_ok=True) + with source, open(target, "wb") as dst: + shutil.copyfileobj(source, dst, length=1024 * 1024) + + +def extract_archive_secure(archive_path: Path, suffix: str, destination: Path) -> None: + if suffix == ".zip": + extract_zip_secure(archive_path, destination) + return + extract_tar_secure(archive_path, destination) + + +def find_site_root(extracted_dir: Path) -> Path: + root_index = extracted_dir / "index.html" + if root_index.is_file(): + return extracted_dir + + candidates = sorted( + extracted_dir.rglob("index.html"), + key=lambda path: len(path.relative_to(extracted_dir).parts), + ) + if not candidates: + raise ValueError("archive must include an index.html file") + return candidates[0].parent + + +def create_app(base_dir: Optional[Path] = None) -> Flask: + project_dir = Path(base_dir or Path(__file__).parent).resolve() + app_name = os.getenv("SHIM_APP_NAME", "shim").strip() or "shim" + db_path = project_dir / "data" / "shim.db" + sites_dir = project_dir / "data" / "sites" + default_mojicrypt = project_dir / "vendor" / "mojicrypt" + mojicrypt_env = os.getenv("SHIM_MOJICRYPT_BIN", str(default_mojicrypt)) + mojicrypt_bin = Path(mojicrypt_env).expanduser() + if not mojicrypt_bin.is_absolute(): + mojicrypt_bin = (project_dir / mojicrypt_bin).resolve() + + cfg = AppConfig( + base_dir=project_dir, + app_name=app_name, + db_path=db_path, + sites_dir=sites_dir, + mojicrypt_bin=mojicrypt_bin, + bind=os.getenv("SHIM_BIND", "0.0.0.0"), + port=int(os.getenv("SHIM_PORT", "6767")), + ) + + cfg.db_path.parent.mkdir(parents=True, exist_ok=True) + cfg.sites_dir.mkdir(parents=True, exist_ok=True) + + app = Flask(__name__, static_folder=None) + app.config["MAX_CONTENT_LENGTH"] = MAX_UPLOAD_BYTES + app.config["SECRET_KEY"] = os.getenv("SHIM_SECRET_KEY", secrets.token_hex(32)) + app.config["SHIM_PORT"] = cfg.port + app.config["SHIM_BIND"] = cfg.bind + app.config["SHIM_APP_NAME"] = cfg.app_name + app.config["SHIM_MOJICRYPT_BIN"] = str(cfg.mojicrypt_bin) + + def connect_db() -> sqlite3.Connection: + conn = sqlite3.connect(str(cfg.db_path)) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA foreign_keys = ON") + return conn + + with connect_db() as conn: + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + username TEXT UNIQUE NOT NULL, + role TEXT NOT NULL CHECK (role IN ('admin', 'user')), + encrypted_challenge TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + + CREATE TABLE IF NOT EXISTS sessions ( + token TEXT PRIMARY KEY, + user_id INTEGER NOT NULL, + expires_at INTEGER NOT NULL, + created_at INTEGER NOT NULL, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS sites ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + owner_user_id INTEGER NOT NULL, + slug TEXT UNIQUE NOT NULL, + storage_key TEXT UNIQUE NOT NULL, + original_filename TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (owner_user_id) REFERENCES users(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id); + CREATE INDEX IF NOT EXISTS idx_sessions_expiry ON sessions(expires_at); + CREATE INDEX IF NOT EXISTS idx_sites_owner ON sites(owner_user_id); + """ + ) + + # auth provider is injected as a single backend to keep swap-over simple. + auth_backend: AuthBackend = LocalMojicryptAuthBackend( + connect_db=connect_db, + mojicrypt_bin=cfg.mojicrypt_bin, + ) + + def render_page(title: str, body_template: str, **context: object) -> str: + body = render_template_string(body_template, **context) + return render_template_string( + SHELL_TEMPLATE, + title=title, + app_name=cfg.app_name, + current_user=g.current_user, + body=body, + ) + + def create_session(user_id: int) -> str: + token = secrets.token_urlsafe(48) + now = int(time.time()) + with connect_db() as conn: + conn.execute( + """ + INSERT INTO sessions (token, user_id, expires_at, created_at) + VALUES (?, ?, ?, ?) + """, + (token, user_id, now + SESSION_TTL_SECONDS, now), + ) + return token + + def destroy_session(token: str) -> None: + with connect_db() as conn: + conn.execute("DELETE FROM sessions WHERE token = ?", (token,)) + + def user_from_token(token: str) -> Optional[sqlite3.Row]: + now = int(time.time()) + with connect_db() as conn: + conn.execute("DELETE FROM sessions WHERE expires_at <= ?", (now,)) + return conn.execute( + """ + SELECT u.id, u.username, u.role + FROM sessions s + JOIN users u ON u.id = s.user_id + WHERE s.token = ? AND s.expires_at > ? + """, + (token, now), + ).fetchone() + + def require_auth() -> Optional[Response]: + if g.current_user is not None: + return None + flash("login required", "error") + return redirect(url_for("login")) + + def require_admin() -> Optional[Response]: + auth_redirect = require_auth() + if auth_redirect is not None: + return auth_redirect + if g.current_user["role"] != "admin": + flash("admin access required", "error") + return redirect(url_for("dashboard")) + return None + + def get_site_by_slug(slug: str) -> Optional[sqlite3.Row]: + with connect_db() as conn: + return conn.execute( + "SELECT id, owner_user_id, slug, storage_key FROM sites WHERE slug = ?", + (slug,), + ).fetchone() + + def send_site_file(file_path: Path, slug: str) -> Response: + ext = file_path.suffix.lower() + mime, _ = mimetypes.guess_type(str(file_path)) + if ext == ".html": + html = file_path.read_text(encoding="utf-8", errors="replace") + body = rewrite_html_for_slug(html, slug) + response = make_response(body) + response.mimetype = "text/html" + response.headers["Cache-Control"] = "no-cache" + elif ext == ".css": + css = file_path.read_text(encoding="utf-8", errors="replace") + body = rewrite_css_for_slug(css, slug) + response = make_response(body) + response.mimetype = "text/css" + response.headers["Cache-Control"] = "public, max-age=300" + else: + response = make_response( + send_file(file_path, mimetype=mime or "application/octet-stream", conditional=True) + ) + if ext in {".js", ".mjs"}: + response.headers["Cache-Control"] = "public, max-age=300" + elif ext in {".png", ".jpg", ".jpeg", ".gif", ".svg", ".webp", ".ico"}: + response.headers["Cache-Control"] = "public, max-age=86400, immutable" + else: + response.headers["Cache-Control"] = "public, max-age=3600" + response.headers["X-Content-Type-Options"] = "nosniff" + response.set_cookie(ACTIVE_SITE_COOKIE, slug, max_age=1800, samesite="Lax", path="/") + return response + + def resolve_site_path(site_root: Path, subpath: str) -> Optional[Path]: + cleaned = subpath.lstrip("/") + candidate = (site_root / cleaned).resolve() + try: + candidate.relative_to(site_root.resolve()) + except ValueError: + return None + return candidate + + def serve_site_resource(site: sqlite3.Row, subpath: str, allow_spa: bool = True) -> Response: + site_root = (cfg.sites_dir / site["storage_key"]).resolve() + if not site_root.is_dir(): + abort(404) + + target = resolve_site_path(site_root, subpath) + if target is None: + abort(404) + + if target.is_dir(): + index_path = target / "index.html" + if index_path.is_file(): + return send_site_file(index_path, site["slug"]) + elif target.is_file(): + return send_site_file(target, site["slug"]) + + if allow_spa and looks_like_spa_route(subpath): + index_path = site_root / "index.html" + if index_path.is_file(): + return send_site_file(index_path, site["slug"]) + + abort(404) + + @app.before_request + def load_current_user() -> None: + g.current_user = None + token = request.cookies.get(SESSION_COOKIE) + if not token: + return + user = user_from_token(token) + if user is not None: + g.current_user = user + + @app.errorhandler(413) + def too_large(_: Exception) -> tuple[str, int]: + return "upload exceeds configured max size", 413 + + @app.get("/") + def root() -> Response: + return redirect(url_for("dashboard")) + + @app.get("/healthz") + def healthz() -> tuple[str, int]: + return "ok", 200 + + @app.get("/app") + def dashboard() -> Response: + if auth_backend.bootstrap_required(): + return redirect(url_for("setup")) + auth_redirect = require_auth() + if auth_redirect is not None: + return auth_redirect + + is_admin = g.current_user["role"] == "admin" + with connect_db() as conn: + if is_admin: + sites = conn.execute( + """ + SELECT s.id, s.slug, s.original_filename, s.created_at, u.username AS owner_username + FROM sites s + JOIN users u ON u.id = s.owner_user_id + ORDER BY s.id DESC + """ + ).fetchall() + users = conn.execute( + "SELECT id, username, role, created_at FROM users ORDER BY id ASC" + ).fetchall() + else: + sites = conn.execute( + """ + SELECT id, slug, original_filename, created_at + FROM sites + WHERE owner_user_id = ? + ORDER BY id DESC + """, + (g.current_user["id"],), + ).fetchall() + users = [] + + return render_page( + title="dashboard", + body_template=DASHBOARD_BODY_TEMPLATE, + allowed_suffixes=ARCHIVE_SUFFIXES, + is_admin=is_admin, + sites=sites, + users=users, + current_user=g.current_user, + ) + + @app.get("/app/setup") + def setup() -> Response: + if not auth_backend.bootstrap_required(): + return redirect(url_for("dashboard")) + return render_page(title="setup", body_template=SETUP_BODY_TEMPLATE) + + @app.post("/app/setup") + def setup_submit() -> Response: + if not auth_backend.bootstrap_required(): + flash("setup already completed", "error") + return redirect(url_for("login")) + + username = request.form.get("username", "") + password = request.form.get("password", "") + confirm = request.form.get("confirm", "") + + if password != confirm: + flash("passwords do not match", "error") + return redirect(url_for("setup")) + + ok, message = auth_backend.create_user(username=username, password=password, role="admin") + if not ok: + flash(message, "error") + return redirect(url_for("setup")) + + user = auth_backend.authenticate(username=username, password=password) + if user is None: + flash("admin created but login failed", "error") + return redirect(url_for("login")) + + token = create_session(user["id"]) + response = redirect(url_for("dashboard")) + response.set_cookie( + SESSION_COOKIE, + token, + max_age=SESSION_TTL_SECONDS, + httponly=True, + samesite="Lax", + path="/", + ) + flash("admin account created", "success") + return response + + @app.get("/app/login") + def login() -> Response: + if auth_backend.bootstrap_required(): + return redirect(url_for("setup")) + if g.current_user is not None: + return redirect(url_for("dashboard")) + return render_page(title="login", body_template=LOGIN_BODY_TEMPLATE) + + @app.post("/app/login") + def login_submit() -> Response: + if auth_backend.bootstrap_required(): + flash("run setup first", "error") + return redirect(url_for("setup")) + + username = request.form.get("username", "") + password = request.form.get("password", "") + user = auth_backend.authenticate(username=username, password=password) + if user is None: + flash("invalid credentials", "error") + return redirect(url_for("login")) + + token = create_session(user["id"]) + response = redirect(url_for("dashboard")) + response.set_cookie( + SESSION_COOKIE, + token, + max_age=SESSION_TTL_SECONDS, + httponly=True, + samesite="Lax", + path="/", + ) + flash("signed in", "success") + return response + + @app.post("/app/logout") + def logout() -> Response: + token = request.cookies.get(SESSION_COOKIE) + if token: + destroy_session(token) + response = redirect(url_for("login")) + response.set_cookie(SESSION_COOKIE, "", max_age=0, httponly=True, samesite="Lax", path="/") + return response + + @app.post("/app/upload") + def upload_site() -> Response: + auth_redirect = require_auth() + if auth_redirect is not None: + return auth_redirect + + archive = request.files.get("archive") + if archive is None or not archive.filename: + flash("select an archive file", "error") + return redirect(url_for("dashboard")) + + suffix = detect_archive_suffix(archive.filename) + if suffix is None: + flash("unsupported archive format", "error") + return redirect(url_for("dashboard")) + + temp_dir = Path(tempfile.mkdtemp(prefix="shim-upload-")) + archive_path = temp_dir / f"upload{suffix}" + extract_dir = temp_dir / "extract" + extract_dir.mkdir(parents=True, exist_ok=True) + + final_dir: Optional[Path] = None + try: + archive.save(archive_path) + extract_archive_secure(archive_path, suffix, extract_dir) + site_root = find_site_root(extract_dir) + + slug = reserve_random_slug(connect_db) + storage_key = secrets.token_hex(24) + final_dir = cfg.sites_dir / storage_key + + shutil.copytree(site_root, final_dir) + + with connect_db() as conn: + conn.execute( + """ + INSERT INTO sites (owner_user_id, slug, storage_key, original_filename, created_at, updated_at) + VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + """, + ( + int(g.current_user["id"]), + slug, + storage_key, + Path(archive.filename).name, + ), + ) + except ValueError as exc: + flash(str(exc), "error") + if final_dir is not None: + shutil.rmtree(final_dir, ignore_errors=True) + return redirect(url_for("dashboard")) + except sqlite3.IntegrityError: + flash("failed to save site record", "error") + if final_dir is not None: + shutil.rmtree(final_dir, ignore_errors=True) + return redirect(url_for("dashboard")) + except (tarfile.TarError, zipfile.BadZipFile): + flash("archive is invalid or corrupted", "error") + if final_dir is not None: + shutil.rmtree(final_dir, ignore_errors=True) + return redirect(url_for("dashboard")) + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + + flash("site uploaded", "success") + return redirect(url_for("dashboard")) + + @app.post("/app/sites/<int:site_id>/delete") + def delete_site(site_id: int) -> Response: + auth_redirect = require_auth() + if auth_redirect is not None: + return auth_redirect + + with connect_db() as conn: + site = conn.execute( + "SELECT id, owner_user_id, slug, storage_key FROM sites WHERE id = ?", + (site_id,), + ).fetchone() + if site is None: + flash("site not found", "error") + return redirect(url_for("dashboard")) + + is_admin = g.current_user["role"] == "admin" + if not is_admin and int(site["owner_user_id"]) != int(g.current_user["id"]): + flash("not allowed", "error") + return redirect(url_for("dashboard")) + + conn.execute("DELETE FROM sites WHERE id = ?", (site_id,)) + + shutil.rmtree(cfg.sites_dir / site["storage_key"], ignore_errors=True) + flash("site deleted", "success") + return redirect(url_for("dashboard")) + + @app.post("/app/admin/users/create") + def admin_create_user() -> Response: + admin_redirect = require_admin() + if admin_redirect is not None: + return admin_redirect + + username = request.form.get("username", "") + password = request.form.get("password", "") + role = request.form.get("role", "user").strip().lower() + + ok, message = auth_backend.create_user(username=username, password=password, role=role) + flash(message, "success" if ok else "error") + return redirect(url_for("dashboard")) + + @app.post("/app/admin/users/<int:user_id>/delete") + def admin_delete_user(user_id: int) -> Response: + admin_redirect = require_admin() + if admin_redirect is not None: + return admin_redirect + + if int(g.current_user["id"]) == int(user_id): + flash("you cannot delete your own account", "error") + return redirect(url_for("dashboard")) + + with connect_db() as conn: + user = conn.execute("SELECT id, username FROM users WHERE id = ?", (user_id,)).fetchone() + if user is None: + flash("user not found", "error") + return redirect(url_for("dashboard")) + + owned_site_keys = conn.execute( + "SELECT storage_key FROM sites WHERE owner_user_id = ?", + (user_id,), + ).fetchall() + + conn.execute("DELETE FROM users WHERE id = ?", (user_id,)) + + for row in owned_site_keys: + shutil.rmtree(cfg.sites_dir / row["storage_key"], ignore_errors=True) + + flash("user deleted", "success") + return redirect(url_for("dashboard")) + + @app.post("/app/admin/sites/<int:site_id>/slug") + def admin_update_slug(site_id: int) -> Response: + admin_redirect = require_admin() + if admin_redirect is not None: + return admin_redirect + + new_slug = request.form.get("slug", "").strip().lower() + if not slug_is_valid(new_slug): + flash("invalid slug format", "error") + return redirect(url_for("dashboard")) + + try: + with connect_db() as conn: + cursor = conn.execute( + """ + UPDATE sites + SET slug = ?, updated_at = CURRENT_TIMESTAMP + WHERE id = ? + """, + (new_slug, site_id), + ) + if cursor.rowcount == 0: + flash("site not found", "error") + return redirect(url_for("dashboard")) + except sqlite3.IntegrityError: + flash("slug is already in use", "error") + return redirect(url_for("dashboard")) + + flash("slug updated", "success") + return redirect(url_for("dashboard")) + + @app.get("/s/<slug>/") + def serve_site_root(slug: str) -> Response: + if not slug_is_valid(slug): + abort(404) + site = get_site_by_slug(slug) + if site is None: + abort(404) + return serve_site_resource(site, subpath="", allow_spa=True) + + @app.get("/s/<slug>/<path:subpath>") + def serve_site_subpath(slug: str, subpath: str) -> Response: + if not slug_is_valid(slug): + abort(404) + site = get_site_by_slug(slug) + if site is None: + abort(404) + return serve_site_resource(site, subpath=subpath, allow_spa=True) + + @app.get("/_site/<slug>/") + def serve_site_alias_root(slug: str) -> Response: + return serve_site_root(slug) + + @app.get("/_site/<slug>/<path:subpath>") + def serve_site_alias_subpath(slug: str, subpath: str) -> Response: + return serve_site_subpath(slug, subpath) + + @app.get("/<path:subpath>") + def site_root_relative_fallback(subpath: str) -> Response: + # handle absolute root paths from hosted apps by resolving the slug from referer/cookie + first = subpath.split("/", 1)[0].lower() + if first in {"app", "api", "s", "_site", "healthz"}: + abort(404) + + slug = extract_slug_from_referer(request.headers.get("Referer")) + if slug is None: + cookie_slug = request.cookies.get(ACTIVE_SITE_COOKIE) + if cookie_slug and slug_is_valid(cookie_slug): + slug = cookie_slug + + if slug is None: + abort(404) + + site = get_site_by_slug(slug) + if site is None: + abort(404) + return serve_site_resource(site, subpath=subpath, allow_spa=True) + + return app
\ No newline at end of file diff --git a/vendor/mojicrypt b/vendor/mojicrypt new file mode 100755 index 0000000..4f5c048 --- /dev/null +++ b/vendor/mojicrypt @@ -0,0 +1,351 @@ +#!/usr/bin/env python + +# mojicrypt — aes-256-gcm encryption with unicode-encoded output +# turns your secrets into a wall of emojis and symbols +# works on text, binary files, images, whatever you throw at it + +import sys +import os +import argparse +import secrets +import time +import uuid + +try: + from Crypto.Cipher import AES + from Crypto.Protocol.KDF import scrypt + from Crypto.Random import get_random_bytes +except ModuleNotFoundError: + from Cryptodome.Cipher import AES + from Cryptodome.Protocol.KDF import scrypt + from Cryptodome.Random import get_random_bytes + +APP_NAME = "mojicrypt" +VERSION = "20260303" +KEYFILE_LEN = 8192 # characters of hex entropy written to auto-generated keyfiles + +# params +SCRYPT_N = 2**18 # cpu/memory cost +SCRYPT_R = 8 # block size +SCRYPT_P = 1 # parallelization +SALT_LEN = 16 # 128-bit salt +KEY_LEN = 32 # 256-bit key +NONCE_LEN = 12 # 96-bit nonce (gcm standard) +TAG_LEN = 16 # 128-bit auth tag +HEADER_LEN = SALT_LEN + NONCE_LEN + TAG_LEN # 44 bytes overhead + + +# glyphs +# 256 unique non-alphanumeric unicode symbols +# each byte (0x00-0xFF) maps to one glyph — simple 1:1 encoding +# curated from: emoji faces, nature, animals, food, misc symbols, +# arrows, geometric shapes, and math operators + +def _build_glyph_table(): + """construct the 256-char glyph table from curated unicode ranges""" + codepoints = [] + + # emoji faces — smileys and expressions (64) + codepoints.extend(range(0x1F600, 0x1F640)) + + # weather and nature — cyclone through shooting star (32) + codepoints.extend(range(0x1F300, 0x1F320)) + + # animals — rat through blowfish (32) + codepoints.extend(range(0x1F400, 0x1F420)) + + # food — tomato through shrimp (32) + codepoints.extend(range(0x1F345, 0x1F365)) + + # misc symbols — sun through pointing fingers (32) + codepoints.extend(range(0x2600, 0x2620)) + + # arrows — left through rightwards paired (16) + codepoints.extend(range(0x2190, 0x21A0)) + + # geometric shapes — black square through white vertical rectangle (16) + codepoints.extend(range(0x25A0, 0x25B0)) + + # mathematical operators — for all through right angle (32) + codepoints.extend(range(0x2200, 0x2220)) + + table = "".join(chr(cp) for cp in codepoints) + + # sanity checks + assert len(table) == 256, f"glyph table has {len(table)} entries, expected 256" + assert len(set(table)) == 256, "duplicate glyphs detected in table" + for g in table: + assert not g.isalnum(), f"alphanumeric char found: {g} (U+{ord(g):04X})" + assert not g.isspace(), f"whitespace char found: {g} (U+{ord(g):04X})" + # no joiners (ZWJ, ZWNJ, BOM) + assert ord(g) not in (0x200D, 0x200C, 0xFEFF), \ + f"joiner/bom found: U+{ord(g):04X}" + + return table + + +GLYPH_TABLE = _build_glyph_table() +GLYPH_TO_BYTE = {g: i for i, g in enumerate(GLYPH_TABLE)} + + +# encode/decode + +def bytes_to_glyphs(data: bytes) -> str: + """encode raw bytes as unicode glyphs""" + return "".join(GLYPH_TABLE[b] for b in data) + + +def glyphs_to_bytes(text: str) -> bytes: + """decode unicode glyphs back to raw bytes""" + result = [] + for g in text: + if g not in GLYPH_TO_BYTE: + print( + f"error: invalid glyph in ciphertext: {g} (U+{ord(g):04X})", + file=sys.stderr, + ) + sys.exit(1) + result.append(GLYPH_TO_BYTE[g]) + return bytes(result) + + +# crypto operations + +def derive_key(passphrase: str, salt: bytes) -> bytes: + """derive a 256-bit key from passphrase using scrypt""" + return scrypt( + passphrase.encode("utf-8"), + salt, + key_len=KEY_LEN, + N=SCRYPT_N, + r=SCRYPT_R, + p=SCRYPT_P, + ) + + +def encrypt_bytes(data: bytes, passphrase: str) -> str: + """encrypt raw bytes with aes-256-gcm, return unicode-encoded ciphertext""" + salt = get_random_bytes(SALT_LEN) + key = derive_key(passphrase, salt) + + nonce = get_random_bytes(NONCE_LEN) + cipher = AES.new(key, AES.MODE_GCM, nonce=nonce) + ciphertext, tag = cipher.encrypt_and_digest(data) + + # wire format: salt(16) || nonce(12) || tag(16) || ciphertext(N) + packed = salt + nonce + tag + ciphertext + return bytes_to_glyphs(packed) + + +def decrypt_bytes(encoded: str, passphrase: str) -> bytes: + """decode unicode glyphs and decrypt with aes-256-gcm, return raw bytes""" + raw = glyphs_to_bytes(encoded) + + if len(raw) < HEADER_LEN: + print("error: ciphertext too short to be valid", file=sys.stderr) + sys.exit(1) + + # unpack: salt(16) || nonce(12) || tag(16) || ciphertext(N) + salt = raw[:SALT_LEN] + nonce = raw[SALT_LEN:SALT_LEN + NONCE_LEN] + tag = raw[SALT_LEN + NONCE_LEN:HEADER_LEN] + ciphertext = raw[HEADER_LEN:] + + key = derive_key(passphrase, salt) + + try: + cipher = AES.new(key, AES.MODE_GCM, nonce=nonce) + return cipher.decrypt_and_verify(ciphertext, tag) + except (ValueError, KeyError): + print( + "error: decryption failed — wrong passphrase or corrupted data", + file=sys.stderr, + ) + sys.exit(1) + + +# text convenience wrappers + +def encrypt_text(plaintext: str, passphrase: str) -> str: + """encode text to utf-8, encrypt, return glyph string""" + return encrypt_bytes(plaintext.encode("utf-8"), passphrase) + + +def decrypt_text(encoded: str, passphrase: str) -> str: + """decrypt and decode result as utf-8 text""" + raw = decrypt_bytes(encoded, passphrase) + try: + return raw.decode("utf-8") + except UnicodeDecodeError: + print( + "error: decrypted data is not valid utf-8 — " + "did you mean to use --file/-f to write to a binary output?", + file=sys.stderr, + ) + sys.exit(1) + + +# keyfile helpers + +def generate_keyfile() -> tuple[str, str]: + """generate an 8192-char hex key, write it to a .ukey file, return (key, path)""" + key = secrets.token_hex(KEYFILE_LEN // 2) # token_hex(N) gives 2*N hex chars + epoch = int(time.time()) + uid = uuid.uuid4() + filename = f"mojicrypt-key{epoch}_{uid}.ukey" + with open(filename, "w", encoding="utf-8") as fh: + fh.write(key) + # lock it down — readable only by the owner + os.chmod(filename, 0o600) + return key, filename + + +def load_keyfile(path: str) -> str: + """read a .ukey file and return its contents as the passphrase""" + if not os.path.isfile(path): + print(f"error: keyfile not found: {path}", file=sys.stderr) + sys.exit(1) + with open(path, "r", encoding="utf-8") as fh: + key = fh.read().strip() + if not key: + print(f"error: keyfile is empty: {path}", file=sys.stderr) + sys.exit(1) + return key + + +# cli + + +def main(): + parser = argparse.ArgumentParser( + prog=APP_NAME, + description="aes-256-gcm encryption with unicode-encoded output", + ) + parser.add_argument( + "-v", "--version", + action="version", + version=f"{APP_NAME} {VERSION}", + ) + parser.add_argument( + "mode", + choices=["encrypt", "decrypt"], + help="operation to perform", + ) + parser.add_argument( + "text", + nargs="?", + help="text to encrypt/decrypt (reads stdin if omitted, ignored when -f is used)", + ) + parser.add_argument( + "-p", "--passphrase", + help="passphrase (prompted securely if omitted)", + ) + parser.add_argument( + "-f", "--file", + metavar="PATH", + help="input file to encrypt/decrypt", + ) + parser.add_argument( + "-o", "--output", + metavar="PATH", + help=( + "output file path " + "(encrypt default: <input>.uc | decrypt default: strips .uc or appends .dec)" + ), + ) + parser.add_argument( + "-k", "--keyfile", + metavar="PATH", + help="load passphrase from a .ukey file instead of -p", + ) + + args = parser.parse_args() + + # resolve passphrase — priority: -p > -k > auto-generate (encrypt) / error (decrypt) + if args.passphrase: + passphrase = args.passphrase + elif args.keyfile: + passphrase = load_keyfile(args.keyfile) + print(f"using keyfile: {args.keyfile}", file=sys.stderr) + elif args.mode == "encrypt": + passphrase, kf_path = generate_keyfile() + print(f"no passphrase given - generated keyfile: {kf_path}", file=sys.stderr) + print("keep that file safe — you'll need it to decrypt", file=sys.stderr) + else: + print( + "error: no passphrase provided for decryption.\n" + " use -p <passphrase> or -k <keyfile.ukey>", + file=sys.stderr, + ) + sys.exit(1) + + # file mode + if args.file: + in_path = args.file + + if not os.path.isfile(in_path): + print(f"error: file not found: {in_path}", file=sys.stderr) + sys.exit(1) + + if args.mode == "encrypt": + # default output: <input>.uc + out_path = args.output or (in_path + ".uc") + + with open(in_path, "rb") as fh: + data = fh.read() + + glyphs = encrypt_bytes(data, passphrase) + + with open(out_path, "w", encoding="utf-8") as fh: + fh.write(glyphs) + + print(f"encrypted → {out_path}", file=sys.stderr) + + else: # decrypt + # default output: strip .uc if present, else append .dec + if args.output: + out_path = args.output + elif in_path.endswith(".uc"): + out_path = in_path[:-3] + else: + out_path = in_path + ".dec" + + with open(in_path, "r", encoding="utf-8") as fh: + encoded = fh.read().strip() + + raw = decrypt_bytes(encoded, passphrase) + + with open(out_path, "wb") as fh: + fh.write(raw) + + print(f"decrypted → {out_path}", file=sys.stderr) + + return + + # ── text / stdin mode ────────────────────────────────────── + if args.text: + text = args.text + elif not sys.stdin.isatty(): + text = sys.stdin.read() + else: + if args.mode == "encrypt": + print("enter plaintext (ctrl+d to finish):", file=sys.stderr) + else: + print("paste ciphertext (ctrl+d to finish):", file=sys.stderr) + text = sys.stdin.read() + + if not text: + print("error: no input provided", file=sys.stderr) + sys.exit(1) + + if args.mode == "encrypt": + result = encrypt_text(text, passphrase) + print(result) + else: + # strip whitespace from pasted ciphertext (none of our glyphs are whitespace) + result = decrypt_text(text.strip(), passphrase) + print(result) + + +if __name__ == "__main__": + main() |
