refactor(python_executors): replace WorkerPool with loky reusable executor#14
Open
GrigoryEvko wants to merge 10 commits into
Open
refactor(python_executors): replace WorkerPool with loky reusable executor#14GrigoryEvko wants to merge 10 commits into
GrigoryEvko wants to merge 10 commits into
Conversation
…_executor Drops the hand-rolled length-prefixed cloudpickle subprocess protocol in favour of loky's reusable process executor. Results are spilled to disk and read in the parent via mmap, so per-call memory cost is bounded by free disk space rather than parent RAM. Workers spawn lazily, share state across calls, and on Linux receive PR_SET_PDEATHSIG so they die with the parent (replacing the ps-grep orphan reaper). Public ``run_exec_runner`` signature: dropped ``cwd``/``runner_path`` (never used), dropped ``max_memory_mb``/``max_output_size`` (replaced by disk spilling), and dropped the legacy ``(value, b"", "")`` 3-tuple return — now returns the value directly. Closes by construction: worker recycling concerns, asyncio.Queue/Lock event-loop binding via lru_cache, the silent one-shot retry on timeout, the MemoryLimitExceeded string-match heuristic, undrained worker stderr, PID-reuse races in process-tree kill, and unbounded register_pickle_by_value. Regression tests cover: ExecRunnerError formatting includes stderr, no memory-error string heuristic, multi-event-loop safety, timeout without silent retry, spill-file cleanup, env scrubbing, worker observability envelope, unpicklable-result handling, mmap large-array round-trip, and direct exec_runner._run_one library coverage. Tooling: ruff and ty clean; pytest-xdist compatible.
Replaces untyped ``dict[str, Any]`` payloads and error envelopes with ``WorkerCall``, ``WorkerError``, and ``WorkerResult`` frozen dataclasses. Adds ``ExecutorConfig`` with a ``from_env()`` factory so the env-var knobs are gathered in one declarative place rather than scattered as module-level mutables. Replaces the ``_apply_env`` / ``_restore_env`` mutation pair with a ``_scoped_env`` context manager so env updates are guaranteed to be reverted on exception paths and the call-site reads top-down. No behaviour change — public ``run_exec_runner`` signature unchanged; loky executor lifecycle, spill semantics, env scrubbing, and worker initialiser all identical.
…ill_dir fixture The previous ``env=_scrub_env(...)`` arg to ``get_reusable_executor`` was additive — workers still inherited every parent env var, including secrets — because loky merges the dict on top of the inherited environment rather than replacing it. Strip non-whitelisted keys explicitly in ``_worker_init`` so the worker's os.environ is exactly the scrubbed set after startup. Update the ``isolated_spill_dir`` fixture to monkeypatch the new ``_CONFIG`` ``ExecutorConfig`` instance (the previous ``SPILL_DIR`` module attribute was renamed in the dataclass refactor). All 51 executor-suite tests now pass.
Loky installs internal signal handlers on its workers (typically blocking SIGINT and installing a custom SIGTERM disposition) so the pool can manage worker lifecycle. This breaks user code that wants standard Python semantics — KeyboardInterrupt on Ctrl-C, immediate termination on SIGTERM/SIGHUP/SIGQUIT, BrokenPipeError on SIGPIPE. Reset SIGINT to Python's ``default_int_handler`` (which raises KeyboardInterrupt — caught by ``_run_one``'s ``except BaseException`` and surfaced as a structured worker error) and SIGTERM/SIGHUP/SIGQUIT/ SIGUSR1/SIGUSR2/SIGCHLD to ``SIG_DFL`` in ``_worker_init``. Leave SIGPIPE alone so Python's default ``SIG_IGN`` + BrokenPipeError flow remains intact. Two regression tests verify SIGINT and SIGTERM dispositions from inside the worker.
Folds the two-line ``_env_int_or`` / ``_env_path_or`` helpers into ``ExecutorConfig.from_env`` as a single nested ``_pos_int`` since they were each used once and only at module-import time. Drops a stale ``# type: ignore[assignment]`` in the (now context-manager- backed) env restoration path — ty flags it as unused since the expression is well-typed. All 53 executor tests still pass; ruff and ty clean.
…, env-scrub hardening
Three parallel audit passes surfaced fixable defects and added regression
tests. All survive ruff, ty, and the full stages test suite (839 tests).
Correctness / concurrency:
- Cache the scrubbed worker env in a module-global ``_WORKER_ENV`` and
pass the same dict identity to every ``get_reusable_executor`` call.
Previously a fresh dict was built per call from ``os.environ``; loky
treats kwargs inequality as "config changed" and tears the pool down
on every submit when any whitelisted var mutated, breaking concurrent
in-flight tasks with ``BrokenProcessPool``.
- Guard ``os.fdopen(fd, "wb")`` in ``_run_task`` so a fail at fdopen
doesn't leak the descriptor.
- Move ``cloudpickle.register_pickle_by_value`` outside ``_scoped_env`` —
it's a process-global registry mutation unrelated to env updates.
Security:
- Default spill dir is now ``$TMPDIR/gigaevo-<uid>`` (was bare
``$TMPDIR``), created with mode 0o700 — defeats directory-listing
workload-metadata leaks on shared hosts.
- ``_load_spill`` opens with ``O_RDONLY | O_NOFOLLOW`` and stats via
``fstat`` post-open — closes a TOCTOU symlink-swap window that allowed
arbitrary cloudpickle bytes to be unpickled by the parent (RCE).
- ``ExecutorConfig.from_env`` resolves ``..`` segments via
``Path.resolve(strict=False)``.
- ``_worker_init``'s prctl call now logs errno on failure instead of
silently swallowing.
- Module docstring documents the trust boundary: user code is unsandboxed
by design; the trust boundary lives at the LLM call upstream.
Tests added (now 71 in the executor file, 839 total in stages):
- 13 parametrised secret-scrub assertions (AWS_*, GH_TOKEN, OPENAI_API_KEY,
ANTHROPIC_API_KEY, LANGFUSE_SECRET_KEY, WANDB_API_KEY, HF_TOKEN, STRIPE_*,
SUPABASE_SERVICE_ROLE_KEY, and more).
- ``TestSpillDirHardening`` — default-dir-per-uid + ``..`` resolution.
- ``TestPythonPathPropagation`` — extra python_path reaches the worker.
- ``TestEnvUpdatesNoneUnsets`` — ``env_updates={K: None}`` unsets K.
- ``TestCancellationCleansSpill`` — cancellation path unlinks spill file.
Deferred (need design discussion, not in this PR):
- RLIMIT_NPROC / RLIMIT_FSIZE / RLIMIT_CORE / RLIMIT_AS opt-in caps.
- Surface ``WorkerResult`` resource accounting through the existing
``LogWriter`` pattern.
…auto-cap, blocking shutdown Three parallel audits in less-trafficked corners (serialization, fork/spawn inheritance, async lifecycle / pytest-xdist interaction) surfaced real hazards plus several debunked concerns. Documents and tests added for both kinds so future readers see what's been verified. Worker isolation (HIGH): - `os.chdir()` and `sys.path.insert()` calls inside user code persisted across worker reuse — task B inherited task A's cwd and path prepends. ``_run_one`` now snapshots ``os.getcwd()`` and ``list(sys.path)`` on entry and restores in ``finally``. Two regression tests pin this. Async lifecycle / xdist: - Under ``pytest-xdist`` each worker process imported wrapper.py and defaulted to ``cpu_count`` loky workers — on a 28-CPU / 8-xdist-worker host this fan-out is 224 subprocesses. ``ExecutorConfig.from_env`` now reads ``PYTEST_XDIST_WORKER_COUNT`` and auto-caps to ``max(1, cpu_count // xdist_workers)`` when the user hasn't set an explicit override. - ``shutdown_executor`` gained a keyword-only ``wait: bool = False`` parameter; ``run.py``'s finally block uses ``wait=True`` so loky's executor-manager thread reaps workers and fires done-callbacks (including ``_unlink_spill_on_done``) before redis/writer close. The session-scoped pytest fixture also uses ``wait=True`` so xdist workers don't exit while their loky children are mid-SIGKILL. Serialization / inheritance (debunked but documented): - ``context="loky"`` actually uses ``fork_exec(close_fds=True)`` followed by a re-exec — spawn semantics, not fork. Workers inherit none of the parent's Redis sockets, langfuse handlers, asyncio loop state, loguru sinks, or atexit hooks. Module docstring rewritten to spell this out so future readers don't repeat the audit. - Protocol-5 cloudpickle without ``buffer_callback`` is verified safe — ``BYTEARRAY8`` opcodes copy into freshly allocated buffers, so the unpickled result does not alias the spill mmap. ``buffer_callback`` could shrink ML-payload IPC but introduces dangling-reference hazards; deferred until ML workloads dominate. - ``cloudpickle._PICKLE_BY_VALUE_MODULES`` is a set keyed by name; with ``user_code`` reused every call there's no monotonic growth. - ``sys.path`` accumulation, ``sys.modules`` shadowing, ``linecache`` growth, ``register_pickle_by_value`` idempotency — all verified bounded. Tests added (now 93 in the executor file, 861 in stages tree): - 13 serialization regression tests (cyclic structures, structured dtypes, numpy memmap, closures over user classes, instances of inner classes, mmap-UAF stress, envelope round-trip, unpicklable result). - 2 worker-isolation tests (cwd leak, sys.path leak). - 5 xdist auto-cap tests + 2 ``shutdown_executor(wait=...)`` tests. All pass under ``pytest -n 4`` xdist as well as serial execution.
…ction banners Aggressive comment hygiene pass on a too-verbose diff. The PR description covers all the audit rationale; source-level comments should be one-line WHY hints, not paragraph-length narrative. - wrapper.py module docstring: 109 lines → 12 (drop verbose ``Spill backing storage`` / ``Result pickle protocol`` / ``Worker state lifecycle`` / ``Start method`` sections — they belong in PR body). - ``ExecutorConfig.from_env`` docstring + inline comments: ~50 lines → 8. - ``_worker_init`` / ``shutdown_executor`` / ``_load_spill`` docstrings trimmed to behavioural one-liners. - ``_get_executor`` comment about lazy env capture: 6 lines → 0 (moved context to ``_WORKER_ENV`` decl above). - ``run.py`` finally-block comment: 7 lines → 1. - ``conftest.py`` session-fixture / ``isolated_spill_dir`` / ``fresh_executor`` docstrings trimmed. - ``exec_runner.py`` ``_run_one`` docstring + inline comments: 22 lines → 7. ``register_pickle_by_value`` rationale: 6 lines → 2. ``WorkerError`` docstring: 6 lines → 1. - ``test_python_executors.py``: drop 4 ``# ===== section ====== `` banners (class names are the section headers), trim 12 regression-class docstrings to one-line summaries, drop bug-id list from module docstring. Net: 114 insertions / 452 deletions on the diff. Behavior unchanged; 93/93 executor tests pass.
Six surgical reverts on lines the architectural refactor never needed to touch. Each is pure noise (variable rename, inlined locals, dropped blank lines, dropped pre-existing docstrings / section banners) that earlier trim passes introduced when they should have left those lines alone. exec_runner.py: - ``_prepend_sys_path``: restore ``normalized_existing`` name and the ``| None`` type union; drop the added docstring. - ``_iter_top_level_module_names``: restore blank line after ``return set()``. - ``_module_belongs_to_path``: restore ``file_candidate``/``package_candidate`` locals and the blank line. - ``_clear_shadowed_top_level_modules``: restore the ``| None`` union and the blank line; drop the added docstring. - ``_write_code_context``: drop added docstring; restore ``last = user_frames[-1]; lineno = last.lineno`` two-line read. - ``_register_source``: drop added docstring. - Imports: collapse ``import contextlib`` + ``from contextlib import …`` into a single ``from contextlib import …, suppress`` line. test_python_executors.py: - Restore the module docstring's pre-existing one-line shape (was inflated to a paragraph). - Restore the five ``# --- section ---`` banners stripped by earlier trim passes (TestRunExecRunner, TestRunExecRunnerErrors, TestExecRunnerErrorAttributes, TestPythonCodeExecutorStage, TestPythonCodeExecutorErrorPaths, TestCallFileFunctionStage, TestCallProgramFunctionWithFixedArgs, TestFetchMetricsAndFetchArtifact, TestCallValidatorFunction). - Restore one-line docstrings on pre-existing tests that didn't need to change otherwise; restore inline annotations (e.g. ``# 1KB limit``, ``# syntax error``, ``# Create a minimal valid file...``). - Restore the ``# Should return a ProgramStageResult failure, not raise`` comment + late-import order in test_compute_failure_returns_stage_result. Each restoration shrinks the diff by turning a ``+/-`` pair into an unchanged line. All 93 executor tests still pass.
…sses Tests in the added classes (TestSerializationDistantCorners, TestProtocolFiveSafety, TestXdistWorkerCountAutoCap, TestShutdownExecutorWaitFlag) had paragraph-length docstrings explaining the audit context behind each probe. That context is now in the PR description; the test name plus a one-line behavioural docstring is enough for the in-source reader. Test bodies and assertions unchanged. Saved ~70 LOC of explanatory prose; 93/93 executor tests still pass.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
The hand-rolled WorkerPool (persistent subprocess pool over a length-prefixed cloudpickle stdin/stdout protocol) is replaced with
loky.get_reusable_executor. Results are spilled to a per-call file on disk and read in the parent viammap, so per-call memory cost is bounded by free disk space rather than parent RAM. Workers spawn lazily, are reused across calls, and on Linux receivePR_SET_PDEATHSIGso they die with the parent.Known defects closed by construction:
lru_cacheoverasyncio.Queue/asyncio.Lock, binding the pool to the first event loop and breaking on subsequent loops (pytest-asyncio reset, repeatedasyncio.run).MemoryLimitExceededwas a\"MemoryError\" in stderrsubstring heuristic that mislabelled in-validator OOMs,KeyError(\"MemoryError\"), and unrelated library messages.stderr=PIPEwas never drained; parent RSS leaked unbounded._kill_process_tree's bareproc.kill()could target reused PIDs after a worker exit._run_via_workerswallowedcloudpickle.loadserrors and discarded captured stderr onTimeoutError/IncompleteReadError.get_workerawaited subprocess spawn while holding the pool lock, serialising the supposed fan-out.cloudpickle.register_pickle_by_value(mod)accumulation concerns (verified bounded after refactor).Defects explicitly fixed (locked in by regression tests):
ExecRunnerError.__str__droppedstderr; now formatted into the exception message so log lines carry the user traceback.execution.pytruncated error messages to 200 chars; now logged in full.os.chdir()/sys.path.insert()calls persisted across worker reuse, leaking cwd and path mutations into the next task.env=kwarg is additive, not replace — non-whitelisted env keys are now explicitly dropped in_worker_init, so ambient API tokens in the parent's environment do not reach user code.O_RDONLY | O_NOFOLLOW+ post-openfstat.\$TMPDIRto\$TMPDIR/gigaevo-<uid>with mode0o700.shutdown_executor(wait=False)raced loky's executor-manager thread.os.environmutation (kwargs inequality triggered pool teardown on every submit); scrubbed env is now cached once at import.os.fdopenraised mid-spill-write.execution.py.Behaviour improvements:
mmap+ protocol-5 cloudpickle).AWS_*,GH_TOKEN/GITHUB_TOKEN,OPENAI_API_KEY,ANTHROPIC_API_KEY,LANGFUSE_SECRET_KEY,WANDB_API_KEY,HF_TOKEN,STRIPE_*,SUPABASE_*, and everything else not in the explicit whitelist or under theGIGAEVO_*/LOKY_*prefixes.PR_SET_PDEATHSIGon Linux replaces the ps-grep orphan-reaper intools/flush.py.max_workersauto-caps tocpu_count // xdist_workersunder pytest-xdist to avoid the N×cpu_count fork-bomb on big hosts.shutdown_executorblocks on the executor-manager thread when called withwait=True, so spill-cleanup done-callbacks fire before interpreter exit.Public API surface:
run_exec_runnerreturnsAnydirectly (was(value, b\"\", \"\")3-tuple).cwd,runner_path(never used),max_memory_mb,max_output_size(replaced by disk-spilling).ExecRunnerError.stdout_bytes(was alwaysb\"\").MAX_MEMORY_MB/MAX_OUTPUT_SIZEremoved fromgigaevo.entrypoint.constants.WorkerPool,default_exec_runner_pool,_run_via_worker,_start_worker_process,_kill_process_tree,_monitor_rss_limitremoved entirely.New env-var configuration:
GIGAEVO_EXECUTOR_MAX_WORKERS— override loky's max_workers.GIGAEVO_EXECUTOR_IDLE_TIMEOUT_S— idle worker reap timeout (default 300s).GIGAEVO_EXECUTOR_SPILL_DIR— where result pickles spill (default `$TMPDIR/gigaevo-`).New dependency: `loky>=3.5`.