#!/usr/bin/env python3 import os import re import sqlite3 import subprocess import sys import uuid from pathlib import Path from typing import Callable, Optional, Protocol from email_validator import EmailNotValidError, validate_email # fixed challenge text used for passphrase verification. # we store encrypted challenge output instead of storing passwords. AUTH_CHALLENGE = "SHIM_AUTH_VALID" HANDLE_RE = re.compile(r"^[a-z0-9_.-]{2,64}$") UUID_RE = re.compile( r"^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$" ) ConnectFn = Callable[[], sqlite3.Connection] class AuthBackend(Protocol): def bootstrap_required(self) -> bool: ... def create_user(self, username: str, password: str, role: str = "user") -> tuple[bool, str]: ... def authenticate(self, username: str, password: str) -> Optional[sqlite3.Row]: ... def update_username(self, user_uuid: str, new_username: str) -> tuple[bool, str]: ... def update_password(self, user_uuid: str, new_password: str) -> tuple[bool, str]: ... def normalize_username(username: str) -> str: return username.strip().lower() def normalize_uuid(value: str) -> Optional[str]: candidate = (value or "").strip().lower() if not UUID_RE.fullmatch(candidate): return None return candidate def is_valid_email_username(value: str) -> bool: if len(value) > 254: return False try: validated = validate_email( value, check_deliverability=False, allow_smtputf8=False, allow_display_name=False, ) except EmailNotValidError: return False # keep stored usernames predictable - only accept already normalized forms. return validated.normalized == value def looks_like_python_script(path: Path) -> bool: try: with open(path, "r", encoding="utf-8", errors="ignore") as f: first_line = f.readline(200).lower() except OSError: return False return first_line.startswith("#!") and "python" in first_line class LocalMojicryptAuthBackend: """default auth backend using sqlite and local mojicrypt challenge encryption""" def __init__(self, connect_db: ConnectFn, mojicrypt_bin: Path): self.connect_db = connect_db self.mojicrypt_bin = mojicrypt_bin self.last_error = "" def bootstrap_required(self) -> bool: with self.connect_db() as conn: row = conn.execute("SELECT COUNT(*) AS count FROM users").fetchone() return int(row["count"]) == 0 def create_user(self, username: str, password: str, role: str = "user") -> tuple[bool, str]: username = normalize_username(username) username_error = self._validate_username(username) if username_error: return False, username_error password_error = self._validate_password(password) if password_error: return False, password_error if role not in {"admin", "user"}: return False, "invalid role" # store an encrypted challenge, never the raw password. encrypted = self.encrypt_password(password) if not encrypted: return False, "mojicrypt encryption failed" try: with self.connect_db() as conn: conn.execute( """ INSERT INTO users (user_uuid, username, role, encrypted_challenge, created_at) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP) """, (str(uuid.uuid4()), username, role, encrypted), ) return True, "user created" except sqlite3.IntegrityError: return False, "username already exists" def update_username(self, user_uuid: str, new_username: str) -> tuple[bool, str]: normalized_user_uuid = normalize_uuid(user_uuid) if normalized_user_uuid is None: return False, "invalid user id" normalized = normalize_username(new_username) username_error = self._validate_username(normalized) if username_error: return False, username_error try: with self.connect_db() as conn: cursor = conn.execute( "UPDATE users SET username = ? WHERE user_uuid = ?", (normalized, normalized_user_uuid), ) if cursor.rowcount == 0: return False, "user not found" except sqlite3.IntegrityError: return False, "username already exists" return True, "username updated" def update_password(self, user_uuid: str, new_password: str) -> tuple[bool, str]: normalized_user_uuid = normalize_uuid(user_uuid) if normalized_user_uuid is None: return False, "invalid user id" password_error = self._validate_password(new_password) if password_error: return False, password_error # this rotates the encrypted challenge blob for future logins. encrypted = self.encrypt_password(new_password) if not encrypted: return False, "mojicrypt encryption failed" with self.connect_db() as conn: cursor = conn.execute( "UPDATE users SET encrypted_challenge = ? WHERE user_uuid = ?", (encrypted, normalized_user_uuid), ) if cursor.rowcount == 0: return False, "user not found" return True, "password updated" def authenticate(self, username: str, password: str) -> Optional[sqlite3.Row]: username = normalize_username(username) with self.connect_db() as conn: user = conn.execute( """ SELECT id, username, role, encrypted_challenge FROM users WHERE username = ? """, (username,), ).fetchone() if not user: return None if not self.verify_password(user["encrypted_challenge"], password): return None return user def encrypt_password(self, password: str) -> Optional[str]: return self._run_mojicrypt("encrypt", AUTH_CHALLENGE, password) def _validate_username(self, username: str) -> Optional[str]: if len(username) < 2: return "username must be at least 2 characters" if not username.isascii(): return "username must use ascii characters only" if HANDLE_RE.fullmatch(username): return None if is_valid_email_username(username): return None return ( "username must be a handle [a-z0-9_.-] (2-64 chars) or a valid email" ) def _validate_password(self, password: str) -> Optional[str]: if len(password) < 2: return "password must be at least 2 characters" return None def verify_password(self, encrypted_blob: str, password: str) -> bool: decrypted = self._run_mojicrypt("decrypt", encrypted_blob, password) return decrypted == AUTH_CHALLENGE def _run_mojicrypt(self, command: str, payload: str, passphrase: str) -> Optional[str]: self.last_error = "" if not self.mojicrypt_bin.exists(): self.last_error = f"binary not found at {self.mojicrypt_bin}" return None is_python_script = looks_like_python_script(self.mojicrypt_bin) direct_exec_ok = os.access(self.mojicrypt_bin, os.X_OK) python_exec_ok = bool(sys.executable) and is_python_script if not direct_exec_ok and not python_exec_ok: self.last_error = "binary is not executable" return None # try direct execution first, then python fallback when needed. commands_to_try = [] if direct_exec_ok: commands_to_try.append([ str(self.mojicrypt_bin), "-p", passphrase, command, payload, ]) # some systems do not expose a plain `python` binary for shebang execution. # retrying with the active interpreter avoids false failures for vendored scripts. if python_exec_ok: fallback_cmd = [ sys.executable, str(self.mojicrypt_bin), "-p", passphrase, command, payload, ] if fallback_cmd not in commands_to_try: commands_to_try.append(fallback_cmd) for cmd in commands_to_try: try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=90, ) except FileNotFoundError as exc: self.last_error = str(exc) continue except subprocess.TimeoutExpired: self.last_error = "command timed out" continue if result.returncode == 0: return result.stdout.strip() stderr = (result.stderr or "").strip() if stderr: self.last_error = stderr else: self.last_error = f"command exited with status {result.returncode}" return None