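Problem
The Executor subsystem leaves terminal-state recording for Execution rows scattered across six call sites with duplicated logic:
LocalExecutor.join (packages/climate-ref/src/climate_ref/executor/local.py:167-254): drain loop, per-task timeout, _mark_failed, _fail_outstanding
HPCExecutor.join (packages/climate-ref/src/climate_ref/executor/hpc.py:415-528): near-identical drain loop with an extra DiagnosticError/ExecutionLost classification ladder
SynchronousExecutor.run (packages/climate-ref/src/climate_ref/executor/synchronous.py:48-61): special detached-row session.merge dance
solver.fail_stale_in_progress_executions (packages/climate-ref/src/climate_ref/solver.py:520-572): reaper for rows stuck in successful=None
reingest.py (packages/climate-ref/src/climate_ref/executor/reingest.py:269-294): calls handle_execution_result(update_dirty=False) directly
Celery worker_tasks.handle_result/handle_failure: a third copy of the failure-classification logic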
packages/climate-ref/src/climate_ref/executor/result_handling.py is a 474-line god module exposing process_result, mark_execution_failed, handle_execution_result plus seven helpers. Each caller must know: when to open a transaction, how to look up a detached Execution by id, how to classify a future's failure as retryable vs not, and how the dirty-flag policy interacts with retry semantics.
The recent stuck-execution incident (7df2750f) was caused by exactly this scattering: rows left at successful=None escaped one drain loop because the failure-recording call was never reached on one of its code paths. Fixing it required edits to local.py, hpc.py, and solver.py plus a new reaper. Any new executor will face the same trap.
The core invariant — every Execution row submitted to an executor reaches successful=True or successful=False — is enforced by convention across six files, not by a deep module.
Proposed Interface
A single OutcomeRecorder class with one public method (record) and three discriminated DTOs.
```python
# packages/climate-ref/src/climate_ref/executor/recorder.py
from __future__ import annotations

from collections.abc import Iterable
from datetime import timedelta
from typing import TYPE_CHECKING

from attrs import define

if TYPE_CHECKING:
    # Import locations assumed; adjust to wherever these existing types live.
    from climate_ref.config import Config
    from climate_ref_core.diagnostics import ExecutionDefinition, ExecutionResult
    # ExecutionStore is the Protocol described under Dependency Strategy.


@define(frozen=True, slots=True)
class Completed:
    """Worker returned an ExecutionResult (success OR diagnostic failure)."""

    definition: ExecutionDefinition
    result: ExecutionResult
    execution_id: int | None
    update_dirty: bool = True  # reingest sets False


@define(frozen=True, slots=True)
class Abandoned:
    """Future never produced a result (timeout, walltime, OOM, worker crash)."""

    definition: ExecutionDefinition
    execution_id: int | None
    cause: str  # "per-task timeout", "walltime", "worker crash"
    retryable: bool = True


@define(frozen=True, slots=True)
class StaleSweep:
    """Reaper: fail every Execution older than `older_than` still pending."""

    older_than: timedelta = timedelta(hours=6)


Outcome = Completed | Abandoned | StaleSweep


@define(frozen=True, slots=True)
class RecordReport:
    recorded: int = 0
    failed_to_record: int = 0
    classification: str = ""


class OutcomeRecorder:
    def __init__(self, *, config: Config, store: ExecutionStore) -> None: ...

    def record(self, outcome: Outcome | Iterable[Outcome]) -> RecordReport: ...
```
Caller usage
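The recorder replaces three caller-side blocks: the LocalExecutor.join hot loop (local.py:167-232, ~65 lines → ~20), the HPCExecutor classification ladder (hpc.py:451-495), and the reaper, which collapses to one expression.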
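A minimal sketch of the drain-loop shape this enables, assuming a concurrent.futures pool and a single join deadline. ListRecorder and drain() are hypothetical names, and the "completed"/"abandoned" strings stand in for the Completed/Abandoned DTOs. The point it illustrates is that every submitted id leaves the loop through exactly one record() call, whether its future returns, raises, or runs past the deadline.

```python
import time
from concurrent.futures import FIRST_COMPLETED, Future, wait
from dataclasses import dataclass, field


@dataclass
class ListRecorder:
    """Hypothetical recorder stand-in that remembers every call it receives."""

    calls: list[tuple[str, int, object]] = field(default_factory=list)

    def record(self, kind: str, execution_id: int, payload: object) -> None:
        self.calls.append((kind, execution_id, payload))


def drain(futures: dict[Future, int], recorder: ListRecorder, timeout_s: float) -> None:
    """Send every submitted execution id to exactly one record() call."""
    deadline = time.monotonic() + timeout_s
    pending = set(futures)
    while pending:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        done, pending = wait(pending, timeout=remaining, return_when=FIRST_COMPLETED)
        for fut in done:
            exc = fut.exception()
            if exc is None:
                # Completed: the worker returned a result (success or diagnostic failure).
                recorder.record("completed", futures[fut], fut.result())
            else:
                # Abandoned: the future raised instead of producing a result.
                recorder.record("abandoned", futures[fut], f"worker crash: {exc!r}")
    # Anything still pending ran past the deadline; abandon it instead of leaving successful=None.
    for fut in pending:
        fut.cancel()  # best effort: a future that is already running cannot be cancelled
        recorder.record("abandoned", futures[fut], "timeout")
```

The loop no longer opens transactions or looks up rows; both move behind record().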
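Complexity hidden inside the recorder
Transaction lifecycle: a database.session.begin() block per record; asserts the caller has none open
ORM detached-row reattachment: session.get(Execution, id)
Failure classification: ExecutionResult.build_from_failure(retryable=...) for Abandoned; respects result.retryable for Completed
… (result_handling.py:433)
Dirty-flag policy: Abandoned and StaleSweep always leave dirty=True
Stale-sweep query: successful is None and created_at < cutoff
Dependency Strategy
Local-substitutable with one thin port for ORM access.
OutcomeRecorder depends on Config directly and on the database via an ExecutionStore Protocol. Production adapter SqlExecutionStore(database) wraps the existing Database; transaction() returns database.session.begin(); get_or_attach() does session.get + session.merge for detached rows. Test adapter InMemoryExecutionStore holds dicts of Execution rows for fast unit tests.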
Filesystem and metric-bundle I/O stay as plain helpers (existing _copy_file_to_results, ingest_scalar_values, ingest_series_values, register_execution_outputs) called by the recorder. They were already pure-ish (callers own transactions per docstrings); deferring their port-extraction keeps the migration scope tight. They can be ported later if a non-CMEC bundle format or alternative storage backend appears.
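Testing Strategy
New boundary tests to write
test_recorder_completed_success_path: success result → file copy + ingestion + mark_successful + dirty=False
test_recorder_completed_diagnostic_failure: failure result with retryable=False → mark_failed + dirty=False
test_recorder_completed_system_failure: failure result with retryable=True → mark_failed + dirty=True (left for retry)
test_recorder_completed_missing_log: FileNotFoundError on log copy → mark_failed + dirty=True + skip ingest
test_recorder_completed_update_dirty_false: reingest mode preserves the dirty flag regardless of outcome
test_recorder_abandoned_per_task_timeout: synthesizes a retryable failure and marks the row failed
test_recorder_abandoned_diagnostic_error: caller passes a pre-built failure result; the recorder routes it through Completed
test_recorder_stale_sweep: marks all successful=None and created_at < cutoff rows failed; returns the count
test_recorder_idempotence: recording twice for the same execution_id is a no-op
test_recorder_rejects_open_transaction: assertion fails fast if the caller already has session.in_transaction()
test_recorder_unknown_execution_id: graceful no-op + RecordReport.failed_to_record increment
One integration test per executor (LocalExecutor, HPCExecutor, SynchronousExecutor, Celery worker callback) proving wiring: submit → join → Execution.successful is not None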
Old tests to delete or absorb
packages/climate-ref/tests/unit/executor/test_result_handling.py — most cases (process_result wrapper, mark_execution_failed wrapper, handle_execution_result branches) reduce to the new boundary tests above. Keep the helper-level tests that exercise filesystem copy + ingestion specifics.
packages/climate-ref/tests/unit/executor/test_local_executor.py — pool-mocking + future-exception tests collapse into the integration test plus boundary tests on the recorder.
packages/climate-ref/tests/unit/executor/test_synchronous_executor.py — handle_execution_result patching disappears; behavior asserted at recorder boundary.
packages/climate-ref/tests/unit/executor/test_hpc_executor.py — classification ladder tests move to recorder boundary.
The mark_execution_failed fixture-based tests become unnecessary — Abandoned is the new vocabulary.
Test environment needs
InMemoryExecutionStore adapter for unit tests (no sqlite, no tmp_path for ORM)
Existing tmp_path + CMEC bundle fixtures stay for the recorder's filesystem/ingestion paths (no port for those helpers)
Existing integration suite (packages/climate-ref/tests/integration/test_executor_failure_modes.py) is the regression guard during migration — its assertions are about end-state Execution.successful, which the recorder still owns.
Implementation Recommendations
Module responsibilities
OutcomeRecorder owns: the terminal-state invariant, transaction lifecycle for outcome recording, failure classification (system vs diagnostic), ORM detached-row reattachment, dirty-flag policy, CV cache, idempotence guard
OutcomeRecorder hides: which file copies happen in what order, when log-missing triggers retryable failure, how scalar/series metrics are bulk-inserted, how the stale-sweep query is shaped, how update_dirty=False propagates
OutcomeRecorder exposes: record(outcome); Completed/Abandoned/StaleSweep DTOs; RecordReport for observability
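Owning the dirty-flag policy means the rules pinned down by the boundary tests can live in one pure function. The sketch below is hypothetical (terminal_state() and its flat keyword arguments are not part of the proposed API); the rules are read off the test list: success and diagnostic failures clear dirty, retryable failures and a missing log keep it set, Abandoned and StaleSweep always keep it set, and update_dirty=False leaves it untouched.

```python
from typing import Literal

Kind = Literal["completed", "abandoned", "stale_sweep"]


def terminal_state(
    kind: Kind,
    *,
    successful: bool = False,
    retryable: bool = True,
    log_missing: bool = False,
    update_dirty: bool = True,
    current_dirty: bool = True,
) -> tuple[bool, bool]:
    """Return the row's (successful, dirty) pair after an outcome is recorded."""
    if kind != "completed":
        # Abandoned and StaleSweep never produced a result: fail the row and leave it dirty for retry.
        return False, True
    if log_missing:
        # A missing log is treated as a retryable system failure, whatever the result said.
        successful, retryable = False, True
    if not update_dirty:
        # Reingest mode: record the outcome but keep the existing dirty flag.
        return successful, current_dirty
    if successful:
        return True, False
    # Diagnostic failures (retryable=False) are final; system failures stay dirty.
    return False, retryable
```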
Migration plan
Add OutcomeRecorder + ExecutionStore port + SqlExecutionStore adapter + boundary tests, using existing helpers in result_handling.py internally. No call sites change yet.
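Migrate SynchronousExecutor (smallest surface, no drain loop). Delete the session.merge dance.
Migrate LocalExecutor. Delete _mark_failed, _fail_outstanding, and the transaction blocks. The drain loop becomes ~20 lines.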
Migrate HPCExecutor. Same shape as Local; classification ladder collapses.
Migrate solver.fail_stale_in_progress_executions to recorder.record(StaleSweep(...)). Delete the helper.
Migrate reingest.py to recorder.record(Completed(..., update_dirty=False)). Delete its direct handle_execution_result(update_dirty=False) call and import.
Migrate Celery worker_tasks.handle_result/handle_failure to call the recorder on the coordinator side.
Delete process_result and mark_execution_failed from result_handling.py once all callers are migrated. Demote remaining helpers to module-private (_copy_*, _ingest_*).
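For the reaper migration step, StaleSweep can reduce to a single conditional UPDATE, which also makes the count asserted by test_recorder_stale_sweep free to obtain. This sketch uses the stdlib sqlite3 module and a hypothetical minimal execution table rather than the project's ORM; the predicate follows the RFC's successful is None and created_at < cutoff, while storing timestamps as ISO-8601 UTC text is an assumption (it keeps string comparison in chronological order).

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def stale_sweep(conn: sqlite3.Connection, older_than: timedelta, now: datetime) -> int:
    """Fail every still-pending execution created before now - older_than; return the count."""
    cutoff = (now - older_than).isoformat()
    with conn:  # commits on success, rolls back if the UPDATE raises
        cur = conn.execute(
            "UPDATE execution SET successful = 0, dirty = 1 "
            "WHERE successful IS NULL AND created_at < ?",
            (cutoff,),
        )
    return cur.rowcount


def utc_now() -> datetime:
    # Timezone-aware UTC so every stored timestamp shares the same "+00:00" suffix.
    return datetime.now(timezone.utc)
```

Running the sweep twice is naturally idempotent: the second pass finds no rows with successful IS NULL.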
Misuse guards worth adding
Assert not database.session.in_transaction() inside record() — the bug pattern from synchronous.py.
DTO validation: Completed requires result; StaleSweep rejects extra fields.
Document and test that observers/extensions added later must not call database.session.commit() inside the recorder's transaction.
Out of scope (deferred to future RFCs)
Extracting OutputStore / MetricIngestor / MetricValidator ports (D4-style full ports & adapters split). Worth doing only if a non-CMEC bundle format or alternative storage backend appears.
Pipeline / Stage / Observer abstractions for recorder extensibility (D2-style). Add only when a concrete extension request arrives (e.g., Slack notification on non-retryable failure).
Unifying the executor drain loop itself across LocalExecutor and HPCExecutor: a separate refactor (Poller / ExecutionTracker, candidate #2 in the original list).