feat(execution): add execution observer foundation#1097
Conversation
ff3ac2f to
a74821d
Compare
Greptile SummaryThis PR introduces the execution observer foundation: a typed lifecycle event stream (
Confidence Score: 5/5The change is additive and opt-out by default; executions with no registered observer pay only a no-op context lookup, and the error isolation design prevents any observer from breaking a live execution. The interrupt-propagation and failure-isolation logic is correct and well-tested. The daemon fiber correctly inherits the observer context through forkDetach. Both execution paths emit a complete and symmetric event sequence. The two findings are narrow edge cases under concurrent interruption that do not affect the common path. No files require special attention for merge safety. Important Files Changed
Sequence Diagram%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant Caller
participant Engine
participant DaemonFiber
participant Observer
Caller->>Engine: executeWithPause(code, options)
Engine->>Observer: ExecutionStarted
Engine->>DaemonFiber: forkDetach (inherits Observer context)
DaemonFiber->>Observer: ToolCallStarted
DaemonFiber->>Observer: ToolCallFinished
DaemonFiber->>Observer: InteractionStarted
DaemonFiber-->>Engine: paused (Deferred)
Engine-->>Caller: PausedExecution
Caller->>Engine: resume(executionId, response)
Engine->>DaemonFiber: Deferred.succeed(response)
DaemonFiber->>Observer: InteractionResolved
DaemonFiber->>Observer: ExecutionFinished
Engine-->>Caller: ExecutionResult
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant Caller
participant Engine
participant DaemonFiber
participant Observer
Caller->>Engine: executeWithPause(code, options)
Engine->>Observer: ExecutionStarted
Engine->>DaemonFiber: forkDetach (inherits Observer context)
DaemonFiber->>Observer: ToolCallStarted
DaemonFiber->>Observer: ToolCallFinished
DaemonFiber->>Observer: InteractionStarted
DaemonFiber-->>Engine: paused (Deferred)
Engine-->>Caller: PausedExecution
Caller->>Engine: resume(executionId, response)
Engine->>DaemonFiber: Deferred.succeed(response)
DaemonFiber->>Observer: InteractionResolved
DaemonFiber->>Observer: ExecutionFinished
Engine-->>Caller: ExecutionResult
Reviews (5): Last reviewed commit: "refactor(execution): scope observer disp..." | Re-trigger Greptile |
a5352a8 to
df4389d
Compare
## Summary - Mirror the upstream execution observer foundation and hardening from RhysSullivan#1097. - Keep the dev branch aligned with the scoped observer API: `withExecutionObserver`, `emitExecutionEvent`, and composed plugin observers. - Preserve fork-only execution actor fields while matching upstream observer failure handling and deterministic dispatch behavior. - Update dev-only execution observer plugins to use exhaustive Effect `Match` dispatch for `ExecutionEvent` handling. ## Type Safety Note Plugin observers handle `ExecutionEvent` as an exhaustive Effect tagged-union match rather than a raw `switch (event._tag)` or predicate chain. `execution-history` now uses `Match.exhaustive` for the full lifecycle stream, so a future event variant becomes a compile-time update point. The metrics observers also use exhaustive matching and explicitly ignore interaction events with no-op cases. ```ts import { Effect, Match } from "effect"; import { type ExecutionEvent } from "@executor-js/sdk"; const handleExecutionEvent = (history: ExecutionHistoryExtension) => Match.type<ExecutionEvent>().pipe( Match.withReturnType<Effect.Effect<void, unknown>>(), Match.tag("ExecutionStarted", (event) => history.store.createRun(event)), Match.tag("ToolCallStarted", (event) => history.store.createToolCall(event)), Match.tag("ToolCallFinished", (event) => history.store.finishToolCall(event)), Match.tag("InteractionStarted", (event) => history.store.createInteraction(event)), Match.tag("InteractionResolved", (event) => history.store.resolveInteraction(event)), Match.tag("ExecutionFinished", (event) => history.store.finishRun(event)), Match.exhaustive, ); ``` ## Validation - `bun run --cwd packages/core/sdk test -- execution-observer.test.ts` - `bun run --cwd packages/core/execution test -- engine-observer.test.ts` - `bun run --cwd packages/core/sdk typecheck` - `bun run --cwd packages/core/execution typecheck` - `bun run --cwd packages/plugins/execution-history test` - `bun run --cwd packages/plugins/execution-metrics test` - `bun run --cwd packages/plugins/execution-history typecheck` - `bun run --cwd packages/plugins/execution-metrics typecheck` - touched-file `oxfmt --check` - touched-file `oxlint -c .oxlintrc.jsonc --deny-warnings` - `git diff --check`
|
Additional downstream context from my fork: this observer API is the core primitive used by optional execution-history and execution-metrics plugins. High-level shape: AST outline of one downstream plugin: executionHistoryPlugin
-> pluginStorage: { runs }
-> storage(deps): makeExecutionHistoryStore(deps)
-> extension(ctx): { list, get, handleEvent }
-> runtime.executionObserver(self): makeExecutionHistoryObserver(self)Fork permalinks:
Call stack: This is why I think the PR is the right size: it contributes only the lifecycle hook, not the history or metrics products. Once this lands, those plugins can be proposed independently as ordinary consumers of the primitive. |
Summary
Notes
This is a domain lifecycle hook for plugin behavior such as history, metrics, indexing, or cache maintenance. It is not an OpenTelemetry replacement.
Non-interrupt observer failures are logged and isolated so they cannot fail the execution being observed. Interrupts still propagate as cancellation.
Implementation Shape
SDK Observer Contract
packages/core/sdk/src/execution-observer.tscurrentExecutionObserveris private. Callers install observers throughwithExecutionObserverand emit throughemitExecutionEvent, which keeps failure isolation on the public path.Plugin Hook
packages/core/sdk/src/plugin.tsThe hook receives the plugin extension so observers can write to plugin-owned stores or services without adding engine dependencies on plugin implementations.
Engine Wiring
packages/core/execution/src/engine.tsThe observer is scoped around public engine methods so detached execution fibers inherit the observer context. That matters for pause/resume because an execution can emit
InteractionResolvedafter the original request returns.Shared Stack Wiring
packages/core/api/src/server/execution-stack.tsLocal app boot paths follow the same pattern by composing plugin observers and passing the observer into
createExecutionEngine.Event Flow
Inline Execution
Pause/Resume Execution
Example Plugin
Plugin observers should handle
ExecutionEventas an exhaustive tagged union. TheMatch.exhaustivecloser makes future event variants a compile-time update point for plugins that care about every lifecycle event.Validation
bun run --cwd packages/core/sdk test -- execution-observer.test.tsbun run --cwd packages/core/execution test src/engine-observer.test.tsbun run --cwd packages/core/sdk typecheckbun run --cwd packages/core/execution typecheckbun run --cwd packages/core/api typecheckgit diff --check upstream/main...HEADoxfmt --checkoxlint -c .oxlintrc.jsonc --deny-warningsFollow-up Scope
This PR intentionally does not add execution history, metrics export, semantic search indexing, or an OpenTelemetry bridge.