diff --git a/.importlinter b/.importlinter index b199391..e5f4afb 100644 --- a/.importlinter +++ b/.importlinter @@ -24,6 +24,7 @@ forbidden_modules = docctl.service_manifest docctl.service_query docctl.service_session + docctl.service_session_worker docctl.service_snapshot docctl.service_types docctl.services @@ -44,6 +45,7 @@ forbidden_modules = docctl.service_manifest docctl.service_query docctl.service_session + docctl.service_session_worker docctl.service_snapshot docctl.service_types docctl.services @@ -57,6 +59,7 @@ source_modules = docctl.service_manifest docctl.service_query docctl.service_session + docctl.service_session_worker docctl.service_snapshot docctl.service_types docctl.services @@ -71,6 +74,7 @@ source_modules = docctl.service_manifest docctl.service_query docctl.service_session + docctl.service_session_worker docctl.service_snapshot forbidden_modules = docctl.chunking diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 4ae3b54..63065e4 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -20,6 +20,9 @@ Primary runtime code lives in `src/docctl/`. - `service_ingest.py`, `service_query.py`, `service_session.py`, `service_doctor.py` - Internal orchestration modules split by workflow domain. - Own command execution logic for ingest/query/session/doctor flows. +- `service_session_worker.py` + - Singleton detached session worker lifecycle and local IPC transport orchestration. + - Reuses `service_session.py` request dispatch/runtime handling. - `service_snapshot.py` - Snapshot orchestration for index export/import workflows. - Owns zip archive validation, safe extraction, and restore policy enforcement. diff --git a/README.md b/README.md index 1a7761e..95af16a 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,10 @@ docctl show --allow-model-download | `docctl catalog` | Show index summary and per-document inventory. | | `docctl doctor` | Run local diagnostics for index and embedding setup. | | `docctl session` | Run a read-only NDJSON request session on stdin/stdout. | +| `docctl session start` | Start singleton detached session worker (errors if already running). | +| `docctl session status` | Show singleton detached session worker status. | +| `docctl session exec` | Execute NDJSON requests through singleton detached worker. | +| `docctl session stop` | Stop singleton detached session worker. | ## JSON and Session Mode Use `--json` for deterministic machine-readable output: @@ -101,6 +105,22 @@ cat <<'EOF' | docctl session --allow-model-download EOF ``` +Use detached singleton worker mode when you want warm reuse across CLI invocations: + +```bash +# Start singleton worker (default idle timeout: 900 seconds) +docctl --json session start + +# Execute NDJSON requests through detached worker +cat <<'EOF' | docctl session exec +{"id":"q1","op":"search","query":"security gateway diagnostics","top_k":5} +{"id":"q2","op":"catalog"} +EOF + +# Stop worker explicitly +docctl --json session stop +``` + ## Configuration Global options: - `--index-path` (default: `.docctl`) diff --git a/docs/product-specs/cli-contract.md b/docs/product-specs/cli-contract.md index 14384aa..75531fa 100644 --- a/docs/product-specs/cli-contract.md +++ b/docs/product-specs/cli-contract.md @@ -1,4 +1,4 @@ -# PSPEC-0001 CLI Contract v3 (Multi-Format Ingest, Locator-Free Retrieval) +# PSPEC-0001 CLI Contract v4 (Singleton Session Worker Controls + Multi-Format Retrieval) ## Commands - `docctl ingest ` @@ -10,6 +10,10 @@ - `docctl catalog` - `docctl doctor` - `docctl session` +- `docctl session start` +- `docctl session status` +- `docctl session exec` +- `docctl session stop` ## Command Import Boundaries - Set A (ML-capable command path): `ingest`, `search`, `doctor`, `session`. @@ -58,6 +62,16 @@ - Search request accepts optional fields: `doc_id`, `source`, `title`, `min_score`, `rerank`, `rerank_candidates`. - Response line format: `{"id":"q1","ok":true,"result":{...}}` - Error response format: `{"id":"q1","ok":false,"error":{"message":"...","exit_code":NN}}` +- `docctl session exec` uses the same NDJSON request/response contract as `docctl session`. + +## Singleton Session Worker +- At most one detached worker session may run at a time. +- `docctl session start` fails if a worker is already running. +- `docctl session exec` auto-starts the singleton worker if none is running. +- A running worker is bound to startup config (`index_path`, `collection`, embedding/rerank models, `allow_model_download`). +- `docctl session exec` fails on config mismatch until `docctl session stop` is run. +- Detached worker mode is POSIX-only. +- Default idle timeout for detached worker mode is `900` seconds. ## Search Hit Payload - Base hit shape includes: `id`, `text`, `metadata`, `distance`, `score`, `rank`. @@ -83,3 +97,5 @@ 4. Failure classes map to stable exit codes. 5. In `--json` mode, stdout contains only deterministic JSON payloads. 6. `session` reuses one embedding model instance across multiple search requests in one process. +7. `session start` fails with a stable error when singleton worker is already running. +8. `session exec` reuses the detached singleton worker and preserves NDJSON response contract. diff --git a/docs/product-specs/index.md b/docs/product-specs/index.md index c8f8710..972fa62 100644 --- a/docs/product-specs/index.md +++ b/docs/product-specs/index.md @@ -5,4 +5,4 @@ Sorted by `id` ascending. | id | title | status | owner | last_updated | path | |---|---|---|---|---|---| -| PSPEC-0001 | CLI Contract v3 (Multi-Format Ingest, Locator-Free Retrieval) | active | Engineering | 2026-03-26 | docs/product-specs/cli-contract.md | +| PSPEC-0001 | CLI Contract v4 (Singleton Session Worker Controls + Multi-Format Retrieval) | active | Engineering | 2026-03-26 | docs/product-specs/cli-contract.md | diff --git a/src/docctl/cli.py b/src/docctl/cli.py index 5552d27..3d836f2 100644 --- a/src/docctl/cli.py +++ b/src/docctl/cli.py @@ -26,15 +26,20 @@ from .jsonio import dumps_json from .models import DoctorReport from .services import ( + DEFAULT_SESSION_IDLE_TTL_SECONDS, collect_catalog, collect_stats, + exec_session_requests, export_snapshot, import_snapshot, ingest_path, run_doctor, run_session_requests, search_chunks, + session_worker_status, show_chunk, + start_session_worker, + stop_session_worker, ) app = typer.Typer( @@ -42,6 +47,12 @@ no_args_is_help=True, help="docctl is a CLI-first local document retrieval tool.", ) +session_app = typer.Typer( + add_completion=False, + invoke_without_command=True, + help="Session control and NDJSON session execution commands.", +) +app.add_typer(session_app, name="session") ALLOW_MODEL_DOWNLOAD_HELP = "Allow downloading missing embedding/reranker model artifacts." @@ -396,7 +407,7 @@ def import_( _handle_error(error) -@app.command(help="Run a read-only NDJSON request session on stdin/stdout.") +@session_app.callback() def session( ctx: typer.Context, allow_model_download: bool = typer.Option( @@ -405,12 +416,14 @@ def session( help=ALLOW_MODEL_DOWNLOAD_HELP, ), ) -> None: - """Run a read-only NDJSON request session on standard streams. + """Run legacy read-only NDJSON stream mode when no subcommand is selected. Args: ctx: Typer context containing resolved configuration. allow_model_download: Whether missing embedding models may be downloaded. """ + if ctx.invoked_subcommand is not None: + return config = ctx.obj try: responses = run_session_requests( @@ -424,6 +437,118 @@ def session( _handle_error(error) +@session_app.command(help="Start singleton detached session worker.") +def start( + ctx: typer.Context, + allow_model_download: bool = typer.Option( + False, + "--allow-model-download", + help=ALLOW_MODEL_DOWNLOAD_HELP, + ), + idle_ttl: int = typer.Option( + DEFAULT_SESSION_IDLE_TTL_SECONDS, + "--idle-ttl", + min=1, + help="Idle timeout in seconds before worker self-termination.", + ), +) -> None: + """Start singleton detached session worker. + + Args: + ctx: Typer context containing resolved configuration. + allow_model_download: Whether missing embedding models may be downloaded. + idle_ttl: Idle timeout in seconds. + """ + config = ctx.obj + try: + payload = start_session_worker( + config=config, + allow_model_download=allow_model_download, + idle_ttl_seconds=idle_ttl, + ) + _emit_success(config=config, payload=payload) + except Exception as error: # noqa: BLE001 + _handle_error(error) + + +@session_app.command(help="Show singleton detached session worker status.") +def status( + ctx: typer.Context, + allow_model_download: bool = typer.Option( + False, + "--allow-model-download", + help=ALLOW_MODEL_DOWNLOAD_HELP, + ), +) -> None: + """Show singleton detached session worker status. + + Args: + ctx: Typer context containing resolved configuration. + allow_model_download: Whether missing embedding models may be downloaded. + """ + config = ctx.obj + try: + payload = session_worker_status( + config=config, + allow_model_download=allow_model_download, + ) + _emit_success(config=config, payload=payload) + except Exception as error: # noqa: BLE001 + _handle_error(error) + + +@session_app.command(name="exec", help="Execute NDJSON requests through singleton session worker.") +def exec_( + ctx: typer.Context, + allow_model_download: bool = typer.Option( + False, + "--allow-model-download", + help=ALLOW_MODEL_DOWNLOAD_HELP, + ), + idle_ttl: int = typer.Option( + DEFAULT_SESSION_IDLE_TTL_SECONDS, + "--idle-ttl", + min=1, + help="Idle timeout in seconds for auto-started worker.", + ), +) -> None: + """Execute NDJSON request lines through singleton session worker. + + Args: + ctx: Typer context containing resolved configuration. + allow_model_download: Whether missing embedding models may be downloaded. + idle_ttl: Idle timeout in seconds for auto-started worker. + """ + config = ctx.obj + try: + request_lines = list(sys.stdin) + responses = exec_session_requests( + config=config, + request_lines=request_lines, + allow_model_download=allow_model_download, + idle_ttl_seconds=idle_ttl, + ) + for response in responses: + typer.echo(dumps_json(response)) + except Exception as error: # noqa: BLE001 + _handle_error(error) + + +@session_app.command(help="Stop singleton detached session worker.") +def stop(ctx: typer.Context) -> None: + """Stop singleton detached session worker. + + Args: + ctx: Typer context containing resolved configuration. + """ + config = ctx.obj + try: + payload = stop_session_worker() + _emit_success(config=config, payload=payload) + except Exception as error: # noqa: BLE001 + _handle_error(error) + + def main() -> None: """Run the docctl CLI application entrypoint.""" app() diff --git a/src/docctl/service_session.py b/src/docctl/service_session.py index 0ac81c7..4bc51f7 100644 --- a/src/docctl/service_session.py +++ b/src/docctl/service_session.py @@ -390,28 +390,71 @@ def run_session_requests( Response dictionaries containing success results or structured errors. """ runtime = SessionRuntime(request=request, deps=deps) + yield from run_session_requests_with_runtime( + runtime=runtime, + request_lines=request.request_lines, + verbose=request.config.verbose, + ) + + +def run_session_requests_with_runtime( + *, + runtime: SessionRuntime, + request_lines: Iterable[str], + verbose: bool, +) -> Iterable[dict[str, Any]]: + """Process NDJSON request lines using an existing session runtime. + + Args: + runtime: Reusable session runtime containing cached dependencies. + request_lines: Incoming NDJSON request lines. + verbose: Whether verbose mode is enabled. + + Yields: + Response dictionaries containing success results or structured errors. + """ + for raw_line in request_lines: + response = run_session_request_line(runtime=runtime, raw_line=raw_line, verbose=verbose) + if response is not None: + yield response - for raw_line in request.request_lines: - line = raw_line.strip() - if not line: - continue - - request_id: Any = None - try: - payload = _parse_payload(line) - request_id = payload.get("id") - op = _parse_operation(payload) - handler = _OPERATION_HANDLERS.get(op) - if handler is None: - raise DocctlError(message=f"unsupported session operation: {op}", exit_code=50) - - with suppress_external_output(enabled=not request.config.verbose): - result = handler(runtime, payload) - - yield { - "id": request_id, - "ok": True, - "result": result, - } - except Exception as error: # noqa: BLE001 - yield session_error(request_id=request_id, error=error) + +def run_session_request_line( + *, + runtime: SessionRuntime, + raw_line: str, + verbose: bool, +) -> dict[str, Any] | None: + """Process one NDJSON request line with a reusable runtime. + + Args: + runtime: Reusable session runtime containing cached dependencies. + raw_line: Raw NDJSON request line. + verbose: Whether verbose mode is enabled. + + Returns: + One response payload for non-empty lines, otherwise `None`. + """ + line = raw_line.strip() + if not line: + return None + + request_id: Any = None + try: + payload = _parse_payload(line) + request_id = payload.get("id") + op = _parse_operation(payload) + handler = _OPERATION_HANDLERS.get(op) + if handler is None: + raise DocctlError(message=f"unsupported session operation: {op}", exit_code=50) + + with suppress_external_output(enabled=not verbose): + result = handler(runtime, payload) + + return { + "id": request_id, + "ok": True, + "result": result, + } + except Exception as error: # noqa: BLE001 + return session_error(request_id=request_id, error=error) diff --git a/src/docctl/service_session_worker.py b/src/docctl/service_session_worker.py new file mode 100644 index 0000000..d5ca47d --- /dev/null +++ b/src/docctl/service_session_worker.py @@ -0,0 +1,792 @@ +"""Singleton session-worker lifecycle and local IPC transport helpers.""" + +from __future__ import annotations + +import json +import multiprocessing +import os +import socket +import tempfile +import time +from contextlib import contextmanager, suppress +from dataclasses import asdict, dataclass +from datetime import UTC, datetime +from pathlib import Path +from typing import Any, cast + +from .config import CliConfig +from .errors import DocctlError, InternalDocctlError +from .service_session import SessionRuntime, run_session_request_line +from .service_types import ServiceDependencies, SessionStreamRequest + +ENV_SESSION_DIR = "DOCCTL_SESSION_DIR" +DEFAULT_SESSION_IDLE_TTL_SECONDS = 900 +SESSION_PROTOCOL_VERSION = 1 +SESSION_SCHEMA_VERSION = 1 +SESSION_START_TIMEOUT_SECONDS = 10.0 +SESSION_STOP_TIMEOUT_SECONDS = 5.0 +SESSION_CONNECT_TIMEOUT_SECONDS = 5.0 +SESSION_CONTROL_STOP_ID = "__session_worker_stop__" +SESSION_CONTROL_STOP_OP = "__control_stop__" +SESSION_STATE_FILENAME = "session-state.json" +SESSION_LOCK_FILENAME = "session.lock" +SESSION_SOCKET_FILENAME = "session.sock" + + +@dataclass(slots=True, frozen=True) +class SessionArtifacts: + """Filesystem paths used by the singleton session worker.""" + + runtime_dir: Path + state_path: Path + lock_path: Path + socket_path: Path + + +@dataclass(slots=True, frozen=True) +class SessionBinding: + """Configuration fields that define whether a running session can be reused.""" + + index_path: str + collection: str + embedding_model: str + rerank_model: str + allow_model_download: bool + + +@dataclass(slots=True, frozen=True) +class SessionState: + """Serialized singleton session state stored on disk.""" + + schema_version: int + protocol_version: int + status: str + pid: int + socket_path: str + index_path: str + collection: str + embedding_model: str + rerank_model: str + allow_model_download: bool + idle_ttl_seconds: int + started_at: str + last_used_at: str + expires_at: str + + +def start_session_worker( + *, + config: CliConfig, + allow_model_download: bool, + idle_ttl_seconds: int, + deps: ServiceDependencies, +) -> dict[str, object]: + """Start the singleton session worker. + + Args: + config: Resolved CLI configuration. + allow_model_download: Whether model downloads are allowed in this worker. + idle_ttl_seconds: Idle timeout before worker self-termination. + deps: Session runtime dependency bundle. + + Returns: + Session status payload for the started worker. + + Raises: + DocctlError: If a worker is already running or runtime validation fails. + """ + _require_posix() + _validate_idle_ttl(idle_ttl_seconds) + artifacts = _session_artifacts() + binding = _binding_from_config(config=config, allow_model_download=allow_model_download) + _ensure_runtime_dir(artifacts.runtime_dir) + with _session_lock(artifacts.lock_path): + running_state = _load_running_state_locked(artifacts=artifacts) + if running_state is not None: + raise DocctlError( + message="session already running; run `docctl session stop` first", + exit_code=50, + ) + started_state = _start_worker_locked( + config=config, + artifacts=artifacts, + binding=binding, + idle_ttl_seconds=idle_ttl_seconds, + allow_model_download=allow_model_download, + deps=deps, + ) + return _running_payload(state=started_state) + + +def session_worker_status( + *, + config: CliConfig, + allow_model_download: bool, +) -> dict[str, object]: + """Return singleton session worker status. + + Args: + config: Resolved CLI configuration. + allow_model_download: Current command download flag for config-match checks. + + Returns: + Running or stopped status payload. + """ + _require_posix() + artifacts = _session_artifacts() + binding = _binding_from_config(config=config, allow_model_download=allow_model_download) + _ensure_runtime_dir(artifacts.runtime_dir) + with _session_lock(artifacts.lock_path): + state = _load_running_state_locked(artifacts=artifacts) + if state is None: + return _stopped_payload() + payload = _running_payload(state=state) + payload["config_match"] = _state_matches_binding(state=state, binding=binding) + return payload + + +def stop_session_worker() -> dict[str, object]: + """Stop the singleton session worker when it is running. + + Returns: + Status payload after stop handling. + """ + _require_posix() + artifacts = _session_artifacts() + _ensure_runtime_dir(artifacts.runtime_dir) + with _session_lock(artifacts.lock_path): + state = _read_state(path=artifacts.state_path) + if state is None: + _clear_artifacts_locked(artifacts=artifacts) + return _stopped_payload() + _request_worker_shutdown(socket_path=Path(state.socket_path)) + _wait_for_socket_shutdown( + socket_path=Path(state.socket_path), + timeout_seconds=SESSION_STOP_TIMEOUT_SECONDS, + ) + _clear_artifacts_locked(artifacts=artifacts) + return _stopped_payload() + + +def exec_session_requests( # noqa: PLR0913 + *, + config: CliConfig, + request_lines: list[str], + allow_model_download: bool, + idle_ttl_seconds: int, + deps: ServiceDependencies, +) -> list[dict[str, Any]]: + """Execute NDJSON requests through the singleton worker socket. + + Args: + config: Resolved CLI configuration. + request_lines: Raw NDJSON request lines. + allow_model_download: Whether model downloads are allowed. + idle_ttl_seconds: Idle timeout for auto-started workers. + deps: Session runtime dependency bundle. + + Returns: + Response payload dictionaries for non-empty request lines. + + Raises: + DocctlError: If session reuse is invalid or socket exchange fails. + """ + _require_posix() + _validate_idle_ttl(idle_ttl_seconds) + artifacts = _session_artifacts() + binding = _binding_from_config(config=config, allow_model_download=allow_model_download) + _ensure_runtime_dir(artifacts.runtime_dir) + with _session_lock(artifacts.lock_path): + state = _load_running_state_locked(artifacts=artifacts) + if state is None: + state = _start_worker_locked( + config=config, + artifacts=artifacts, + binding=binding, + idle_ttl_seconds=idle_ttl_seconds, + allow_model_download=allow_model_download, + deps=deps, + ) + elif not _state_matches_binding(state=state, binding=binding): + raise DocctlError( + message=( + "running session configuration does not match current options; " + "run `docctl session stop` first" + ), + exit_code=50, + ) + return _send_requests_over_socket( + socket_path=Path(state.socket_path), request_lines=request_lines + ) + + +def serve_session_worker( # noqa: PLR0913 + *, + config: CliConfig, + socket_path: Path, + state_path: Path, + idle_ttl_seconds: int, + allow_model_download: bool, + deps: ServiceDependencies, +) -> None: + """Run the singleton worker server loop until idle timeout or termination. + + Args: + config: Resolved CLI configuration used by session runtime. + socket_path: Unix socket path used for request transport. + state_path: Session state file path. + idle_ttl_seconds: Idle timeout before automatic shutdown. + allow_model_download: Whether model downloads are allowed. + deps: Session runtime dependencies. + """ + _require_posix() + _validate_idle_ttl(idle_ttl_seconds) + socket_path.parent.mkdir(parents=True, exist_ok=True) + _safe_unlink(path=socket_path) + session_request = SessionStreamRequest( + config=config, + request_lines=[], + allow_model_download=allow_model_download, + ) + runtime = SessionRuntime(request=session_request, deps=deps) + + server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + server.bind(str(socket_path)) + os.chmod(socket_path, 0o600) + server.listen() + server.settimeout(1.0) + _serve_worker_loop( + server=server, + runtime=runtime, + verbose=config.verbose, + state_path=state_path, + idle_ttl_seconds=idle_ttl_seconds, + ) + finally: + server.close() + _safe_unlink(path=socket_path) + _safe_remove_state_if_owned(path=state_path, pid=os.getpid()) + + +def _serve_worker_loop( # noqa: PLR0913 + *, + server: socket.socket, + runtime: SessionRuntime, + verbose: bool, + state_path: Path, + idle_ttl_seconds: int, +) -> None: + last_used = time.time() + while True: + if time.time() - last_used >= idle_ttl_seconds: + break + try: + connection, _ = server.accept() + except TimeoutError: + continue + except OSError: + break + with connection: + used_connection, stop_requested = _serve_connection( + connection=connection, + runtime=runtime, + verbose=verbose, + ) + if used_connection: + last_used = time.time() + _update_last_used_state( + state_path=state_path, + pid=os.getpid(), + idle_ttl_seconds=idle_ttl_seconds, + last_used=last_used, + ) + if stop_requested: + break + + +def _require_posix() -> None: + if os.name == "posix" and hasattr(socket, "AF_UNIX"): + return + raise DocctlError( + message="session worker mode is supported on POSIX systems only", + exit_code=50, + ) + + +def _validate_idle_ttl(idle_ttl_seconds: int) -> None: + if idle_ttl_seconds >= 1: + return + raise DocctlError( + message="invalid idle ttl: --idle-ttl must be >= 1", + exit_code=50, + ) + + +def _session_artifacts() -> SessionArtifacts: + root_override = os.getenv(ENV_SESSION_DIR) + if root_override: + runtime_dir = Path(root_override).expanduser() + else: + runtime_dir = Path(tempfile.gettempdir()) / f"docctl-session-{os.getuid()}" + return SessionArtifacts( + runtime_dir=runtime_dir, + state_path=runtime_dir / SESSION_STATE_FILENAME, + lock_path=runtime_dir / SESSION_LOCK_FILENAME, + socket_path=runtime_dir / SESSION_SOCKET_FILENAME, + ) + + +def _ensure_runtime_dir(runtime_dir: Path) -> None: + runtime_dir.mkdir(parents=True, exist_ok=True) + os.chmod(runtime_dir, 0o700) + + +@contextmanager +def _session_lock(lock_path: Path): + import fcntl + + lock_path.parent.mkdir(parents=True, exist_ok=True) + with lock_path.open("a+", encoding="utf-8") as lock_file: + fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) + try: + yield + finally: + fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) + + +def _binding_from_config(*, config: CliConfig, allow_model_download: bool) -> SessionBinding: + return SessionBinding( + index_path=str(config.index_path.resolve(strict=False)), + collection=config.collection, + embedding_model=config.embedding_model, + rerank_model=config.rerank_model, + allow_model_download=allow_model_download, + ) + + +def _load_running_state_locked(*, artifacts: SessionArtifacts) -> SessionState | None: + state = _read_state(path=artifacts.state_path) + if state is None: + _safe_unlink(path=artifacts.socket_path) + return None + if _socket_reachable(path=Path(state.socket_path)): + return state + _clear_artifacts_locked(artifacts=artifacts) + return None + + +def _read_state(*, path: Path) -> SessionState | None: + if not path.exists(): + return None + try: + raw_payload = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return None + if not isinstance(raw_payload, dict): + return None + return _state_from_payload(payload=raw_payload) + + +def _state_from_payload(*, payload: dict[str, object]) -> SessionState | None: + try: + typed_payload = cast(dict[str, Any], payload) + return SessionState( + schema_version=int(typed_payload["schema_version"]), + protocol_version=int(typed_payload["protocol_version"]), + status=str(typed_payload["status"]), + pid=int(typed_payload["pid"]), + socket_path=str(typed_payload["socket_path"]), + index_path=str(typed_payload["index_path"]), + collection=str(typed_payload["collection"]), + embedding_model=str(typed_payload["embedding_model"]), + rerank_model=str(typed_payload["rerank_model"]), + allow_model_download=bool(typed_payload["allow_model_download"]), + idle_ttl_seconds=int(typed_payload["idle_ttl_seconds"]), + started_at=str(typed_payload["started_at"]), + last_used_at=str(typed_payload["last_used_at"]), + expires_at=str(typed_payload["expires_at"]), + ) + except (KeyError, TypeError, ValueError): + return None + + +def _write_state(*, path: Path, state: SessionState) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text( + json.dumps(asdict(state), sort_keys=True, separators=(",", ":")), encoding="utf-8" + ) + + +def _start_worker_locked( # noqa: PLR0913 + *, + config: CliConfig, + artifacts: SessionArtifacts, + binding: SessionBinding, + idle_ttl_seconds: int, + allow_model_download: bool, + deps: ServiceDependencies, +) -> SessionState: + process = _start_worker_process( + config=config, + socket_path=artifacts.socket_path, + state_path=artifacts.state_path, + idle_ttl_seconds=idle_ttl_seconds, + allow_model_download=allow_model_download, + deps=deps, + ) + if process.pid is None: + raise InternalDocctlError("session worker failed to start") + started_at = _now_utc_iso() + state = SessionState( + schema_version=SESSION_SCHEMA_VERSION, + protocol_version=SESSION_PROTOCOL_VERSION, + status="running", + pid=process.pid, + socket_path=str(artifacts.socket_path), + index_path=binding.index_path, + collection=binding.collection, + embedding_model=binding.embedding_model, + rerank_model=binding.rerank_model, + allow_model_download=binding.allow_model_download, + idle_ttl_seconds=idle_ttl_seconds, + started_at=started_at, + last_used_at=started_at, + expires_at=_expires_at(last_used_at=started_at, idle_ttl_seconds=idle_ttl_seconds), + ) + _write_state(path=artifacts.state_path, state=state) + if _wait_for_socket_ready(process=process, socket_path=artifacts.socket_path): + return state + _request_worker_shutdown(socket_path=artifacts.socket_path) + _wait_for_socket_shutdown( + socket_path=artifacts.socket_path, + timeout_seconds=SESSION_STOP_TIMEOUT_SECONDS, + ) + _clear_artifacts_locked(artifacts=artifacts) + raise InternalDocctlError("session worker failed to start") + + +def _start_worker_process( # noqa: PLR0913 + *, + config: CliConfig, + socket_path: Path, + state_path: Path, + idle_ttl_seconds: int, + allow_model_download: bool, + deps: ServiceDependencies, +) -> multiprocessing.process.BaseProcess: + context = multiprocessing.get_context("spawn") + process = context.Process( + target=_run_worker_process, + kwargs={ + "config": config, + "socket_path": socket_path, + "state_path": state_path, + "idle_ttl_seconds": idle_ttl_seconds, + "allow_model_download": allow_model_download, + "deps": deps, + }, + daemon=False, + ) + process.start() + _detach_child_process(process=process) + return process + + +def _detach_child_process(*, process: multiprocessing.process.BaseProcess) -> None: + import multiprocessing.process as multiprocessing_process + + children = getattr(multiprocessing_process, "_children", None) + if isinstance(children, set): + children.discard(process) + + +def _run_worker_process( # noqa: PLR0913 + *, + config: CliConfig, + socket_path: Path, + state_path: Path, + idle_ttl_seconds: int, + allow_model_download: bool, + deps: ServiceDependencies, +) -> None: + if os.name == "posix": + with suppress(OSError): + os.setsid() + serve_session_worker( + config=config, + socket_path=socket_path, + state_path=state_path, + idle_ttl_seconds=idle_ttl_seconds, + allow_model_download=allow_model_download, + deps=deps, + ) + + +def _wait_for_socket_ready( + *, process: multiprocessing.process.BaseProcess, socket_path: Path +) -> bool: + deadline = time.time() + SESSION_START_TIMEOUT_SECONDS + while time.time() < deadline: + if not process.is_alive() and process.exitcode is not None: + return False + if _socket_reachable(path=socket_path): + return True + time.sleep(0.05) + return False + + +def _request_worker_shutdown(*, socket_path: Path) -> None: + if not _socket_reachable(path=socket_path): + return + try: + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client: + client.settimeout(SESSION_CONNECT_TIMEOUT_SECONDS) + client.connect(str(socket_path)) + reader = client.makefile("r", encoding="utf-8") + writer = client.makefile("w", encoding="utf-8") + try: + payload = {"id": SESSION_CONTROL_STOP_ID, "op": SESSION_CONTROL_STOP_OP} + writer.write(json.dumps(payload, sort_keys=True, separators=(",", ":"))) + writer.write("\n") + writer.flush() + _ = reader.readline() + finally: + reader.close() + writer.close() + except OSError: + return + + +def _wait_for_socket_shutdown(*, socket_path: Path, timeout_seconds: float) -> None: + deadline = time.time() + timeout_seconds + while time.time() < deadline: + if not _socket_reachable(path=socket_path): + return + time.sleep(0.05) + + +def _send_requests_over_socket( + *, + socket_path: Path, + request_lines: list[str], +) -> list[dict[str, Any]]: + responses: list[dict[str, Any]] = [] + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client: + client.settimeout(SESSION_CONNECT_TIMEOUT_SECONDS) + client.connect(str(socket_path)) + # Only bound the connect phase; request handling may legitimately take longer + # (for example first-use model warm-up) and should not fail at transport layer. + client.settimeout(None) + reader = client.makefile("r", encoding="utf-8") + writer = client.makefile("w", encoding="utf-8") + try: + for raw_line in request_lines: + stripped = raw_line.strip() + if not stripped: + continue + writer.write(stripped) + writer.write("\n") + writer.flush() + response_line = reader.readline() + if not response_line: + raise InternalDocctlError("session worker connection closed unexpectedly") + responses.append(_parse_worker_response(line=response_line)) + finally: + reader.close() + writer.close() + return responses + + +def _parse_worker_response(*, line: str) -> dict[str, Any]: + try: + payload = json.loads(line) + except json.JSONDecodeError as error: + raise InternalDocctlError("session worker returned invalid JSON") from error + if isinstance(payload, dict): + return payload + raise InternalDocctlError("session worker returned invalid response payload") + + +def _state_matches_binding(*, state: SessionState, binding: SessionBinding) -> bool: + return ( + state.index_path == binding.index_path + and state.collection == binding.collection + and state.embedding_model == binding.embedding_model + and state.rerank_model == binding.rerank_model + and state.allow_model_download == binding.allow_model_download + ) + + +def _running_payload(*, state: SessionState) -> dict[str, object]: + return { + "status": "running", + "pid": state.pid, + "socket_path": state.socket_path, + "protocol_version": state.protocol_version, + "index_path": state.index_path, + "collection": state.collection, + "embedding_model": state.embedding_model, + "rerank_model": state.rerank_model, + "allow_model_download": state.allow_model_download, + "idle_ttl_seconds": state.idle_ttl_seconds, + "started_at": state.started_at, + "last_used_at": state.last_used_at, + "expires_at": state.expires_at, + } + + +def _stopped_payload() -> dict[str, object]: + return {"status": "stopped"} + + +def _socket_reachable(*, path: Path) -> bool: + if not path.exists(): + return False + try: + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as probe: + probe.settimeout(0.2) + probe.connect(str(path)) + return True + except OSError: + return False + + +def _clear_artifacts_locked(*, artifacts: SessionArtifacts) -> None: + _safe_unlink(path=artifacts.socket_path) + _safe_unlink(path=artifacts.state_path) + + +def _safe_unlink(*, path: Path) -> None: + try: + path.unlink() + except FileNotFoundError: + return + except OSError: + return + + +def _safe_remove_state_if_owned(*, path: Path, pid: int) -> None: + state = _read_state(path=path) + if state is None or state.pid != pid: + return + _safe_unlink(path=path) + + +def _serve_connection( + *, + connection: socket.socket, + runtime: SessionRuntime, + verbose: bool, +) -> tuple[bool, bool]: + used_connection = False + stop_requested = False + reader = connection.makefile("r", encoding="utf-8") + writer = connection.makefile("w", encoding="utf-8") + try: + for raw_line in reader: + if _is_control_stop_request(raw_line=raw_line): + used_connection = True + stop_requested = True + if not _write_worker_response_line( + writer=writer, + payload={ + "id": SESSION_CONTROL_STOP_ID, + "ok": True, + "result": {"status": "stopping"}, + }, + ): + break + break + response = run_session_request_line(runtime=runtime, raw_line=raw_line, verbose=verbose) + if response is None: + continue + used_connection = True + if not _write_worker_response_line(writer=writer, payload=response): + break + finally: + with suppress(OSError): + reader.close() + with suppress(OSError): + writer.close() + return used_connection, stop_requested + + +def _write_worker_response_line(*, writer: Any, payload: dict[str, Any]) -> bool: + """Write one NDJSON response line to the worker socket stream. + + Args: + writer: Text writer bound to a socket stream. + payload: Response payload to serialize. + + Returns: + `True` when the response is written and flushed successfully, otherwise + `False` when the peer disconnected. + """ + try: + writer.write(json.dumps(payload, sort_keys=True, separators=(",", ":"))) + writer.write("\n") + writer.flush() + except OSError: + return False + return True + + +def _is_control_stop_request(*, raw_line: str) -> bool: + line = raw_line.strip() + if not line: + return False + try: + payload = json.loads(line) + except json.JSONDecodeError: + return False + if not isinstance(payload, dict): + return False + return ( + payload.get("id") == SESSION_CONTROL_STOP_ID + and payload.get("op") == SESSION_CONTROL_STOP_OP + ) + + +def _update_last_used_state( + *, + state_path: Path, + pid: int, + idle_ttl_seconds: int, + last_used: float, +) -> None: + state = _read_state(path=state_path) + if state is None or state.pid != pid: + return + last_used_at = _datetime_to_iso(timestamp=last_used) + updated_state = SessionState( + schema_version=state.schema_version, + protocol_version=state.protocol_version, + status=state.status, + pid=state.pid, + socket_path=state.socket_path, + index_path=state.index_path, + collection=state.collection, + embedding_model=state.embedding_model, + rerank_model=state.rerank_model, + allow_model_download=state.allow_model_download, + idle_ttl_seconds=state.idle_ttl_seconds, + started_at=state.started_at, + last_used_at=last_used_at, + expires_at=_expires_at(last_used_at=last_used_at, idle_ttl_seconds=idle_ttl_seconds), + ) + _write_state(path=state_path, state=updated_state) + + +def _now_utc_iso() -> str: + return datetime.now(tz=UTC).isoformat() + + +def _datetime_to_iso(*, timestamp: float) -> str: + return datetime.fromtimestamp(timestamp, tz=UTC).isoformat() + + +def _expires_at(*, last_used_at: str, idle_ttl_seconds: int) -> str: + last_used = datetime.fromisoformat(last_used_at) + expires = last_used.timestamp() + idle_ttl_seconds + return _datetime_to_iso(timestamp=expires) diff --git a/src/docctl/services.py b/src/docctl/services.py index f556d49..e538349 100644 --- a/src/docctl/services.py +++ b/src/docctl/services.py @@ -15,6 +15,21 @@ from .service_query import search_chunks as search_chunks_impl from .service_query import show_chunk as show_chunk_impl from .service_session import run_session_requests as run_session_requests_impl +from .service_session_worker import ( + DEFAULT_SESSION_IDLE_TTL_SECONDS, +) +from .service_session_worker import ( + exec_session_requests as exec_session_requests_impl, +) +from .service_session_worker import ( + session_worker_status as session_worker_status_impl, +) +from .service_session_worker import ( + start_session_worker as start_session_worker_impl, +) +from .service_session_worker import ( + stop_session_worker as stop_session_worker_impl, +) from .service_snapshot import export_snapshot as export_snapshot_impl from .service_snapshot import import_snapshot as import_snapshot_impl from .service_types import ( @@ -305,6 +320,79 @@ def run_session_requests( return run_session_requests_impl(request=request, deps=_ml_dependencies()) +def start_session_worker( + *, + config: CliConfig, + allow_model_download: bool, + idle_ttl_seconds: int = DEFAULT_SESSION_IDLE_TTL_SECONDS, +) -> dict[str, object]: + """Start the singleton session worker. + + Args: + config: Resolved CLI configuration. + allow_model_download: Whether missing models may be downloaded. + idle_ttl_seconds: Idle timeout before worker self-termination. + + Returns: + Session start payload. + """ + return start_session_worker_impl( + config=config, + allow_model_download=allow_model_download, + idle_ttl_seconds=idle_ttl_seconds, + deps=_ml_dependencies(), + ) + + +def session_worker_status(*, config: CliConfig, allow_model_download: bool) -> dict[str, object]: + """Return singleton session worker status payload. + + Args: + config: Resolved CLI configuration. + allow_model_download: Whether missing models may be downloaded. + + Returns: + Session status payload. + """ + return session_worker_status_impl(config=config, allow_model_download=allow_model_download) + + +def stop_session_worker() -> dict[str, object]: + """Stop singleton session worker if running. + + Returns: + Session stop payload. + """ + return stop_session_worker_impl() + + +def exec_session_requests( + *, + config: CliConfig, + request_lines: list[str], + allow_model_download: bool, + idle_ttl_seconds: int = DEFAULT_SESSION_IDLE_TTL_SECONDS, +) -> list[dict[str, Any]]: + """Execute NDJSON requests through singleton session worker. + + Args: + config: Resolved CLI configuration. + request_lines: NDJSON request lines to execute. + allow_model_download: Whether missing models may be downloaded. + idle_ttl_seconds: Idle timeout for auto-started workers. + + Returns: + Response payload dictionaries for all non-empty input lines. + """ + return exec_session_requests_impl( + config=config, + request_lines=request_lines, + allow_model_download=allow_model_download, + idle_ttl_seconds=idle_ttl_seconds, + deps=_ml_dependencies(), + ) + + def run_doctor(*, config: CliConfig, allow_model_download: bool) -> DoctorReport: """Run repository-local health checks for index and embedding readiness. diff --git a/tests/integration/test_session_worker.py b/tests/integration/test_session_worker.py new file mode 100644 index 0000000..7690a51 --- /dev/null +++ b/tests/integration/test_session_worker.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import json +import os +import time +from pathlib import Path + +from docctl.cli import app + + +def _runtime_dir(test_name: str) -> Path: + return Path("/tmp") / f"docctl-session-{os.getpid()}-{test_name}-{time.time_ns()}" + + +def test_session_worker_start_status_stop_singleton(runner, tmp_path) -> None: + _ = tmp_path + runtime_dir = _runtime_dir("start-stop") + env = {"DOCCTL_SESSION_DIR": str(runtime_dir)} + + start_result = runner.invoke(app, ["--json", "session", "start"], env=env) + assert start_result.exit_code == 0, start_result.output + start_payload = json.loads(start_result.output) + assert start_payload["status"] == "running" + pid = start_payload["pid"] + + second_start = runner.invoke(app, ["session", "start"], env=env) + assert second_start.exit_code == 50 + assert "session already running" in second_start.output + + status_result = runner.invoke(app, ["--json", "session", "status"], env=env) + assert status_result.exit_code == 0, status_result.output + status_payload = json.loads(status_result.output) + assert status_payload["status"] == "running" + assert status_payload["pid"] == pid + + stop_result = runner.invoke(app, ["--json", "session", "stop"], env=env) + assert stop_result.exit_code == 0, stop_result.output + stop_payload = json.loads(stop_result.output) + assert stop_payload["status"] == "stopped" + + +def test_session_exec_auto_start_and_config_mismatch(runner, tmp_path) -> None: + _ = tmp_path + runtime_dir = _runtime_dir("exec") + env = {"DOCCTL_SESSION_DIR": str(runtime_dir)} + request_lines = '{"id":"q1","op":"stats"}\n' + + exec_result = runner.invoke(app, ["session", "exec"], input=request_lines, env=env) + assert exec_result.exit_code == 0, exec_result.output + responses = [json.loads(line) for line in exec_result.output.splitlines() if line.strip()] + assert len(responses) == 1 + assert responses[0]["id"] == "q1" + + status_result = runner.invoke(app, ["--json", "session", "status"], env=env) + assert status_result.exit_code == 0, status_result.output + status_payload = json.loads(status_result.output) + assert status_payload["status"] == "running" + + mismatch_result = runner.invoke( + app, + ["--collection", "other", "session", "exec"], + input=request_lines, + env=env, + ) + assert mismatch_result.exit_code == 50 + assert "running session configuration does not match current options" in mismatch_result.output + + stop_result = runner.invoke(app, ["session", "stop"], env=env) + assert stop_result.exit_code == 0, stop_result.output + + +def test_session_worker_idle_ttl_self_terminates(runner, tmp_path) -> None: + _ = tmp_path + runtime_dir = _runtime_dir("ttl") + env = {"DOCCTL_SESSION_DIR": str(runtime_dir)} + + start_result = runner.invoke( + app, + ["--json", "session", "start", "--idle-ttl", "1"], + env=env, + ) + assert start_result.exit_code == 0, start_result.output + start_payload = json.loads(start_result.output) + assert start_payload["status"] == "running" + + deadline = time.time() + 5.0 + while time.time() < deadline: + status_result = runner.invoke(app, ["--json", "session", "status"], env=env) + assert status_result.exit_code == 0, status_result.output + status_payload = json.loads(status_result.output) + if status_payload["status"] == "stopped": + break + time.sleep(0.1) + + final_status = runner.invoke(app, ["--json", "session", "status"], env=env) + assert final_status.exit_code == 0, final_status.output + final_payload = json.loads(final_status.output) + assert final_payload["status"] == "stopped" diff --git a/tests/unit/test_cli_branches.py b/tests/unit/test_cli_branches.py index 7c9ee69..6af254f 100644 --- a/tests/unit/test_cli_branches.py +++ b/tests/unit/test_cli_branches.py @@ -147,6 +147,62 @@ def test_session_command_handles_runtime_exception(runner, monkeypatch: pytest.M assert "session failed" in result.output +def test_session_start_command_handles_runtime_exception( + runner, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr( + cli, + "start_session_worker", + lambda **kwargs: (_ for _ in ()).throw(RuntimeError("session start failed")), + ) + + result = runner.invoke(cli.app, ["session", "start"]) + assert result.exit_code == 50 + assert "session start failed" in result.output + + +def test_session_exec_command_handles_runtime_exception( + runner, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr( + cli, + "exec_session_requests", + lambda **kwargs: (_ for _ in ()).throw(RuntimeError("session exec failed")), + ) + + result = runner.invoke(cli.app, ["session", "exec"], input='{"id":"x","op":"stats"}\n') + assert result.exit_code == 50 + assert "session exec failed" in result.output + + +def test_session_status_command_handles_runtime_exception( + runner, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr( + cli, + "session_worker_status", + lambda **kwargs: (_ for _ in ()).throw(RuntimeError("session status failed")), + ) + + result = runner.invoke(cli.app, ["session", "status"]) + assert result.exit_code == 50 + assert "session status failed" in result.output + + +def test_session_stop_command_handles_runtime_exception( + runner, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr( + cli, + "stop_session_worker", + lambda: (_ for _ in ()).throw(RuntimeError("session stop failed")), + ) + + result = runner.invoke(cli.app, ["session", "stop"]) + assert result.exit_code == 50 + assert "session stop failed" in result.output + + def test_search_command_rejects_rerank_candidates_below_top_k( runner, monkeypatch: pytest.MonkeyPatch ) -> None: diff --git a/tests/unit/test_session_worker_branches.py b/tests/unit/test_session_worker_branches.py new file mode 100644 index 0000000..9e6e40e --- /dev/null +++ b/tests/unit/test_session_worker_branches.py @@ -0,0 +1,914 @@ +from __future__ import annotations + +import json +from contextlib import contextmanager +from pathlib import Path +from typing import Any + +import pytest + +from docctl import service_session_worker as session_worker +from docctl.config import CliConfig +from docctl.errors import DocctlError, InternalDocctlError +from docctl.service_types import ServiceDependencies + + +def _config(tmp_path: Path) -> CliConfig: + return CliConfig( + index_path=tmp_path / "index", + collection="test", + json_output=False, + verbose=False, + embedding_model="embed", + rerank_model="rerank", + require_write_approval=False, + ) + + +def _deps() -> ServiceDependencies: + return ServiceDependencies( + embedding_factory=lambda **kwargs: object(), + store_factory=lambda **kwargs: object(), + reranker_factory=None, + ) + + +@contextmanager +def _noop_lock(_path: Path): + yield + + +class _ProcessStub: + def __init__(self, *, pid: int | None, alive: bool = True, exitcode: int | None = None) -> None: + self.pid = pid + self._alive = alive + self.exitcode = exitcode + + def is_alive(self) -> bool: + return self._alive + + +def _state(*, socket_path: Path, pid: int = 123) -> session_worker.SessionState: + return session_worker.SessionState( + schema_version=1, + protocol_version=1, + status="running", + pid=pid, + socket_path=str(socket_path), + index_path="/tmp/index", + collection="default", + embedding_model="embed", + rerank_model="rerank", + allow_model_download=False, + idle_ttl_seconds=10, + started_at="2026-01-01T00:00:00+00:00", + last_used_at="2026-01-01T00:00:00+00:00", + expires_at="2026-01-01T00:00:10+00:00", + ) + + +def test_require_posix_and_validate_idle_ttl_errors(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(session_worker.os, "name", "nt", raising=False) + with pytest.raises(DocctlError, match="POSIX systems only"): + session_worker._require_posix() + + with pytest.raises(DocctlError, match="invalid idle ttl"): + session_worker._validate_idle_ttl(0) + + +def test_session_artifacts_falls_back_to_tempdir(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv(session_worker.ENV_SESSION_DIR, raising=False) + monkeypatch.setattr(session_worker.tempfile, "gettempdir", lambda: "/tmp") + monkeypatch.setattr(session_worker.os, "getuid", lambda: 42) + + artifacts = session_worker._session_artifacts() + assert str(artifacts.runtime_dir) == "/tmp/docctl-session-42" + + +def test_stop_session_worker_returns_stopped_when_no_state( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + artifacts = session_worker.SessionArtifacts( + runtime_dir=tmp_path, + state_path=tmp_path / "state.json", + lock_path=tmp_path / "lock", + socket_path=tmp_path / "sock", + ) + calls: dict[str, int] = {"clear": 0} + monkeypatch.setattr(session_worker, "_require_posix", lambda: None) + monkeypatch.setattr(session_worker, "_session_artifacts", lambda: artifacts) + monkeypatch.setattr(session_worker, "_ensure_runtime_dir", lambda runtime_dir: None) + monkeypatch.setattr(session_worker, "_session_lock", _noop_lock) + monkeypatch.setattr(session_worker, "_read_state", lambda **kwargs: None) + monkeypatch.setattr( + session_worker, + "_clear_artifacts_locked", + lambda **kwargs: calls.__setitem__("clear", calls["clear"] + 1), + ) + + payload = session_worker.stop_session_worker() + assert payload == {"status": "stopped"} + assert calls["clear"] == 1 + + +class _ServerStub: + def __init__(self, events: list[object]) -> None: + self._events = events + self.closed = False + + def bind(self, _path: str) -> None: + return None + + def listen(self) -> None: + return None + + def settimeout(self, _timeout: float) -> None: + return None + + def accept(self): # noqa: ANN201 + event = self._events.pop(0) + if isinstance(event, BaseException): + raise event + return event, None + + def close(self) -> None: + self.closed = True + + +class _ServerNeverAccept: + def __init__(self) -> None: + self.closed = False + self.accept_calls = 0 + + def bind(self, _path: str) -> None: + return None + + def listen(self) -> None: + return None + + def settimeout(self, _timeout: float) -> None: + return None + + def accept(self): # noqa: ANN201 + self.accept_calls += 1 + raise AssertionError("accept should not be called when idle timeout already elapsed") + + def close(self) -> None: + self.closed = True + + +class _ConnStub: + def __enter__(self): # noqa: ANN204 + return self + + def __exit__(self, exc_type, exc, tb): # noqa: ANN001, ANN201 + _ = (exc_type, exc, tb) + return False + + +def test_serve_session_worker_handles_timeout_and_socket_error( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + socket_path = tmp_path / "worker.sock" + state_path = tmp_path / "state.json" + server = _ServerStub(events=[TimeoutError(), OSError("stop")]) + cleanup_calls: dict[str, int] = {"unlink": 0, "remove_state": 0} + time_values = iter([0.0, 0.1, 0.2]) + + monkeypatch.setattr(session_worker, "_require_posix", lambda: None) + monkeypatch.setattr(session_worker.os, "chmod", lambda _path, _mode: None) + monkeypatch.setattr(session_worker, "SessionRuntime", lambda request, deps: object()) + monkeypatch.setattr(session_worker.socket, "socket", lambda *args, **kwargs: server) + monkeypatch.setattr(session_worker.time, "time", lambda: next(time_values)) + monkeypatch.setattr( + session_worker, + "_safe_unlink", + lambda **kwargs: cleanup_calls.__setitem__("unlink", cleanup_calls["unlink"] + 1), + ) + monkeypatch.setattr( + session_worker, + "_safe_remove_state_if_owned", + lambda **kwargs: cleanup_calls.__setitem__( + "remove_state", cleanup_calls["remove_state"] + 1 + ), + ) + + session_worker.serve_session_worker( + config=_config(tmp_path), + socket_path=socket_path, + state_path=state_path, + idle_ttl_seconds=10, + allow_model_download=False, + deps=_deps(), + ) + + assert server.closed is True + assert cleanup_calls["unlink"] >= 1 + assert cleanup_calls["remove_state"] == 1 + + +def test_serve_session_worker_updates_last_used_on_handled_connection( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + socket_path = tmp_path / "worker.sock" + state_path = tmp_path / "state.json" + server = _ServerStub(events=[_ConnStub(), OSError("stop")]) + update_calls: dict[str, int] = {"count": 0} + time_values = iter([0.0, 0.1, 0.2, 0.3]) + + monkeypatch.setattr(session_worker, "_require_posix", lambda: None) + monkeypatch.setattr(session_worker.os, "chmod", lambda _path, _mode: None) + monkeypatch.setattr(session_worker, "SessionRuntime", lambda request, deps: object()) + monkeypatch.setattr(session_worker.socket, "socket", lambda *args, **kwargs: server) + monkeypatch.setattr(session_worker.time, "time", lambda: next(time_values)) + monkeypatch.setattr(session_worker, "_safe_unlink", lambda **kwargs: None) + monkeypatch.setattr(session_worker, "_safe_remove_state_if_owned", lambda **kwargs: None) + monkeypatch.setattr(session_worker, "_serve_connection", lambda **kwargs: (True, False)) + monkeypatch.setattr( + session_worker, + "_update_last_used_state", + lambda **kwargs: update_calls.__setitem__("count", update_calls["count"] + 1), + ) + + session_worker.serve_session_worker( + config=_config(tmp_path), + socket_path=socket_path, + state_path=state_path, + idle_ttl_seconds=5, + allow_model_download=False, + deps=_deps(), + ) + + assert update_calls["count"] == 1 + + +def test_serve_session_worker_exits_on_idle_timeout_without_accept( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + socket_path = tmp_path / "worker.sock" + state_path = tmp_path / "state.json" + server = _ServerNeverAccept() + cleanup_calls: dict[str, int] = {"unlink": 0, "remove_state": 0} + # Initial last_used=0.0 then loop check sees 10.0-0.0 >= 1 and breaks. + time_values = iter([0.0, 10.0]) + + monkeypatch.setattr(session_worker, "_require_posix", lambda: None) + monkeypatch.setattr(session_worker.os, "chmod", lambda _path, _mode: None) + monkeypatch.setattr(session_worker, "SessionRuntime", lambda request, deps: object()) + monkeypatch.setattr(session_worker.socket, "socket", lambda *args, **kwargs: server) + monkeypatch.setattr(session_worker.time, "time", lambda: next(time_values)) + monkeypatch.setattr( + session_worker, + "_safe_unlink", + lambda **kwargs: cleanup_calls.__setitem__("unlink", cleanup_calls["unlink"] + 1), + ) + monkeypatch.setattr( + session_worker, + "_safe_remove_state_if_owned", + lambda **kwargs: cleanup_calls.__setitem__( + "remove_state", cleanup_calls["remove_state"] + 1 + ), + ) + + session_worker.serve_session_worker( + config=_config(tmp_path), + socket_path=socket_path, + state_path=state_path, + idle_ttl_seconds=1, + allow_model_download=False, + deps=_deps(), + ) + + assert server.accept_calls == 0 + assert server.closed is True + assert cleanup_calls["remove_state"] == 1 + + +def test_serve_worker_loop_breaks_when_stop_requested( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + server = _ServerStub(events=[_ConnStub()]) + updates: dict[str, int] = {"count": 0} + time_values = iter([0.0, 0.1, 0.2]) + + monkeypatch.setattr(session_worker.time, "time", lambda: next(time_values)) + monkeypatch.setattr( + session_worker, + "_serve_connection", + lambda **kwargs: (True, True), + ) + monkeypatch.setattr( + session_worker, + "_update_last_used_state", + lambda **kwargs: updates.__setitem__("count", updates["count"] + 1), + ) + + session_worker._serve_worker_loop( + server=server, + runtime=object(), + verbose=False, + state_path=tmp_path / "state.json", + idle_ttl_seconds=10, + ) + + assert updates["count"] == 1 + + +def test_load_running_state_clears_stale_worker( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + artifacts = session_worker.SessionArtifacts( + runtime_dir=tmp_path, + state_path=tmp_path / "state.json", + lock_path=tmp_path / "lock", + socket_path=tmp_path / "sock", + ) + stale_state = _state(socket_path=artifacts.socket_path) + cleared: dict[str, int] = {"count": 0} + + monkeypatch.setattr(session_worker, "_read_state", lambda **kwargs: stale_state) + monkeypatch.setattr(session_worker, "_socket_reachable", lambda **kwargs: False) + monkeypatch.setattr( + session_worker, + "_clear_artifacts_locked", + lambda **kwargs: cleared.__setitem__("count", cleared["count"] + 1), + ) + + assert session_worker._load_running_state_locked(artifacts=artifacts) is None + assert cleared["count"] == 1 + + +def test_read_state_handles_invalid_content(tmp_path: Path) -> None: + invalid_json = tmp_path / "invalid.json" + invalid_json.write_text("{", encoding="utf-8") + assert session_worker._read_state(path=invalid_json) is None + + invalid_payload = tmp_path / "payload.json" + invalid_payload.write_text(json.dumps([1, 2, 3]), encoding="utf-8") + assert session_worker._read_state(path=invalid_payload) is None + + +def test_state_from_payload_handles_invalid_mapping() -> None: + assert session_worker._state_from_payload(payload={"schema_version": "x"}) is None + + +def test_start_worker_locked_raises_on_missing_pid( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + artifacts = session_worker.SessionArtifacts( + runtime_dir=tmp_path, + state_path=tmp_path / "state.json", + lock_path=tmp_path / "lock", + socket_path=tmp_path / "sock", + ) + binding = session_worker.SessionBinding( + index_path="/tmp/index", + collection="default", + embedding_model="embed", + rerank_model="rerank", + allow_model_download=False, + ) + + monkeypatch.setattr( + session_worker, + "_start_worker_process", + lambda **kwargs: _ProcessStub(pid=None), + ) + + with pytest.raises(InternalDocctlError, match="failed to start"): + session_worker._start_worker_locked( + config=_config(tmp_path), + artifacts=artifacts, + binding=binding, + idle_ttl_seconds=10, + allow_model_download=False, + deps=_deps(), + ) + + +def test_start_worker_locked_handles_start_timeout( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + artifacts = session_worker.SessionArtifacts( + runtime_dir=tmp_path, + state_path=tmp_path / "state.json", + lock_path=tmp_path / "lock", + socket_path=tmp_path / "sock", + ) + binding = session_worker.SessionBinding( + index_path="/tmp/index", + collection="default", + embedding_model="embed", + rerank_model="rerank", + allow_model_download=False, + ) + calls: dict[str, int] = {"shutdown": 0, "wait": 0, "clear": 0} + process = _ProcessStub(pid=33) + + monkeypatch.setattr(session_worker, "_start_worker_process", lambda **kwargs: process) + monkeypatch.setattr(session_worker, "_wait_for_socket_ready", lambda **kwargs: False) + monkeypatch.setattr( + session_worker, + "_request_worker_shutdown", + lambda **kwargs: calls.__setitem__("shutdown", calls["shutdown"] + 1), + ) + monkeypatch.setattr( + session_worker, + "_wait_for_socket_shutdown", + lambda **kwargs: calls.__setitem__("wait", calls["wait"] + 1), + ) + monkeypatch.setattr( + session_worker, + "_clear_artifacts_locked", + lambda **kwargs: calls.__setitem__("clear", calls["clear"] + 1), + ) + + with pytest.raises(InternalDocctlError, match="failed to start"): + session_worker._start_worker_locked( + config=_config(tmp_path), + artifacts=artifacts, + binding=binding, + idle_ttl_seconds=10, + allow_model_download=False, + deps=_deps(), + ) + assert calls == {"shutdown": 1, "wait": 1, "clear": 1} + + +def test_wait_for_socket_ready_dead_or_timeout( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + dead = _ProcessStub(pid=1, alive=False, exitcode=1) + assert session_worker._wait_for_socket_ready(process=dead, socket_path=tmp_path / "s") is False + + alive = _ProcessStub(pid=1, alive=True, exitcode=None) + time_values = iter([0.0, 0.1, 100.0]) + monkeypatch.setattr(session_worker.time, "time", lambda: next(time_values)) + monkeypatch.setattr(session_worker.time, "sleep", lambda _delay: None) + monkeypatch.setattr(session_worker, "_socket_reachable", lambda **kwargs: False) + assert session_worker._wait_for_socket_ready(process=alive, socket_path=tmp_path / "s") is False + + +class _ReaderStub: + def __init__(self, lines: list[str]) -> None: + self._lines = lines + self.closed = False + + def readline(self) -> str: + if not self._lines: + return "" + return self._lines.pop(0) + + def __iter__(self): # noqa: ANN204 + return iter(self._lines) + + def close(self) -> None: + self.closed = True + + +class _WriterStub: + def __init__(self) -> None: + self.buffer: list[str] = [] + self.closed = False + + def write(self, value: str) -> None: + self.buffer.append(value) + + def flush(self) -> None: + return None + + def close(self) -> None: + self.closed = True + + +class _ClientSocketStub: + def __init__(self, reader: _ReaderStub, writer: _WriterStub) -> None: + self._reader = reader + self._writer = writer + + def __enter__(self): # noqa: ANN204 + return self + + def __exit__(self, exc_type, exc, tb): # noqa: ANN001, ANN201 + _ = (exc_type, exc, tb) + return False + + def settimeout(self, _timeout: float) -> None: + return None + + def connect(self, _path: str) -> None: + return None + + def makefile(self, mode: str, encoding: str): # noqa: ANN001, ANN201 + _ = encoding + if mode == "r": + return self._reader + return self._writer + + +class _TimeoutClientSocketStub(_ClientSocketStub): + def __init__(self, reader: _ReaderStub, writer: _WriterStub) -> None: + super().__init__(reader, writer) + self.timeouts: list[float | None] = [] + + def settimeout(self, timeout: float | None) -> None: + self.timeouts.append(timeout) + + +def test_send_requests_over_socket_handles_blank_and_closed_connection( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + blank_reader = _ReaderStub(lines=[]) + blank_writer = _WriterStub() + monkeypatch.setattr( + session_worker.socket, + "socket", + lambda *args, **kwargs: _ClientSocketStub(blank_reader, blank_writer), + ) + responses = session_worker._send_requests_over_socket( + socket_path=tmp_path / "sock", + request_lines=[" "], + ) + assert responses == [] + + closed_reader = _ReaderStub(lines=[""]) + closed_writer = _WriterStub() + monkeypatch.setattr( + session_worker.socket, + "socket", + lambda *args, **kwargs: _ClientSocketStub(closed_reader, closed_writer), + ) + with pytest.raises(InternalDocctlError, match="connection closed unexpectedly"): + session_worker._send_requests_over_socket( + socket_path=tmp_path / "sock", + request_lines=['{"id":"x","op":"stats"}'], + ) + + +def test_send_requests_over_socket_sets_blocking_after_connect( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + reader = _ReaderStub(lines=['{"id":"x","ok":true,"result":{}}\n']) + writer = _WriterStub() + client = _TimeoutClientSocketStub(reader, writer) + monkeypatch.setattr(session_worker.socket, "socket", lambda *args, **kwargs: client) + + responses = session_worker._send_requests_over_socket( + socket_path=tmp_path / "sock", + request_lines=['{"id":"x","op":"stats"}'], + ) + + assert responses == [{"id": "x", "ok": True, "result": {}}] + assert client.timeouts == [session_worker.SESSION_CONNECT_TIMEOUT_SECONDS, None] + + +def test_parse_worker_response_validation_errors() -> None: + with pytest.raises(InternalDocctlError, match="invalid JSON"): + session_worker._parse_worker_response(line="{") + with pytest.raises(InternalDocctlError, match="invalid response payload"): + session_worker._parse_worker_response(line='["not-a-dict"]') + + +def test_socket_reachable_error_branch(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + socket_path = tmp_path / "s.sock" + socket_path.write_text("x", encoding="utf-8") + + class _ProbeSocket: + def __enter__(self): # noqa: ANN204 + return self + + def __exit__(self, exc_type, exc, tb): # noqa: ANN001, ANN201 + _ = (exc_type, exc, tb) + return False + + def settimeout(self, timeout: float) -> None: + _ = timeout + + def connect(self, path: str) -> None: + _ = path + raise OSError("unreachable") + + monkeypatch.setattr(session_worker.socket, "socket", lambda *args, **kwargs: _ProbeSocket()) + assert session_worker._socket_reachable(path=socket_path) is False + + +def test_request_worker_shutdown_returns_when_socket_unreachable( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + socket_path = tmp_path / "worker.sock" + + monkeypatch.setattr(session_worker, "_socket_reachable", lambda **kwargs: False) + session_worker._request_worker_shutdown(socket_path=socket_path) + + +def test_request_worker_shutdown_ignores_socket_errors( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + socket_path = tmp_path / "worker.sock" + monkeypatch.setattr(session_worker, "_socket_reachable", lambda **kwargs: True) + + class _FailingSocket: + def __enter__(self): # noqa: ANN204 + return self + + def __exit__(self, exc_type, exc, tb): # noqa: ANN001, ANN201 + _ = (exc_type, exc, tb) + return False + + def settimeout(self, timeout: float) -> None: + _ = timeout + + def connect(self, path: str) -> None: + _ = path + raise OSError("cannot connect") + + monkeypatch.setattr(session_worker.socket, "socket", lambda *args, **kwargs: _FailingSocket()) + session_worker._request_worker_shutdown(socket_path=socket_path) + + +def test_request_worker_shutdown_sends_control_message( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + socket_path = tmp_path / "worker.sock" + monkeypatch.setattr(session_worker, "_socket_reachable", lambda **kwargs: True) + reader = _ReaderStub(lines=['{"id":"x","ok":true}\n']) + writer = _WriterStub() + monkeypatch.setattr( + session_worker.socket, + "socket", + lambda *args, **kwargs: _ClientSocketStub(reader, writer), + ) + session_worker._request_worker_shutdown(socket_path=socket_path) + serialized = "".join(writer.buffer) + assert session_worker.SESSION_CONTROL_STOP_OP in serialized + + +def test_wait_for_socket_shutdown_returns_on_disconnect( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + socket_path = tmp_path / "worker.sock" + states = iter([True, False]) + monkeypatch.setattr(session_worker, "_socket_reachable", lambda **kwargs: next(states)) + monkeypatch.setattr(session_worker.time, "time", lambda: 0.0) + monkeypatch.setattr(session_worker.time, "sleep", lambda _delay: None) + session_worker._wait_for_socket_shutdown(socket_path=socket_path, timeout_seconds=1.0) + + +def test_wait_for_socket_shutdown_times_out( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + socket_path = tmp_path / "worker.sock" + time_values = iter([0.0, 0.1, 2.0]) + monkeypatch.setattr(session_worker.time, "time", lambda: next(time_values)) + monkeypatch.setattr(session_worker, "_socket_reachable", lambda **kwargs: True) + session_worker._wait_for_socket_shutdown(socket_path=socket_path, timeout_seconds=1.0) + + +def test_safe_unlink_and_safe_remove_state_if_owned( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setattr( + Path, + "unlink", + lambda self: (_ for _ in ()).throw(OSError("cannot unlink")), + ) + session_worker._safe_unlink(path=tmp_path / "missing") + + state_path = tmp_path / "state.json" + removed: dict[str, int] = {"count": 0} + monkeypatch.setattr(session_worker, "_read_state", lambda **kwargs: None) + monkeypatch.setattr( + session_worker, + "_safe_unlink", + lambda **kwargs: removed.__setitem__("count", removed["count"] + 1), + ) + session_worker._safe_remove_state_if_owned(path=state_path, pid=7) + assert removed["count"] == 0 + + monkeypatch.setattr( + session_worker, + "_read_state", + lambda **kwargs: _state(socket_path=tmp_path / "sock", pid=7), + ) + session_worker._safe_remove_state_if_owned(path=state_path, pid=7) + assert removed["count"] == 1 + + +def test_serve_connection_and_update_last_used_state( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + reader = _ReaderStub(lines=[" ", '{"id":"x","op":"stats"}\n']) + writer = _WriterStub() + + class _Connection: + def makefile(self, mode: str, encoding: str): # noqa: ANN001, ANN201 + _ = encoding + return reader if mode == "r" else writer + + responses = iter([None, {"id": "x", "ok": True, "result": {}}]) + monkeypatch.setattr( + session_worker, + "run_session_request_line", + lambda **kwargs: next(responses), + ) + + used, stop_requested = session_worker._serve_connection( + connection=_Connection(), + runtime=object(), + verbose=False, + ) + assert used is True + assert stop_requested is False + assert writer.closed is True + assert reader.closed is True + assert any('"ok":true' in part for part in writer.buffer) + + state_path = tmp_path / "state.json" + monkeypatch.setattr(session_worker, "_read_state", lambda **kwargs: None) + session_worker._update_last_used_state( + state_path=state_path, + pid=1, + idle_ttl_seconds=10, + last_used=0.0, + ) + + writes: dict[str, Any] = {} + monkeypatch.setattr( + session_worker, + "_read_state", + lambda **kwargs: _state(socket_path=tmp_path / "sock", pid=2), + ) + monkeypatch.setattr( + session_worker, + "_write_state", + lambda **kwargs: writes.update(kwargs), + ) + monkeypatch.setattr( + session_worker, "_datetime_to_iso", lambda **kwargs: "2026-01-01T00:00:01+00:00" + ) + monkeypatch.setattr(session_worker, "_expires_at", lambda **kwargs: "2026-01-01T00:00:11+00:00") + + session_worker._update_last_used_state( + state_path=state_path, + pid=2, + idle_ttl_seconds=10, + last_used=1.0, + ) + assert "state" in writes + assert writes["state"].last_used_at == "2026-01-01T00:00:01+00:00" + + control_reader = _ReaderStub( + lines=[ + json.dumps( + { + "id": session_worker.SESSION_CONTROL_STOP_ID, + "op": session_worker.SESSION_CONTROL_STOP_OP, + } + ) + + "\n" + ] + ) + control_writer = _WriterStub() + + class _ControlConnection: + def makefile(self, mode: str, encoding: str): # noqa: ANN001, ANN201 + _ = encoding + return control_reader if mode == "r" else control_writer + + used, stop_requested = session_worker._serve_connection( + connection=_ControlConnection(), + runtime=object(), + verbose=False, + ) + assert used is True + assert stop_requested is True + assert any("stopping" in value for value in control_writer.buffer) + + +def test_serve_connection_handles_broken_pipe_without_crashing( + monkeypatch: pytest.MonkeyPatch, +) -> None: + reader = _ReaderStub(lines=['{"id":"x","op":"stats"}\n']) + + class _BrokenPipeWriter: + def __init__(self) -> None: + self.buffer: list[str] = [] + self.closed = False + + def write(self, value: str) -> None: + self.buffer.append(value) + + def flush(self) -> None: + raise BrokenPipeError("client disconnected") + + def close(self) -> None: + self.closed = True + raise BrokenPipeError("client disconnected") + + writer = _BrokenPipeWriter() + + class _Connection: + def makefile(self, mode: str, encoding: str): # noqa: ANN001, ANN201 + _ = encoding + return reader if mode == "r" else writer + + monkeypatch.setattr( + session_worker, + "run_session_request_line", + lambda **kwargs: {"id": "x", "ok": True, "result": {}}, + ) + + used, stop_requested = session_worker._serve_connection( + connection=_Connection(), + runtime=object(), + verbose=False, + ) + + assert used is True + assert stop_requested is False + assert reader.closed is True + assert writer.closed is True + + +def test_serve_connection_control_stop_handles_broken_pipe_ack( + monkeypatch: pytest.MonkeyPatch, +) -> None: + _ = monkeypatch + reader = _ReaderStub( + lines=[ + json.dumps( + { + "id": session_worker.SESSION_CONTROL_STOP_ID, + "op": session_worker.SESSION_CONTROL_STOP_OP, + } + ) + + "\n" + ] + ) + + class _BrokenPipeWriter: + def __init__(self) -> None: + self.buffer: list[str] = [] + self.closed = False + + def write(self, value: str) -> None: + self.buffer.append(value) + + def flush(self) -> None: + raise BrokenPipeError("client disconnected") + + def close(self) -> None: + self.closed = True + raise BrokenPipeError("client disconnected") + + writer = _BrokenPipeWriter() + + class _Connection: + def makefile(self, mode: str, encoding: str): # noqa: ANN001, ANN201 + _ = encoding + return reader if mode == "r" else writer + + used, stop_requested = session_worker._serve_connection( + connection=_Connection(), + runtime=object(), + verbose=False, + ) + + assert used is True + assert stop_requested is True + assert reader.closed is True + assert writer.closed is True + + +def test_run_worker_process_setsid_and_delegates( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + calls: dict[str, int] = {"setsid": 0, "serve": 0} + + monkeypatch.setattr(session_worker.os, "name", "posix", raising=False) + monkeypatch.setattr( + session_worker.os, + "setsid", + lambda: calls.__setitem__("setsid", calls["setsid"] + 1), + ) + monkeypatch.setattr( + session_worker, + "serve_session_worker", + lambda **kwargs: calls.__setitem__("serve", calls["serve"] + 1), + ) + + session_worker._run_worker_process( + config=_config(tmp_path), + socket_path=tmp_path / "sock", + state_path=tmp_path / "state.json", + idle_ttl_seconds=5, + allow_model_download=False, + deps=_deps(), + ) + + assert calls == {"setsid": 1, "serve": 1} + + +def test_is_control_stop_request_invalid_payloads() -> None: + assert session_worker._is_control_stop_request(raw_line="not-json\n") is False + assert session_worker._is_control_stop_request(raw_line='["x"]\n') is False