Skip to content

refactor(python_executors): replace WorkerPool with loky reusable executor#14

Open
GrigoryEvko wants to merge 10 commits into
FusionBrainLab:mainfrom
GrigoryEvko:loky-executor
Open

refactor(python_executors): replace WorkerPool with loky reusable executor#14
GrigoryEvko wants to merge 10 commits into
FusionBrainLab:mainfrom
GrigoryEvko:loky-executor

Conversation

@GrigoryEvko
Copy link
Copy Markdown

@GrigoryEvko GrigoryEvko commented May 15, 2026

The hand-rolled WorkerPool (persistent subprocess pool over a length-prefixed cloudpickle stdin/stdout protocol) is replaced with loky.get_reusable_executor. Results are spilled to a per-call file on disk and read in the parent via mmap, so per-call memory cost is bounded by free disk space rather than parent RAM. Workers spawn lazily, are reused across calls, and on Linux receive PR_SET_PDEATHSIG so they die with the parent.

Known defects closed by construction:

  • WorkerPool never recycled workers; per-worker state accumulated indefinitely.
  • The default-pool singleton was lru_cache over asyncio.Queue/asyncio.Lock, binding the pool to the first event loop and breaking on subsequent loops (pytest-asyncio reset, repeated asyncio.run).
  • Worker timeout silently fell through to a one-shot subprocess retry, doubling wall time and hiding the timeout from callers.
  • MemoryLimitExceeded was a \"MemoryError\" in stderr substring heuristic that mislabelled in-validator OOMs, KeyError(\"MemoryError\"), and unrelated library messages.
  • Worker stderr=PIPE was never drained; parent RSS leaked unbounded.
  • _kill_process_tree's bare proc.kill() could target reused PIDs after a worker exit.
  • _run_via_worker swallowed cloudpickle.loads errors and discarded captured stderr on TimeoutError/IncompleteReadError.
  • get_worker awaited subprocess spawn while holding the pool lock, serialising the supposed fan-out.
  • cloudpickle.register_pickle_by_value(mod) accumulation concerns (verified bounded after refactor).

Defects explicitly fixed (locked in by regression tests):

  • ExecRunnerError.__str__ dropped stderr; now formatted into the exception message so log lines carry the user traceback.
  • execution.py truncated error messages to 200 chars; now logged in full.
  • User-code os.chdir()/sys.path.insert() calls persisted across worker reuse, leaking cwd and path mutations into the next task.
  • Loky's env= kwarg is additive, not replace — non-whitelisted env keys are now explicitly dropped in _worker_init, so ambient API tokens in the parent's environment do not reach user code.
  • TOCTOU symlink-swap on the spill path allowed arbitrary cloudpickle bytes to be unpickled by the parent (RCE vector); fixed with O_RDONLY | O_NOFOLLOW + post-open fstat.
  • Default spill directory tightened from bare \$TMPDIR to \$TMPDIR/gigaevo-<uid> with mode 0o700.
  • Spill-file leak on interpreter shutdown when shutdown_executor(wait=False) raced loky's executor-manager thread.
  • Loky pool thrash on os.environ mutation (kwargs inequality triggered pool teardown on every submit); scrubbed env is now cached once at import.
  • fd leak when os.fdopen raised mid-spill-write.
  • Stale type-ignore + 200-char truncation in execution.py.

