aboutsummaryrefslogtreecommitdiffstats
path: root/auth_backend.py
blob: 721993140ac05669991478e8bd5863ed8d36cb0c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python3

import os
import re
import sqlite3
import subprocess
import sys
import uuid
from pathlib import Path
from typing import Callable, Optional, Protocol


AUTH_CHALLENGE = "SHIM_AUTH_VALID"
USERNAME_RE = re.compile(r"^[a-z0-9_.-]{2,64}$")

ConnectFn = Callable[[], sqlite3.Connection]


class AuthBackend(Protocol):
    def bootstrap_required(self) -> bool:
        ...

    def create_user(self, username: str, password: str, role: str = "user") -> tuple[bool, str]:
        ...

    def authenticate(self, username: str, password: str) -> Optional[sqlite3.Row]:
        ...


def normalize_username(username: str) -> str:
    return username.strip().lower()


def looks_like_python_script(path: Path) -> bool:
    try:
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
            first_line = f.readline(200).lower()
    except OSError:
        return False
    return first_line.startswith("#!") and "python" in first_line


class LocalMojicryptAuthBackend:
    """default auth backend using sqlite and local mojicrypt challenge encryption"""

    def __init__(self, connect_db: ConnectFn, mojicrypt_bin: Path):
        self.connect_db = connect_db
        self.mojicrypt_bin = mojicrypt_bin
        self.last_error = ""

    def bootstrap_required(self) -> bool:
        with self.connect_db() as conn:
            row = conn.execute("SELECT COUNT(*) AS count FROM users").fetchone()
        return int(row["count"]) == 0

    def create_user(self, username: str, password: str, role: str = "user") -> tuple[bool, str]:
        username = normalize_username(username)
        if not USERNAME_RE.fullmatch(username):
            return False, "username must be lowercase and use [a-z0-9_.-], 2-64 chars"
        if len(password) < 2:
            return False, "password must be at least 2 characters"
        if role not in {"admin", "user"}:
            return False, "invalid role"

        encrypted = self.encrypt_password(password)
        if not encrypted:
            return False, "mojicrypt encryption failed"

        try:
            with self.connect_db() as conn:
                conn.execute(
                    """
                    INSERT INTO users (user_uuid, username, role, encrypted_challenge, created_at)
                    VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
                    """,
                    (str(uuid.uuid4()), username, role, encrypted),
                )
            return True, "user created"
        except sqlite3.IntegrityError:
            return False, "username already exists"

    def authenticate(self, username: str, password: str) -> Optional[sqlite3.Row]:
        username = normalize_username(username)
        with self.connect_db() as conn:
            user = conn.execute(
                """
                SELECT id, username, role, encrypted_challenge
                FROM users
                WHERE username = ?
                """,
                (username,),
            ).fetchone()
        if not user:
            return None
        if not self.verify_password(user["encrypted_challenge"], password):
            return None
        return user

    def encrypt_password(self, password: str) -> Optional[str]:
        return self._run_mojicrypt("encrypt", AUTH_CHALLENGE, password)

    def verify_password(self, encrypted_blob: str, password: str) -> bool:
        decrypted = self._run_mojicrypt("decrypt", encrypted_blob, password)
        return decrypted == AUTH_CHALLENGE

    def _run_mojicrypt(self, command: str, payload: str, passphrase: str) -> Optional[str]:
        self.last_error = ""

        if not self.mojicrypt_bin.exists():
            self.last_error = f"binary not found at {self.mojicrypt_bin}"
            return None

        is_python_script = looks_like_python_script(self.mojicrypt_bin)
        direct_exec_ok = os.access(self.mojicrypt_bin, os.X_OK)
        python_exec_ok = bool(sys.executable) and is_python_script

        if not direct_exec_ok and not python_exec_ok:
            self.last_error = "binary is not executable"
            return None

        commands_to_try = []
        if direct_exec_ok:
            commands_to_try.append([
                str(self.mojicrypt_bin),
                "-p",
                passphrase,
                command,
                payload,
            ])

        # some systems do not expose a plain `python` binary for shebang execution.
        # retrying with the active interpreter avoids false failures for vendored scripts.
        if python_exec_ok:
            fallback_cmd = [
                sys.executable,
                str(self.mojicrypt_bin),
                "-p",
                passphrase,
                command,
                payload,
            ]
            if fallback_cmd not in commands_to_try:
                commands_to_try.append(fallback_cmd)

        for cmd in commands_to_try:
            try:
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=90,
                )
            except FileNotFoundError as exc:
                self.last_error = str(exc)
                continue
            except subprocess.TimeoutExpired:
                self.last_error = "command timed out"
                continue

            if result.returncode == 0:
                return result.stdout.strip()

            stderr = (result.stderr or "").strip()
            if stderr:
                self.last_error = stderr
            else:
                self.last_error = f"command exited with status {result.returncode}"

        return None