RFC: OutcomeRecorder — consolidate execution result handling behind a single deep module #640

@lewisjared

Problem

The Executor subsystem leaves terminal-state recording for Execution rows scattered across six call sites with duplicated logic:

  • LocalExecutor.join (packages/climate-ref/src/climate_ref/executor/local.py:167-254) — drain loop, per-task timeout, _mark_failed, _fail_outstanding
  • HPCExecutor.join (packages/climate-ref/src/climate_ref/executor/hpc.py:415-528) — near-identical drain loop with extra DiagnosticError / ExecutionLost classification ladder
  • SynchronousExecutor.run (packages/climate-ref/src/climate_ref/executor/synchronous.py:48-61) — special detached-row session.merge dance
  • solver.fail_stale_in_progress_executions (packages/climate-ref/src/climate_ref/solver.py:520-572) — reaper for rows stuck in successful=None
  • reingest.py (packages/climate-ref/src/climate_ref/executor/reingest.py:269-294) — calls handle_execution_result(update_dirty=False) directly
  • Celery worker_tasks.handle_result/handle_failure — third copy of failure-classification logic

packages/climate-ref/src/climate_ref/executor/result_handling.py is a 474-line god module exposing process_result, mark_execution_failed, handle_execution_result plus seven helpers. Each caller must know: when to open a transaction, how to look up a detached Execution by id, how to classify a future's failure as retryable vs not, and how the dirty-flag policy interacts with retry semantics.

The recent stuck-execution incident (7df2750f) was caused by exactly this scattering — successful=None rows escaped one drain loop because the failure-recording call wasn't reached on a code path. Fixing it required edits to local.py, hpc.py, and solver.py plus a new reaper. Any new executor will face the same trap.

The core invariant — every Execution row submitted to an executor reaches successful=True or successful=False — is enforced by convention across six files, not by a deep module.

Proposed Interface

A single OutcomeRecorder class with one public method (record) and three discriminated DTOs.

# packages/climate-ref/src/climate_ref/executor/recorder.py
from collections.abc import Iterable
from datetime import timedelta

from attrs import define

@define(frozen=True, slots=True)
class Completed:
    """Worker returned an ExecutionResult (success OR diagnostic failure)."""
    definition: ExecutionDefinition
    result: ExecutionResult
    execution_id: int | None
    update_dirty: bool = True   # reingest sets False

@define(frozen=True, slots=True)
class Abandoned:
    """Future never produced a result (timeout, walltime, OOM, worker crash)."""
    definition: ExecutionDefinition
    execution_id: int | None
    cause: str                  # "per-task timeout", "walltime", "worker crash"
    retryable: bool = True

@define(frozen=True, slots=True)
class StaleSweep:
    """Reaper: fail every Execution older than `older_than` still pending."""
    older_than: timedelta = timedelta(hours=6)

Outcome = Completed | Abandoned | StaleSweep

@define(frozen=True, slots=True)
class RecordReport:
    recorded: int = 0
    failed_to_record: int = 0
    classification: str = ""

class OutcomeRecorder:
    def __init__(self, *, config: Config, store: ExecutionStore) -> None: ...
    def record(self, outcome: Outcome | Iterable[Outcome]) -> RecordReport: ...
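
A minimal sketch of how record could normalise its argument and dispatch on the outcome type. It is not the proposed implementation: stdlib dataclasses stand in for the attrs classes above, the DTO fields are trimmed, and SketchRecorder and _record_one are hypothetical names that count outcomes instead of touching a database.

```python
from __future__ import annotations

from collections.abc import Iterable
from dataclasses import dataclass
from datetime import timedelta
from typing import Union


# Trimmed stand-ins for the DTOs above (stdlib dataclasses instead of attrs).
@dataclass(frozen=True)
class Completed:
    execution_id: int | None
    successful: bool  # stand-in for the real ExecutionResult payload


@dataclass(frozen=True)
class Abandoned:
    execution_id: int | None
    cause: str
    retryable: bool = True


@dataclass(frozen=True)
class StaleSweep:
    older_than: timedelta = timedelta(hours=6)


Outcome = Union[Completed, Abandoned, StaleSweep]
_OUTCOME_TYPES = (Completed, Abandoned, StaleSweep)


@dataclass(frozen=True)
class RecordReport:
    recorded: int = 0
    failed_to_record: int = 0


class SketchRecorder:
    """Hypothetical recorder: counts outcomes instead of writing rows."""

    def record(self, outcome: Outcome | Iterable[Outcome]) -> RecordReport:
        # Accept either a single outcome or any iterable of outcomes.
        outcomes = [outcome] if isinstance(outcome, _OUTCOME_TYPES) else list(outcome)
        recorded = failed = 0
        for item in outcomes:
            if self._record_one(item):
                recorded += 1
            else:
                failed += 1
        return RecordReport(recorded=recorded, failed_to_record=failed)

    def _record_one(self, outcome: Outcome) -> bool:
        if isinstance(outcome, StaleSweep):
            return True  # the real recorder would run the stale-row query here
        # Assumption of this sketch: without an id there is no row to update.
        return outcome.execution_id is not None


recorder = SketchRecorder()
single = recorder.record(Completed(execution_id=1, successful=True))
batch = recorder.record(Abandoned(i, cause="overall timeout") for i in (2, None))
```

Accepting a generator directly is what lets the overall-timeout branch in the caller pass every outstanding result in one call.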

Caller usage

LocalExecutor.join hot loop (replaces local.py:167-232, ~65 lines → ~20):

recorder = self._recorder
while results:
    now = time.time()
    for r in results[:]:
        if r.future.done():
            try:
                exec_result = r.future.result(timeout=0)
                recorder.record(Completed(r.definition, exec_result, r.execution_id))
            except Exception as exc:
                recorder.record(Abandoned(r.definition, r.execution_id, cause="future raised"))
                results.remove(r); raise ExecutionError(...) from exc
            results.remove(r); t.update(1); continue
        if self._task_timed_out(r, now):
            r.future.cancel()
            recorder.record(Abandoned(r.definition, r.execution_id, cause="per-task timeout"))
            results.remove(r); t.update(1)
    if time.time() - start_time > timeout:
        recorder.record(Abandoned(r.definition, r.execution_id, cause="overall timeout") for r in results)
        results.clear(); raise TimeoutError(...)
    time.sleep(refresh_time)

HPCExecutor classification ladder (replaces hpc.py:451-495):

err = r.future.exception()
if err is None:
    try:
        outcome = Completed(r.definition, r.future.result(timeout=0), r.execution_id)
    except Exception:
        outcome = Abandoned(r.definition, r.execution_id, cause="result retrieval failed")
elif isinstance(err, DiagnosticError):
    outcome = Completed(r.definition, err.result, r.execution_id)
else:
    outcome = Abandoned(r.definition, r.execution_id, cause=f"system failure: {err!r}")
recorder.record(outcome)
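
The ladder above can be exercised in isolation with plain concurrent.futures objects. In this sketch DiagnosticError, Completed and Abandoned are trimmed stand-ins for the real classes, classify is a hypothetical helper name, and the future is assumed to be done and not cancelled.

```python
from concurrent.futures import Future
from dataclasses import dataclass


class DiagnosticError(Exception):
    """Stand-in: the diagnostic itself failed and carries a result object."""

    def __init__(self, result):
        super().__init__("diagnostic failed")
        self.result = result


@dataclass(frozen=True)
class Completed:
    execution_id: int
    result: object


@dataclass(frozen=True)
class Abandoned:
    execution_id: int
    cause: str


def classify(future: Future, execution_id: int):
    """Map a finished future to an outcome (hypothetical helper)."""
    err = future.exception(timeout=0)
    if err is None:
        return Completed(execution_id, future.result(timeout=0))
    if isinstance(err, DiagnosticError):
        # The worker ran and reported a failure: still a Completed outcome.
        return Completed(execution_id, err.result)
    # Anything else means the system, not the diagnostic, failed.
    return Abandoned(execution_id, cause=f"system failure: {err!r}")


ok = Future()
ok.set_result("metrics")
diag = Future()
diag.set_exception(DiagnosticError(result="failed-result"))
lost = Future()
lost.set_exception(RuntimeError("worker lost"))

outcomes = [classify(f, i) for i, f in enumerate((ok, diag, lost))]
```

Keeping the classification a pure function of the future makes it testable without a scheduler or a database.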

Reaper collapses to one expression:

def fail_stale_in_progress_executions(db, config, *, stale_after_seconds=DEFAULT_STALE_EXECUTION_AGE_SECONDS):
    return OutcomeRecorder(config=config, store=SqlExecutionStore(db)).record(
        StaleSweep(older_than=timedelta(seconds=stale_after_seconds))
    ).recorded
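
What the sweep does to the rows can be illustrated without a database. This is a sketch under assumptions: Row is a stand-in for the Execution model with only the fields the sweep reads, and sweep_stale is a hypothetical name for the logic the recorder would run behind StaleSweep.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Row:
    """Stand-in for an Execution row; only the fields the sweep reads."""

    id: int
    created_at: datetime
    successful: bool | None = None  # None means still pending


def sweep_stale(rows: list[Row], now: datetime, older_than: timedelta) -> int:
    """Fail every pending row created before the cutoff; return how many."""
    cutoff = now - older_than
    count = 0
    for row in rows:
        if row.successful is None and row.created_at < cutoff:
            row.successful = False
            count += 1
    return count


now = datetime(2024, 1, 1, 12, 0)
rows = [
    Row(1, created_at=now - timedelta(hours=7)),                   # stale, pending
    Row(2, created_at=now - timedelta(hours=1)),                   # fresh, pending
    Row(3, created_at=now - timedelta(hours=9), successful=True),  # already terminal
]
swept = sweep_stale(rows, now, older_than=timedelta(hours=6))
```

Only the first row is failed: the second is too recent and the third is already terminal.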

Complexity hidden inside the recorder

  • Transaction discipline (with database.session.begin() per record; asserts caller has none open)
  • Detached ORM row reattachment via session.get(Execution, id)
  • Failure classification: synthesizes ExecutionResult.build_from_failure(retryable=...) for Abandoned; respects result.retryable for Completed
  • File copy: log rescue (missing log → mark_failed + leave dirty=True) + bundle staging scratch→results
  • CV loaded once per recorder instance (fixes per-call reload at result_handling.py:433)
  • Metric ingestion (scalar + series) and output bundle registration
  • Dirty-flag policy: only cleared on terminal non-retryable outcomes; Abandoned and StaleSweep always leave dirty=True
  • Idempotence: terminal rows skipped (race-safe vs late Celery callbacks)
  • Stale sweep query (successful is None and created_at < cutoff)
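
The dirty-flag policy in the list above can be written down as a pure function. This sketch reads "leave dirty=True" as "never clear the flag" and assumes rows are dirty when they are submitted; next_dirty and the string kind values are hypothetical, and a missing log is treated as a retryable failure.

```python
def next_dirty(
    current: bool,
    *,
    kind: str,
    retryable: bool = False,
    update_dirty: bool = True,
) -> bool:
    """Return the dirty flag after recording one outcome (hypothetical helper)."""
    if kind != "completed":
        # Abandoned and StaleSweep never clear the flag.
        return current
    if not update_dirty or retryable:
        # Reingest mode, or a failure that should be retried: leave untouched.
        return current
    # Success or a non-retryable diagnostic failure: nothing left to retry.
    return False


cases = {
    "success": next_dirty(True, kind="completed"),
    "retryable failure": next_dirty(True, kind="completed", retryable=True),
    "reingest": next_dirty(True, kind="completed", update_dirty=False),
    "abandoned": next_dirty(True, kind="abandoned"),
    "stale sweep": next_dirty(True, kind="stale_sweep"),
}
```

Under these assumptions only the plain success or non-retryable Completed case clears the flag; every other case leaves the row eligible for another attempt.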

Dependency Strategy

Local-substitutable with one thin port for ORM access.

OutcomeRecorder depends on Config directly and on the database via an ExecutionStore Protocol:

from datetime import datetime
from typing import ContextManager, Protocol

class ExecutionStore(Protocol):
    def transaction(self) -> ContextManager[None]: ...
    def get_or_attach(
        self, execution_id: int | None, *, detached: Execution | None = None
    ) -> Execution | None: ...
    def query_stale(self, cutoff: datetime) -> list[Execution]: ...

Production adapter SqlExecutionStore(database) wraps the existing Database; transaction() returns database.session.begin(); get_or_attach() does session.get + session.merge for detached rows. Test adapter InMemoryExecutionStore holds dicts of Execution rows for fast unit tests.
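
A minimal sketch of what the test adapter might look like, assuming a dict keyed by execution id is enough. The Execution class here is a stand-in for the ORM model, and the in_transaction attribute and the nested-transaction error are assumptions of this sketch rather than part of the proposed Protocol.

```python
from __future__ import annotations

from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from typing import Iterator


@dataclass
class Execution:
    """Stand-in for the ORM model; only the fields the store touches."""

    id: int
    created_at: datetime
    successful: bool | None = None


class InMemoryExecutionStore:
    def __init__(self, rows=()):
        self._rows = {row.id: row for row in rows}
        self.in_transaction = False

    @contextmanager
    def transaction(self) -> Iterator[None]:
        # Mirrors the "no transaction already open" rule the recorder asserts.
        if self.in_transaction:
            raise RuntimeError("transaction already open")
        self.in_transaction = True
        try:
            yield
        finally:
            self.in_transaction = False

    def get_or_attach(self, execution_id, *, detached=None):
        if execution_id is not None and execution_id in self._rows:
            return self._rows[execution_id]
        if detached is not None:
            # "Attach" a detached row by adopting it under its own id.
            self._rows[detached.id] = detached
            return detached
        return None

    def query_stale(self, cutoff: datetime) -> list[Execution]:
        return [
            row
            for row in self._rows.values()
            if row.successful is None and row.created_at < cutoff
        ]


store = InMemoryExecutionStore([Execution(1, datetime(2024, 1, 1))])
with store.transaction():
    store.get_or_attach(1).successful = False
missing = store.get_or_attach(99)
stale = store.query_stale(datetime(2024, 6, 1))
```

Because the row was marked failed inside the transaction, the later stale query returns nothing, and an unknown id yields None rather than raising.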

Filesystem and metric-bundle I/O stay as plain helpers (existing _copy_file_to_results, ingest_scalar_values, ingest_series_values, register_execution_outputs) called by the recorder. They were already pure-ish (callers own transactions per docstrings); deferring their port-extraction keeps the migration scope tight. They can be ported later if a non-CMEC bundle format or alternative storage backend appears.

Testing Strategy

New boundary tests to write

  • test_recorder_completed_success_path: success result → file copy + ingestion + mark_successful + dirty=False
  • test_recorder_completed_diagnostic_failure: failure result with retryable=False → mark_failed + dirty=False
  • test_recorder_completed_system_failure: failure result with retryable=True → mark_failed + dirty=True (left for retry)
  • test_recorder_completed_missing_log: FileNotFoundError on log copy → mark_failed + dirty=True + skip ingest
  • test_recorder_completed_update_dirty_false: reingest mode preserves dirty flag regardless of outcome
  • test_recorder_abandoned_per_task_timeout: synthesizes retryable failure, marks row failed
  • test_recorder_abandoned_diagnostic_error: caller passes pre-built failure result, recorder routes through Completed
  • test_recorder_stale_sweep: marks all successful=None and created_at < cutoff rows failed; returns count
  • test_recorder_idempotence: recording twice for same execution_id is a no-op
  • test_recorder_rejects_open_transaction: assertion fails fast if caller already has session.in_transaction()
  • test_recorder_unknown_execution_id: graceful no-op + RecordReport.failed_to_record increment
  • One integration test per executor (LocalExecutor, HPCExecutor, SynchronousExecutor, Celery worker callback) proving wiring: submit → join → Execution.successful is not None
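
As an illustration of the shape these boundary tests could take, here is a sketch of the idempotence case. ToyRecorder is a hypothetical stand-in that implements only the terminal-row guard over a dict; the real test would drive OutcomeRecorder through InMemoryExecutionStore.

```python
from __future__ import annotations


class ToyRecorder:
    """Hypothetical stand-in with only the idempotence guard."""

    def __init__(self, rows: dict[int, bool | None]) -> None:
        self.rows = rows  # execution_id -> successful (None = pending)

    def record(self, execution_id: int, successful: bool) -> int:
        """Return 1 if the row was updated, 0 if it was skipped."""
        if execution_id not in self.rows:
            return 0  # unknown id: graceful no-op
        if self.rows[execution_id] is not None:
            return 0  # already terminal: skip
        self.rows[execution_id] = successful
        return 1


def test_recorder_idempotence() -> None:
    recorder = ToyRecorder({1: None})
    assert recorder.record(1, successful=False) == 1
    # A late duplicate (for example a delayed callback) must not flip the state.
    assert recorder.record(1, successful=True) == 0
    assert recorder.rows[1] is False


test_recorder_idempotence()
```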

Old tests to delete or absorb

  • packages/climate-ref/tests/unit/executor/test_result_handling.py — most cases (process_result wrapper, mark_execution_failed wrapper, handle_execution_result branches) reduce to the new boundary tests above. Keep the helper-level tests that exercise filesystem copy + ingestion specifics.
  • packages/climate-ref/tests/unit/executor/test_local_executor.py — pool-mocking + future-exception tests collapse into the integration test plus boundary tests on the recorder.
  • packages/climate-ref/tests/unit/executor/test_synchronous_executor.py: handle_execution_result patching disappears; behavior is asserted at the recorder boundary.
  • packages/climate-ref/tests/unit/executor/test_hpc_executor.py — classification ladder tests move to recorder boundary.
  • The mark_execution_failed fixture-based tests become unnecessary — Abandoned is the new vocabulary.

Test environment needs

  • InMemoryExecutionStore adapter for unit tests (no sqlite, no tmp_path for ORM)
  • Existing tmp_path + CMEC bundle fixtures stay for the recorder's filesystem/ingestion paths (no port for those helpers)
  • Existing integration suite (packages/climate-ref/tests/integration/test_executor_failure_modes.py) is the regression guard during migration — its assertions are about end-state Execution.successful, which the recorder still owns.

Implementation Recommendations

Module responsibilities

  • OutcomeRecorder owns: the terminal-state invariant, transaction lifecycle for outcome recording, failure classification (system vs diagnostic), ORM detached-row reattachment, dirty-flag policy, CV cache, idempotence guard
  • OutcomeRecorder hides: which file copies happen in what order, when log-missing triggers retryable failure, how scalar/series metrics are bulk-inserted, how the stale-sweep query is shaped, how update_dirty=False propagates
  • OutcomeRecorder exposes: record(outcome); Completed/Abandoned/StaleSweep DTOs; RecordReport for observability

Migration plan

  1. Add OutcomeRecorder + ExecutionStore port + SqlExecutionStore adapter + boundary tests, using existing helpers in result_handling.py internally. No call sites change yet.
  2. Migrate SynchronousExecutor (smallest surface, no drain loop). Delete the session.merge dance.
  3. Migrate LocalExecutor. Delete _mark_failed, _fail_outstanding, transaction blocks. Drain loop becomes ~20 lines.
  4. Migrate HPCExecutor. Same shape as Local; classification ladder collapses.
  5. Migrate solver.fail_stale_in_progress_executions to recorder.record(StaleSweep(...)). Delete the helper.
  6. Migrate reingest.py to recorder.record(Completed(..., update_dirty=False)). Delete handle_execution_result(update_dirty=False) import.
  7. Migrate Celery worker_tasks.handle_result/handle_failure to call the recorder on the coordinator side.
  8. Delete process_result and mark_execution_failed from result_handling.py once all callers are migrated. Demote remaining helpers to module-private (_copy_*, _ingest_*).

Misuse guards worth adding

  • Assert not database.session.in_transaction() inside record() — the bug pattern from synchronous.py.
  • DTO validation: Completed requires result; StaleSweep rejects extra fields.
  • Document and test that observers/extensions added later must not call database.session.commit() inside the recorder's transaction.
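
A minimal sketch of the first guard. FakeSession is a stand-in exposing only in_transaction(), the one method the guard needs, and ensure_no_open_transaction is a hypothetical name. The sketch raises explicitly rather than using an assert statement, since assert statements are removed when Python runs with -O.

```python
class FakeSession:
    """Stand-in exposing the one method the guard needs."""

    def __init__(self, open_transaction: bool) -> None:
        self._open = open_transaction

    def in_transaction(self) -> bool:
        return self._open


def ensure_no_open_transaction(session) -> None:
    """Fail fast if the caller already holds a transaction (hypothetical helper)."""
    if session.in_transaction():
        raise RuntimeError("record() must be called outside any open transaction")


# A session with no open transaction passes silently.
ensure_no_open_transaction(FakeSession(open_transaction=False))

rejected = False
try:
    ensure_no_open_transaction(FakeSession(open_transaction=True))
except RuntimeError:
    rejected = True
```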

Out of scope (deferred to future RFCs)

  • Extracting OutputStore / MetricIngestor / MetricValidator ports (D4-style full ports & adapters split). Worth doing only if a non-CMEC bundle format or alternative storage backend appears.
  • Pipeline / Stage / Observer abstractions for recorder extensibility (D2-style). Add only when a concrete extension request arrives (e.g., Slack notification on non-retryable failure).
  • Unifying the executor drain loop itself across LocalExecutor and HPCExecutor: a separate refactor (Poller / ExecutionTracker, candidate #2 in the original list).