Behaviour improvements:

  • Per-call result size bounded by free disk space, not parent RAM (mmap + protocol-5 cloudpickle).
  • Hard kill on timeout; no silent retry.
  • Default signal dispositions inside workers (KeyboardInterrupt on SIGINT, SIG_DFL on SIGTERM/SIGHUP/SIGQUIT) — user code sees standard Python semantics regardless of what loky installs internally.
  • Env whitelist drops ambient secrets: AWS_*, GH_TOKEN/GITHUB_TOKEN, OPENAI_API_KEY, ANTHROPIC_API_KEY, LANGFUSE_SECRET_KEY, WANDB_API_KEY, HF_TOKEN, STRIPE_*, SUPABASE_*, and everything else not in the explicit whitelist or under the GIGAEVO_*/LOKY_* prefixes.
  • PR_SET_PDEATHSIG on Linux replaces the ps-grep orphan-reaper in tools/flush.py.
  • Per-call worker accounting (peak RSS, wall time, user/sys time, pid) collected and logged at trace level.
  • max_workers auto-caps to cpu_count // xdist_workers under pytest-xdist to avoid the N×cpu_count fork-bomb on big hosts.
  • shutdown_executor blocks on the executor-manager thread when called with wait=True, so spill-cleanup done-callbacks fire before interpreter exit.

