fix(execution): mirror observer upstream hardening#59
Conversation
Greptile SummaryThis PR migrates the execution observer system from a closure-based
Confidence Score: 5/5Safe to merge — the context-scoped observer wiring is architecturally sound, detached fibers correctly inherit the observer at fork time, and interrupt re-propagation is explicitly covered by new tests. The core change — replacing an ad-hoc closure with Effect's Context.Reference pattern — is straightforward and the type signatures enforce correctness end-to-end. The sequential dispatch trade-off is deliberate, tested, and the interrupt short-circuit behaviour is validated. Only a stale JSDoc comment was found; no logic defects or runtime regressions are present. packages/plugins/execution-metrics/src/cloudflare/index.ts has a stale JSDoc describing the old "errors are swallowed" contract; worth updating before merging to keep observer contract documentation accurate for future implementors. Important Files Changed
Sequence Diagram%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant Caller
participant Engine
participant ObserverCtx as Context.Reference<br/>(currentExecutionObserver)
participant ComposedObs as composeExecutionObservers
participant PluginA as Plugin Observer A
participant PluginB as Plugin Observer B
participant DetachedFiber as Detached Fiber<br/>(pausable exec)
Caller->>Engine: executeWithPause(code, options)
Engine->>ObserverCtx: "withExecutionObserver(config.observer)<br/>provideService to entire effect scope"
Engine->>ObserverCtx: emitExecutionEvent(ExecutionStarted)
ObserverCtx->>ComposedObs: handle(ExecutionStarted)
ComposedObs->>PluginA: handle(event) [sequential]
PluginA-->>ComposedObs: Effect.void or logs error
ComposedObs->>PluginB: handle(event) [sequential, only if A did not interrupt]
PluginB-->>ComposedObs: Effect.void
Engine->>DetachedFiber: Effect.forkDetach (inherits observer context)
Engine-->>Caller: pause handle
Caller->>Engine: resume(executionId, response)
Engine->>ObserverCtx: "withExecutionObserver(config.observer)<br/>wraps resume scope"
DetachedFiber->>ObserverCtx: "emitExecutionEvent(ExecutionFinished)<br/>(from inherited context)"
ObserverCtx->>ComposedObs: handle(ExecutionFinished)
Note over ComposedObs,PluginB: interrupt from any observer<br/>stops remaining dispatch
Engine-->>Caller: ExecutionResult
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant Caller
participant Engine
participant ObserverCtx as Context.Reference<br/>(currentExecutionObserver)
participant ComposedObs as composeExecutionObservers
participant PluginA as Plugin Observer A
participant PluginB as Plugin Observer B
participant DetachedFiber as Detached Fiber<br/>(pausable exec)
Caller->>Engine: executeWithPause(code, options)
Engine->>ObserverCtx: "withExecutionObserver(config.observer)<br/>provideService to entire effect scope"
Engine->>ObserverCtx: emitExecutionEvent(ExecutionStarted)
ObserverCtx->>ComposedObs: handle(ExecutionStarted)
ComposedObs->>PluginA: handle(event) [sequential]
PluginA-->>ComposedObs: Effect.void or logs error
ComposedObs->>PluginB: handle(event) [sequential, only if A did not interrupt]
PluginB-->>ComposedObs: Effect.void
Engine->>DetachedFiber: Effect.forkDetach (inherits observer context)
Engine-->>Caller: pause handle
Caller->>Engine: resume(executionId, response)
Engine->>ObserverCtx: "withExecutionObserver(config.observer)<br/>wraps resume scope"
DetachedFiber->>ObserverCtx: "emitExecutionEvent(ExecutionFinished)<br/>(from inherited context)"
ObserverCtx->>ComposedObs: handle(ExecutionFinished)
Note over ComposedObs,PluginB: interrupt from any observer<br/>stops remaining dispatch
Engine-->>Caller: ExecutionResult
Reviews (4): Last reviewed commit: "refactor(plugins): match execution event..." | Re-trigger Greptile |
Summary
withExecutionObserver,emitExecutionEvent, and composed plugin observers.Matchdispatch forExecutionEventhandling.Type Safety Note
Plugin observers handle
ExecutionEventas an exhaustive Effect tagged-union match rather than a rawswitch (event._tag)or predicate chain.execution-historynow usesMatch.exhaustivefor the full lifecycle stream, so a future event variant becomes a compile-time update point. The metrics observers also use exhaustive matching and explicitly ignore interaction events with no-op cases.Validation
bun run --cwd packages/core/sdk test -- execution-observer.test.tsbun run --cwd packages/core/execution test -- engine-observer.test.tsbun run --cwd packages/core/sdk typecheckbun run --cwd packages/core/execution typecheckbun run --cwd packages/plugins/execution-history testbun run --cwd packages/plugins/execution-metrics testbun run --cwd packages/plugins/execution-history typecheckbun run --cwd packages/plugins/execution-metrics typecheckoxfmt --checkoxlint -c .oxlintrc.jsonc --deny-warningsgit diff --check