Public API surface:

  • run_exec_runner returns Any directly (was (value, b\"\", \"\") 3-tuple).
  • Dropped kwargs: cwd, runner_path (never used), max_memory_mb, max_output_size (replaced by disk-spilling).
  • Dropped attribute: ExecRunnerError.stdout_bytes (was always b\"\").
  • Constants MAX_MEMORY_MB/MAX_OUTPUT_SIZE removed from gigaevo.entrypoint.constants.
  • WorkerPool, default_exec_runner_pool, _run_via_worker, _start_worker_process, _kill_process_tree, _monitor_rss_limit removed entirely.

New env-var configuration:

  • GIGAEVO_EXECUTOR_MAX_WORKERS — override loky's max_workers.
  • GIGAEVO_EXECUTOR_IDLE_TIMEOUT_S — idle worker reap timeout (default 300s).
  • GIGAEVO_EXECUTOR_SPILL_DIR — where result pickles spill (default `$TMPDIR/gigaevo-`).

New dependency: `loky>=3.5`.

…_executor

Drops the hand-rolled length-prefixed cloudpickle subprocess protocol in
favour of loky's reusable process executor.  Results are spilled to disk
and read in the parent via mmap, so per-call memory cost is bounded by
free disk space rather than parent RAM.  Workers spawn lazily, share state
across calls, and on Linux receive PR_SET_PDEATHSIG so they die with the
parent (replacing the ps-grep orphan reaper).

Public ``run_exec_runner`` signature: dropped ``cwd``/``runner_path``
(never used), dropped ``max_memory_mb``/``max_output_size`` (replaced by
disk spilling), and dropped the legacy ``(value, b"", "")`` 3-tuple
return — now returns the value directly.

Closes by construction: worker recycling concerns, asyncio.Queue/Lock
event-loop binding via lru_cache, the silent one-shot retry on timeout,
the MemoryLimitExceeded string-match heuristic, undrained worker stderr,
PID-reuse races in process-tree kill, and unbounded register_pickle_by_value.

Regression tests cover: ExecRunnerError formatting includes stderr,
no memory-error string heuristic, multi-event-loop safety, timeout
without silent retry, spill-file cleanup, env scrubbing, worker
observability envelope, unpicklable-result handling, mmap large-array
round-trip, and direct exec_runner._run_one library coverage.

Tooling: ruff and ty clean; pytest-xdist compatible.
Replaces untyped ``dict[str, Any]`` payloads and error envelopes with
``WorkerCall``, ``WorkerError``, and ``WorkerResult`` frozen dataclasses.
Adds ``ExecutorConfig`` with a ``from_env()`` factory so the env-var
knobs are gathered in one declarative place rather than scattered as
module-level mutables.

Replaces the ``_apply_env`` / ``_restore_env`` mutation pair with a
``_scoped_env`` context manager so env updates are guaranteed to be
reverted on exception paths and the call-site reads top-down.

No behaviour change — public ``run_exec_runner`` signature unchanged;
loky executor lifecycle, spill semantics, env scrubbing, and worker
initialiser all identical.
…ill_dir fixture

The previous ``env=_scrub_env(...)`` arg to ``get_reusable_executor`` was
additive — workers still inherited every parent env var, including
secrets — because loky merges the dict on top of the inherited
environment rather than replacing it.  Strip non-whitelisted keys
explicitly in ``_worker_init`` so the worker's os.environ is exactly
the scrubbed set after startup.

Update the ``isolated_spill_dir`` fixture to monkeypatch the new
``_CONFIG`` ``ExecutorConfig`` instance (the previous ``SPILL_DIR``
module attribute was renamed in the dataclass refactor).

All 51 executor-suite tests now pass.
Loky installs internal signal handlers on its workers (typically blocking
SIGINT and installing a custom SIGTERM disposition) so the pool can
manage worker lifecycle.  This breaks user code that wants standard
Python semantics — KeyboardInterrupt on Ctrl-C, immediate termination
on SIGTERM/SIGHUP/SIGQUIT, BrokenPipeError on SIGPIPE.

Reset SIGINT to Python's ``default_int_handler`` (which raises
KeyboardInterrupt — caught by ``_run_one``'s ``except BaseException``
and surfaced as a structured worker error) and SIGTERM/SIGHUP/SIGQUIT/
SIGUSR1/SIGUSR2/SIGCHLD to ``SIG_DFL`` in ``_worker_init``.  Leave
SIGPIPE alone so Python's default ``SIG_IGN`` + BrokenPipeError flow
remains intact.

Two regression tests verify SIGINT and SIGTERM dispositions from inside
the worker.
Folds the two-line ``_env_int_or`` / ``_env_path_or`` helpers into
``ExecutorConfig.from_env`` as a single nested ``_pos_int`` since they
were each used once and only at module-import time.

Drops a stale ``# type: ignore[assignment]`` in the (now context-manager-
backed) env restoration path — ty flags it as unused since the
expression is well-typed.

All 53 executor tests still pass; ruff and ty clean.
…, env-scrub hardening

Three parallel audit passes surfaced fixable defects and added regression
tests.  All survive ruff, ty, and the full stages test suite (839 tests).

Correctness / concurrency:
- Cache the scrubbed worker env in a module-global ``_WORKER_ENV`` and
  pass the same dict identity to every ``get_reusable_executor`` call.
  Previously a fresh dict was built per call from ``os.environ``; loky
  treats kwargs inequality as "config changed" and tears the pool down
  on every submit when any whitelisted var mutated, breaking concurrent
  in-flight tasks with ``BrokenProcessPool``.
- Guard ``os.fdopen(fd, "wb")`` in ``_run_task`` so a fail at fdopen
  doesn't leak the descriptor.
- Move ``cloudpickle.register_pickle_by_value`` outside ``_scoped_env`` —
  it's a process-global registry mutation unrelated to env updates.

Security:
- Default spill dir is now ``$TMPDIR/gigaevo-<uid>`` (was bare
  ``$TMPDIR``), created with mode 0o700 — defeats directory-listing
  workload-metadata leaks on shared hosts.
- ``_load_spill`` opens with ``O_RDONLY | O_NOFOLLOW`` and stats via
  ``fstat`` post-open — closes a TOCTOU symlink-swap window that allowed
  arbitrary cloudpickle bytes to be unpickled by the parent (RCE).
- ``ExecutorConfig.from_env`` resolves ``..`` segments via
  ``Path.resolve(strict=False)``.
- ``_worker_init``'s prctl call now logs errno on failure instead of
  silently swallowing.
- Module docstring documents the trust boundary: user code is unsandboxed
  by design; the trust boundary lives at the LLM call upstream.

Tests added (now 71 in the executor file, 839 total in stages):
- 13 parametrised secret-scrub assertions (AWS_*, GH_TOKEN, OPENAI_API_KEY,
  ANTHROPIC_API_KEY, LANGFUSE_SECRET_KEY, WANDB_API_KEY, HF_TOKEN, STRIPE_*,
  SUPABASE_SERVICE_ROLE_KEY, and more).
- ``TestSpillDirHardening`` — default-dir-per-uid + ``..`` resolution.
- ``TestPythonPathPropagation`` — extra python_path reaches the worker.
- ``TestEnvUpdatesNoneUnsets`` — ``env_updates={K: None}`` unsets K.
- ``TestCancellationCleansSpill`` — cancellation path unlinks spill file.

Deferred (need design discussion, not in this PR):
- RLIMIT_NPROC / RLIMIT_FSIZE / RLIMIT_CORE / RLIMIT_AS opt-in caps.
- Surface ``WorkerResult`` resource accounting through the existing
  ``LogWriter`` pattern.
…auto-cap, blocking shutdown

Three parallel audits in less-trafficked corners (serialization, fork/spawn
inheritance, async lifecycle / pytest-xdist interaction) surfaced real
hazards plus several debunked concerns.  Documents and tests added for
both kinds so future readers see what's been verified.

Worker isolation (HIGH):
- `os.chdir()` and `sys.path.insert()` calls inside user code persisted
  across worker reuse — task B inherited task A's cwd and path prepends.
  ``_run_one`` now snapshots ``os.getcwd()`` and ``list(sys.path)`` on
  entry and restores in ``finally``.  Two regression tests pin this.

Async lifecycle / xdist:
- Under ``pytest-xdist`` each worker process imported wrapper.py and
  defaulted to ``cpu_count`` loky workers — on a 28-CPU / 8-xdist-worker
  host this fan-out is 224 subprocesses.  ``ExecutorConfig.from_env``
  now reads ``PYTEST_XDIST_WORKER_COUNT`` and auto-caps to
  ``max(1, cpu_count // xdist_workers)`` when the user hasn't set an
  explicit override.
- ``shutdown_executor`` gained a keyword-only ``wait: bool = False``
  parameter; ``run.py``'s finally block uses ``wait=True`` so loky's
  executor-manager thread reaps workers and fires done-callbacks
  (including ``_unlink_spill_on_done``) before redis/writer close.
  The session-scoped pytest fixture also uses ``wait=True`` so xdist
  workers don't exit while their loky children are mid-SIGKILL.

Serialization / inheritance (debunked but documented):
- ``context="loky"`` actually uses ``fork_exec(close_fds=True)`` followed
  by a re-exec — spawn semantics, not fork.  Workers inherit none of
  the parent's Redis sockets, langfuse handlers, asyncio loop state,
  loguru sinks, or atexit hooks.  Module docstring rewritten to spell
  this out so future readers don't repeat the audit.
- Protocol-5 cloudpickle without ``buffer_callback`` is verified safe —
  ``BYTEARRAY8`` opcodes copy into freshly allocated buffers, so the
  unpickled result does not alias the spill mmap.  ``buffer_callback``
  could shrink ML-payload IPC but introduces dangling-reference hazards;
  deferred until ML workloads dominate.
- ``cloudpickle._PICKLE_BY_VALUE_MODULES`` is a set keyed by name; with
  ``user_code`` reused every call there's no monotonic growth.
- ``sys.path`` accumulation, ``sys.modules`` shadowing, ``linecache``
  growth, ``register_pickle_by_value`` idempotency — all verified bounded.

Tests added (now 93 in the executor file, 861 in stages tree):
- 13 serialization regression tests (cyclic structures, structured
  dtypes, numpy memmap, closures over user classes, instances of inner
  classes, mmap-UAF stress, envelope round-trip, unpicklable result).
- 2 worker-isolation tests (cwd leak, sys.path leak).
- 5 xdist auto-cap tests + 2 ``shutdown_executor(wait=...)`` tests.

All pass under ``pytest -n 4`` xdist as well as serial execution.
…ction banners

Aggressive comment hygiene pass on a too-verbose diff.  The PR description
covers all the audit rationale; source-level comments should be one-line
WHY hints, not paragraph-length narrative.

- wrapper.py module docstring: 109 lines → 12 (drop verbose
  ``Spill backing storage`` / ``Result pickle protocol`` / ``Worker state
  lifecycle`` / ``Start method`` sections — they belong in PR body).
- ``ExecutorConfig.from_env`` docstring + inline comments: ~50 lines → 8.
- ``_worker_init`` / ``shutdown_executor`` / ``_load_spill`` docstrings
  trimmed to behavioural one-liners.
- ``_get_executor`` comment about lazy env capture: 6 lines → 0 (moved
  context to ``_WORKER_ENV`` decl above).
- ``run.py`` finally-block comment: 7 lines → 1.
- ``conftest.py`` session-fixture / ``isolated_spill_dir`` / ``fresh_executor``
  docstrings trimmed.
- ``exec_runner.py`` ``_run_one`` docstring + inline comments: 22 lines → 7.
  ``register_pickle_by_value`` rationale: 6 lines → 2.  ``WorkerError``
  docstring: 6 lines → 1.
- ``test_python_executors.py``: drop 4 ``# ===== section ====== `` banners
  (class names are the section headers), trim 12 regression-class docstrings
  to one-line summaries, drop bug-id list from module docstring.

Net: 114 insertions / 452 deletions on the diff.  Behavior unchanged;
93/93 executor tests pass.
Six surgical reverts on lines the architectural refactor never needed to
touch.  Each is pure noise (variable rename, inlined locals, dropped blank
lines, dropped pre-existing docstrings / section banners) that earlier
trim passes introduced when they should have left those lines alone.

exec_runner.py:
- ``_prepend_sys_path``: restore ``normalized_existing`` name and the
  ``| None`` type union; drop the added docstring.
- ``_iter_top_level_module_names``: restore blank line after ``return set()``.
- ``_module_belongs_to_path``: restore ``file_candidate``/``package_candidate``
  locals and the blank line.
- ``_clear_shadowed_top_level_modules``: restore the ``| None`` union and
  the blank line; drop the added docstring.
- ``_write_code_context``: drop added docstring; restore
  ``last = user_frames[-1]; lineno = last.lineno`` two-line read.
- ``_register_source``: drop added docstring.
- Imports: collapse ``import contextlib`` + ``from contextlib import …``
  into a single ``from contextlib import …, suppress`` line.

test_python_executors.py:
- Restore the module docstring's pre-existing one-line shape (was inflated
  to a paragraph).
- Restore the five ``# --- section ---`` banners stripped by earlier
  trim passes (TestRunExecRunner, TestRunExecRunnerErrors,
  TestExecRunnerErrorAttributes, TestPythonCodeExecutorStage,
  TestPythonCodeExecutorErrorPaths, TestCallFileFunctionStage,
  TestCallProgramFunctionWithFixedArgs, TestFetchMetricsAndFetchArtifact,
  TestCallValidatorFunction).
- Restore one-line docstrings on pre-existing tests that didn't need to
  change otherwise; restore inline annotations (e.g. ``# 1KB limit``,
  ``# syntax error``, ``# Create a minimal valid file...``).
- Restore the ``# Should return a ProgramStageResult failure, not raise``
  comment + late-import order in test_compute_failure_returns_stage_result.

Each restoration shrinks the diff by turning a ``+/-`` pair into an
unchanged line.  All 93 executor tests still pass.
…sses

Tests in the added classes (TestSerializationDistantCorners,
TestProtocolFiveSafety, TestXdistWorkerCountAutoCap,
TestShutdownExecutorWaitFlag) had paragraph-length docstrings explaining
the audit context behind each probe.  That context is now in the PR
description; the test name plus a one-line behavioural docstring is
enough for the in-source reader.  Test bodies and assertions unchanged.

Saved ~70 LOC of explanatory prose; 93/93 executor tests still pass.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant