diff --git a/.changeset/execution-observer-foundation.md b/.changeset/execution-observer-foundation.md new file mode 100644 index 000000000..4a4098b4b --- /dev/null +++ b/.changeset/execution-observer-foundation.md @@ -0,0 +1,14 @@ +--- +"@executor-js/sdk": minor +"@executor-js/execution": minor +"@executor-js/api": minor +--- + +Add the execution-observer foundation. The execution engine now emits a typed +lifecycle stream (`ExecutionStarted`/`Finished`, `ToolCallStarted`/`Finished`, +`InteractionStarted`/`Resolved`), plugins can subscribe via the new +`plugin.runtime.executionObserver` hook, and `makeExecutionStack` composes every +registered plugin's observer onto the engine. Behaviour is unchanged when no +plugin observes, making this the opt-in seam the execution-history and +execution-metrics plugins build on. Also exposes `Executor.owner` and enriches +the `mcp.execute` span with the run id and trigger. diff --git a/apps/local/src/app.ts b/apps/local/src/app.ts index a9915daf5..09eb0e122 100644 --- a/apps/local/src/app.ts +++ b/apps/local/src/app.ts @@ -9,6 +9,7 @@ import { } from "@executor-js/api/server"; import { createExecutionEngine } from "@executor-js/execution"; import { makeQuickJsExecutor } from "@executor-js/runtime-quickjs"; +import { composeExecutionObservers, type AnyPlugin } from "@executor-js/sdk"; import { getExecutorBundle, type LocalExecutor } from "./executor"; import { makeLocalIdentityLayer } from "./identity"; @@ -50,12 +51,17 @@ import { ErrorCaptureLive } from "./observability"; * `HostConfig`/`CodeExecutorProvider` seams — the fixed executor is the whole * execution model. 
*/ -const localFixedExecutionLayer = (executor: LocalExecutor): Layer.Layer => +const localFixedExecutionLayer = ( + executor: LocalExecutor, + plugins: readonly AnyPlugin[], +): Layer.Layer => Layer.succeed(FixedExecutionProvider)({ executor, engine: createExecutionEngine({ executor, codeExecutor: makeQuickJsExecutor(), + // Local bypasses makeExecutionStack, so compose plugin observers here. + observer: composeExecutionObservers(plugins, executor), }), // The executor IS its own plugin-extension map (`executor[pluginId]`); the // fixed middleware reads `executor[id]` to satisfy each plugin's @@ -86,7 +92,7 @@ export const makeLocalApiHandler = async (token: string): Promise ({ + id: "observer-test" as const, + storage: () => ({}), + staticSources: () => [], +})); + +const approvalPlugin = definePlugin(() => ({ + id: "observer-approval-test" as const, + storage: () => ({}), + staticSources: () => [ + { + id: "approval.ctl", + kind: "control" as const, + name: "Approval Ctl", + tools: [ + tool({ + name: "run", + description: "Requires approval", + annotations: { requiresApproval: true } as const, + inputSchema: Schema.toStandardSchemaV1(Schema.toStandardJSONSchemaV1(Schema.Struct({}))), + execute: () => Effect.succeed("ran"), + }), + ], + }, + ], +})); + +const makeExecutor = () => createExecutor(makeTestConfig({ plugins: [emptyPlugin()] as const })); + +const makeApprovalExecutor = () => + createExecutor(makeTestConfig({ plugins: [approvalPlugin()] as const })); + +// A code executor that issues one builtin tool call (tools.search) and then +// completes, enough to exercise the full event sequence. 
+const toolCallingExecutor: CodeExecutor = { + execute: (_code, invoker) => + invoker + .invoke({ path: "search", args: { query: "anything" } }) + .pipe(Effect.as({ result: "ok", logs: [] } satisfies ExecuteResult), Effect.orDie), +}; + +const approvalCallingExecutor: CodeExecutor = { + execute: (_code, invoker) => + invoker + .invoke({ path: "approval.ctl.run", args: {} }) + .pipe(Effect.as({ result: "ok", logs: [] } satisfies ExecuteResult), Effect.orDie), +}; + +const collectingObserver = () => { + const events: ExecutionEvent[] = []; + const observer: ExecutionObserver = { + handle: (event) => Effect.sync(() => void events.push(event)), + }; + return { events, observer }; +}; + +describe("execution engine observer emission", () => { + it.effect("emits the full lifecycle for a completed run with a tool call", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + const { events, observer } = collectingObserver(); + const engine = createExecutionEngine({ + executor, + codeExecutor: toolCallingExecutor, + observer, + }); + + const result = yield* engine.executeWithPause("noop", { trigger: { kind: "test" } }); + expect(result.status).toBe("completed"); + + // First event opens the run, last closes it; tool calls land in between. + // `.find` with isTagged narrows each result, so the assertions read the + // typed fields directly via optional chaining (no conditional blocks). 
+ const started = events.find((e) => Predicate.isTagged(e, "ExecutionStarted")); + const finished = events.find((e) => Predicate.isTagged(e, "ExecutionFinished")); + const toolStarted = events.find((e) => Predicate.isTagged(e, "ToolCallStarted")); + const toolFinished = events.find((e) => Predicate.isTagged(e, "ToolCallFinished")); + + expect(Predicate.isTagged(events[0], "ExecutionStarted")).toBe(true); + expect(Predicate.isTagged(events[events.length - 1], "ExecutionFinished")).toBe(true); + + expect(started?.trigger?.kind).toBe("test"); + expect(started?.owner.tenant).toBeDefined(); + expect(toolStarted).toBeDefined(); + expect(finished?.status).toBe("completed"); + + // Tool-call events share the run's executionId and carry the path. + expect(toolFinished?.path).toBe("search"); + expect(toolFinished?.status).toBe("completed"); + expect(toolFinished?.executionId).toBe(started?.executionId); + }), + ); + + it.effect("emits inline interaction events when execute handles elicitation", () => + Effect.gen(function* () { + const executor = yield* makeApprovalExecutor(); + const { events, observer } = collectingObserver(); + const engine = createExecutionEngine({ + executor, + codeExecutor: approvalCallingExecutor, + observer, + }); + + const result = yield* engine.execute("noop", { + trigger: { kind: "test" }, + onElicitation: () => Effect.succeed(ElicitationResponse.make({ action: "accept" })), + }); + expect(result.result).toBe("ok"); + + const started = events.find((e) => Predicate.isTagged(e, "ExecutionStarted")); + const interactionStarted = events.find((e) => Predicate.isTagged(e, "InteractionStarted")); + const interactionResolved = events.find((e) => Predicate.isTagged(e, "InteractionResolved")); + + expect(interactionStarted?.executionId).toBe(started?.executionId); + expect(interactionResolved?.executionId).toBe(started?.executionId); + expect(interactionResolved?.interactionId).toBe(interactionStarted?.interactionId); + 
expect(interactionStarted?.context.request.message).toContain("approval"); + expect(interactionResolved?.status).toBe("accepted"); + expect(interactionResolved?.response?.action).toBe("accept"); + }), + ); + + it.effect("does nothing observable when no observer is configured", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + const engine = createExecutionEngine({ executor, codeExecutor: toolCallingExecutor }); + const result = yield* engine.executeWithPause("noop"); + expect(result.status).toBe("completed"); + }), + ); +}); diff --git a/packages/core/execution/src/engine.ts b/packages/core/execution/src/engine.ts index 665548852..f44647970 100644 --- a/packages/core/execution/src/engine.ts +++ b/packages/core/execution/src/engine.ts @@ -1,5 +1,5 @@ import { Deferred, Effect, Fiber, Predicate, Queue } from "effect"; -import type * as Cause from "effect/Cause"; +import * as Cause from "effect/Cause"; import * as Exit from "effect/Exit"; import type { @@ -8,6 +8,22 @@ import type { ElicitationResponse, ElicitationHandler, ElicitationContext, + ExecutionObserver, + ExecutionTrigger, +} from "@executor-js/sdk/core"; +import { + ExecutionId, + ExecutionInteractionId, + ExecutionToolCallId, + ExecutionFinished, + ExecutionStarted, + InteractionResolved, + InteractionStarted, + ToolCallFinished, + ToolCallStarted, + emitExecutionEvent, + noopExecutionObserver, + withExecutionObserver, } from "@executor-js/sdk/core"; import { CodeExecutionError } from "@executor-js/codemode-core"; import type { CodeExecutor, ExecuteResult, SandboxToolInvoker } from "@executor-js/codemode-core"; @@ -30,6 +46,21 @@ export type ExecutionEngineConfig; readonly toolDiscoveryProvider?: ToolDiscoveryProvider; + /** Optional sink for execution lifecycle events. Defaults to a no-op, so a + * host that registers no observer pays only for constructing the events. */ + readonly observer?: ExecutionObserver; +}; + +/** Per-run options shared by both execute paths. 
*/ +export type ExecutionRunOptions = { + /** What kicked off this run (e.g. `mcp.tool`, `api.http`); recorded on the + * `ExecutionStarted` event for downstream attribution. */ + readonly trigger?: ExecutionTrigger; +}; + +/** Options for the inline-elicitation execute path. */ +export type InlineExecutionOptions = ExecutionRunOptions & { + readonly onElicitation: ElicitationHandler; }; export type ExecutionResult = @@ -363,7 +394,7 @@ export type ExecutionEngine */ readonly execute: ( code: string, - options: { readonly onElicitation: ElicitationHandler }, + options: InlineExecutionOptions, ) => Effect.Effect; /** @@ -371,7 +402,10 @@ export type ExecutionEngine * Use this when the host doesn't support inline elicitation. * Returns either a completed result or a paused execution that can be resumed. */ - readonly executeWithPause: (code: string) => Effect.Effect; + readonly executeWithPause: ( + code: string, + options?: ExecutionRunOptions, + ) => Effect.Effect; /** * Resume a paused execution. Returns a completed result, a new pause, or @@ -423,6 +457,133 @@ export const createExecutionEngine = ExecutionId.make(`exec_${crypto.randomUUID()}`); + const makeToolCallId = (): ExecutionToolCallId => + ExecutionToolCallId.make(`tc_${crypto.randomUUID()}`); + const makeInteractionId = (): ExecutionInteractionId => + ExecutionInteractionId.make(`ix_${crypto.randomUUID()}`); + + const interactionStatusFromAction = (action: ResumeResponse["action"]) => + action === "accept" ? "accepted" : action === "decline" ? "declined" : "cancelled"; + + const finishFromResult = (executionId: ExecutionId, result: ExecuteResult): ExecutionFinished => + new ExecutionFinished({ + executionId, + owner, + status: result.error ? 
"failed" : "completed", + result: result.result, + error: result.error, + logs: result.logs, + completedAt: new Date(), + }); + + const finishFromCause = (executionId: ExecutionId, cause: Cause.Cause): ExecutionFinished => + new ExecutionFinished({ + executionId, + owner, + status: "failed", + error: Cause.pretty(cause), + completedAt: new Date(), + }); + + /** Wrap an invoker so each tool call brackets `ToolCallStarted`/`Finished`. */ + const observeToolCalls = ( + executionId: ExecutionId, + inner: SandboxToolInvoker, + ): SandboxToolInvoker => ({ + invoke: (call) => + Effect.gen(function* () { + const toolCallId = makeToolCallId(); + yield* emitExecutionEvent( + new ToolCallStarted({ + executionId, + toolCallId, + owner, + path: call.path, + args: call.args, + startedAt: new Date(), + }), + ); + return yield* inner.invoke(call).pipe( + Effect.tap((result) => + emitExecutionEvent( + new ToolCallFinished({ + executionId, + toolCallId, + owner, + path: call.path, + status: "completed", + result, + completedAt: new Date(), + }), + ), + ), + Effect.tapCause((cause) => + emitExecutionEvent( + new ToolCallFinished({ + executionId, + toolCallId, + owner, + path: call.path, + status: "failed", + error: Cause.pretty(cause), + completedAt: new Date(), + }), + ), + ), + ); + }), + }); + + /** Wrap an inline elicitation handler so it brackets `InteractionStarted`/ + * `Resolved`. The pausable path emits these directly (see below). 
*/ + const observeInlineElicitation = + (executionId: ExecutionId, handler: ElicitationHandler): ElicitationHandler => + (ctx) => + Effect.gen(function* () { + const interactionId = makeInteractionId(); + yield* emitExecutionEvent( + new InteractionStarted({ + executionId, + interactionId, + owner, + context: ctx, + startedAt: new Date(), + }), + ); + return yield* handler(ctx).pipe( + Effect.tap((response) => + emitExecutionEvent( + new InteractionResolved({ + executionId, + interactionId, + owner, + status: interactionStatusFromAction(response.action), + response, + completedAt: new Date(), + }), + ), + ), + Effect.tapCause((cause) => + emitExecutionEvent( + new InteractionResolved({ + executionId, + interactionId, + owner, + status: "failed", + error: Cause.pretty(cause), + completedAt: new Date(), + }), + ), + ), + ); + }); + /** * Race a running fiber against the pause queue. Returns when either * the fiber completes or an elicitation handler fires (whichever @@ -455,12 +616,33 @@ export const createExecutionEngine = >(); @@ -476,6 +658,7 @@ export const createExecutionEngine = = { id, @@ -486,19 +669,59 @@ export const createExecutionEngine = + emitExecutionEvent( + new InteractionResolved({ + executionId, + interactionId, + owner, + status: interactionStatusFromAction(response.action), + response, + completedAt: new Date(), + }), + ), + ), + Effect.tapCause((cause) => + emitExecutionEvent( + new InteractionResolved({ + executionId, + interactionId, + owner, + status: "failed", + error: Cause.pretty(cause), + completedAt: new Date(), + }), + ), + ), + ); }); - const invoker = makeFullInvoker( - executor, - { onElicitation: elicitationHandler }, - toolDiscoveryProvider, + const invoker = observeToolCalls( + executionId, + makeFullInvoker(executor, { onElicitation: elicitationHandler }, toolDiscoveryProvider), ); fiber = yield* Effect.forkDetach( - codeExecutor.execute(code, invoker).pipe(Effect.withSpan("executor.code.exec")), + codeExecutor.execute(code, 
invoker).pipe( + Effect.withSpan("executor.code.exec"), + Effect.tap((result) => emitExecutionEvent(finishFromResult(executionId, result))), + Effect.tapCause((cause) => emitExecutionEvent(finishFromCause(executionId, cause))), + ), ); // When the fiber settles on its own (sandbox timeout, failure) while @@ -586,26 +809,49 @@ export const createExecutionEngine = emitExecutionEvent(finishFromResult(executionId, result))), + Effect.tapCause((cause) => emitExecutionEvent(finishFromCause(executionId, cause))), ); - return yield* codeExecutor.execute(code, invoker).pipe(Effect.withSpan("executor.code.exec")); }); return { - execute: runInlineExecution, - executeWithPause: startPausableExecution, - resume: resumeExecution, + execute: (code, options) => runInlineExecution(code, options).pipe(observeExecution), + executeWithPause: (code, options) => + startPausableExecution(code, options).pipe(observeExecution), + resume: (executionId, response) => + resumeExecution(executionId, response).pipe(observeExecution), getPausedExecution: (executionId) => Effect.sync(() => pausedExecutions.get(executionId) ?? 
null), getDescription: buildExecuteDescription(executor), diff --git a/packages/core/sdk/src/execution-observer.test.ts b/packages/core/sdk/src/execution-observer.test.ts new file mode 100644 index 000000000..c23612bbf --- /dev/null +++ b/packages/core/sdk/src/execution-observer.test.ts @@ -0,0 +1,141 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Cause, Effect, Exit } from "effect"; + +import { Subject, Tenant } from "./ids"; +import { + ExecutionFinished, + ExecutionId, + composeExecutionObservers, + definePlugin, + emitExecutionEvent, + withExecutionObserver, +} from "./index"; + +const owner = { tenant: Tenant.make("tenant_test"), subject: Subject.make("subject_test") }; + +let calls: string[] = []; + +const observingPlugin = (id: string, asyncBoundary = false) => + definePlugin(() => ({ + id, + storage: () => ({}), + extension: () => ({ label: id }), + runtime: { + executionObserver: (self: { label: string }) => ({ + handle: () => + (asyncBoundary ? Effect.promise(() => Promise.resolve()) : Effect.void).pipe( + Effect.flatMap(() => Effect.sync(() => calls.push(self.label))), + ), + }), + }, + })); + +const failingPlugin = definePlugin(() => ({ + id: "failing" as const, + storage: () => ({}), + extension: () => ({ label: "failing" }), + runtime: { + executionObserver: () => ({ + handle: () => Effect.die("observer failed"), + }), + }, +})); + +const interruptingPlugin = definePlugin(() => ({ + id: "interrupting" as const, + storage: () => ({}), + extension: () => ({ label: "interrupting" }), + runtime: { + executionObserver: () => ({ + handle: () => Effect.interrupt, + }), + }, +})); + +const finishedEvent = () => + new ExecutionFinished({ + executionId: ExecutionId.make("exec_test"), + owner, + status: "completed", + result: "ok", + completedAt: new Date(), + }); + +describe("composeExecutionObservers", () => { + it.effect("emits events to the scoped observer", () => + Effect.gen(function* () { + calls = []; + yield* 
emitExecutionEvent(finishedEvent()).pipe( + withExecutionObserver({ + handle: () => Effect.sync(() => calls.push("observed")), + }), + ); + + expect(calls).toEqual(["observed"]); + }), + ); + + it.effect("dispatches observers sequentially and isolates failures", () => + Effect.gen(function* () { + calls = []; + const first = observingPlugin("first", true)(); + const failing = failingPlugin(); + const last = observingPlugin("last")(); + const observer = composeExecutionObservers([first, failing, last] as const, { + first: { label: "first" }, + failing: { label: "failing" }, + last: { label: "last" }, + }); + + // The failing plugin dies mid-dispatch; the others must still observe. + yield* observer.handle(finishedEvent()); + + expect(calls).toEqual(["first", "last"]); + }), + ); + + it.effect("preserves interrupts from scoped observers", () => + Effect.gen(function* () { + const exit = yield* Effect.exit( + emitExecutionEvent(finishedEvent()).pipe( + withExecutionObserver({ + handle: () => Effect.interrupt, + }), + ), + ); + + expect(Exit.isFailure(exit)).toBe(true); + if (!Exit.isFailure(exit)) return; + expect(Cause.hasInterrupts(exit.cause)).toBe(true); + }), + ); + + it.effect("preserves interrupts from composed plugin observers", () => + Effect.gen(function* () { + calls = []; + const interrupting = interruptingPlugin(); + const last = observingPlugin("last")(); + const observer = composeExecutionObservers([interrupting, last] as const, { + interrupting: { label: "interrupting" }, + last: { label: "last" }, + }); + + const exit = yield* Effect.exit(observer.handle(finishedEvent())); + + expect(Exit.isFailure(exit)).toBe(true); + if (!Exit.isFailure(exit)) return; + expect(Cause.hasInterrupts(exit.cause)).toBe(true); + expect(calls).toEqual([]); + }), + ); + + it.effect("returns a no-op observer when no plugin registers one", () => + Effect.gen(function* () { + const plain = definePlugin(() => ({ id: "plain", storage: () => ({}) }))(); + const observer = 
composeExecutionObservers([plain] as const, { plain: {} }); + + // No observer registered: handling is a no-op and never throws. + yield* observer.handle(finishedEvent()); + }), + ); +}); diff --git a/packages/core/sdk/src/execution-observer.ts b/packages/core/sdk/src/execution-observer.ts new file mode 100644 index 000000000..93ec67690 --- /dev/null +++ b/packages/core/sdk/src/execution-observer.ts @@ -0,0 +1,194 @@ +import { Context, Data, Effect, Schema } from "effect"; +import * as Cause from "effect/Cause"; + +import type { ElicitationContext, ElicitationResponse } from "./elicitation"; +import type { AnyPlugin, OwnerBinding, PluginExtensions } from "./plugin"; + +/* The execution-observer contract: a push-model lifecycle stream the engine + * emits as it runs code. Plugins opt in via `plugin.runtime.executionObserver` + * and receive every event; sinks (history, metrics, tracing) are built on top. + * Emission is dispatched to all registered observers with per-observer error + * logging, so observer failures never break a run; interrupts still propagate. 
*/ + +export const ExecutionId = Schema.String.pipe(Schema.brand("ExecutionId")); +export type ExecutionId = typeof ExecutionId.Type; + +export const ExecutionToolCallId = Schema.String.pipe(Schema.brand("ExecutionToolCallId")); +export type ExecutionToolCallId = typeof ExecutionToolCallId.Type; + +export const ExecutionInteractionId = Schema.String.pipe(Schema.brand("ExecutionInteractionId")); +export type ExecutionInteractionId = typeof ExecutionInteractionId.Type; + +export type ExecutionTrigger = { + readonly kind: string; + readonly metadata?: Record; +}; + +export type ToolCallStatus = "completed" | "failed"; +export type InteractionStatus = "accepted" | "declined" | "cancelled" | "failed"; +export type ExecutionStatus = "completed" | "failed"; + +export class ExecutionStarted extends Data.TaggedClass("ExecutionStarted")<{ + readonly executionId: ExecutionId; + readonly owner: OwnerBinding; + readonly code: string; + readonly trigger?: ExecutionTrigger; + readonly startedAt: Date; +}> {} + +export class ToolCallStarted extends Data.TaggedClass("ToolCallStarted")<{ + readonly executionId: ExecutionId; + readonly toolCallId: ExecutionToolCallId; + readonly owner: OwnerBinding; + readonly path: string; + readonly args: unknown; + readonly startedAt: Date; +}> {} + +export class ToolCallFinished extends Data.TaggedClass("ToolCallFinished")<{ + readonly executionId: ExecutionId; + readonly toolCallId: ExecutionToolCallId; + readonly owner: OwnerBinding; + readonly path: string; + readonly status: ToolCallStatus; + readonly result?: unknown; + readonly error?: string; + readonly completedAt: Date; +}> {} + +export class InteractionStarted extends Data.TaggedClass("InteractionStarted")<{ + readonly executionId: ExecutionId; + readonly interactionId: ExecutionInteractionId; + readonly owner: OwnerBinding; + readonly context: ElicitationContext; + readonly startedAt: Date; +}> {} + +export class InteractionResolved extends Data.TaggedClass("InteractionResolved")<{ + 
readonly executionId: ExecutionId; + readonly interactionId: ExecutionInteractionId; + readonly owner: OwnerBinding; + readonly status: InteractionStatus; + readonly response?: ElicitationResponse; + readonly error?: string; + readonly completedAt: Date; +}> {} + +export class ExecutionFinished extends Data.TaggedClass("ExecutionFinished")<{ + readonly executionId: ExecutionId; + readonly owner: OwnerBinding; + readonly status: ExecutionStatus; + readonly result?: unknown; + readonly error?: string; + readonly logs?: readonly string[]; + readonly completedAt: Date; +}> {} + +export type ExecutionEvent = + | ExecutionStarted + | ToolCallStarted + | ToolCallFinished + | InteractionStarted + | InteractionResolved + | ExecutionFinished; + +export interface ExecutionObserver { + readonly handle: (event: ExecutionEvent) => Effect.Effect; +} + +export const noopExecutionObserver: ExecutionObserver = { + handle: () => Effect.void, +}; + +const currentExecutionObserver = Context.Reference( + "@executor-js/sdk/ExecutionObserver", + { defaultValue: () => noopExecutionObserver }, +); + +type ExecutionEventName = ExecutionEvent["_tag"]; + +const executionEventName = (event: ExecutionEvent): ExecutionEventName => { + // oxlint-disable-next-line executor/no-manual-tag-check -- boundary: logging uses the Data.TaggedClass discriminant as an event name + return event._tag; +}; + +const logExecutionObserverFailure = ( + event: ExecutionEvent, + cause: Cause.Cause, + pluginId?: string, +): Effect.Effect => + Effect.logWarning("execution observer failed", { + cause: Cause.pretty(cause), + event: executionEventName(event), + ...(pluginId ? { pluginId } : {}), + }); + +const handleExecutionObserverCause = ( + event: ExecutionEvent, + cause: Cause.Cause, + pluginId?: string, +): Effect.Effect => + Cause.hasInterrupts(cause) + ? 
Effect.interrupt + : logExecutionObserverFailure(event, cause, pluginId); + +/** Emit an execution lifecycle event to the observer installed in the current + * Effect context. Defaults to a no-op when no observer is installed. */ +export const emitExecutionEvent = (event: ExecutionEvent): Effect.Effect => + Effect.service(currentExecutionObserver).pipe( + Effect.flatMap((observer) => observer.handle(event)), + ); + +/** Install an execution observer for the scoped Effect. Non-interrupt observer + * failures are logged and isolated; interrupt causes still propagate as + * cancellation. */ +export const withExecutionObserver = + (observer: ExecutionObserver) => + (effect: Effect.Effect): Effect.Effect => + effect.pipe( + Effect.provideService(currentExecutionObserver, { + handle: (event) => + observer + .handle(event) + .pipe(Effect.catchCause((cause) => handleExecutionObserverCause(event, cause))), + }), + ); + +/** Collect every plugin's `runtime.executionObserver` and fan each event to + * all of them, logging per-observer errors. Returns the no-op observer when no + * plugin registers one, the common opt-out case. */ +export const composeExecutionObservers = ( + plugins: TPlugins, + extensions: PluginExtensions, +): ExecutionObserver => { + const observers: { readonly pluginId: string; readonly observer: ExecutionObserver }[] = + []; + + for (const plugin of plugins) { + const observer = plugin.runtime?.executionObserver?.( + extensions[plugin.id as keyof PluginExtensions] as never, + ); + if (observer) { + observers.push({ pluginId: plugin.id, observer }); + } + } + + if (observers.length === 0) { + return noopExecutionObserver; + } + + return { + handle: (event) => + Effect.forEach( + observers, + ({ pluginId, observer }) => + observer + .handle(event) + .pipe( + Effect.catchCause((cause) => handleExecutionObserverCause(event, cause, pluginId)), + ), + // Preserve plugin order so observers see deterministic sequencing. 
+ { discard: true }, + ), + }; +}; diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index 38e12a3c6..71d3920be 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -319,6 +319,11 @@ export type Executor = { ) => Effect.Effect; readonly close: () => Effect.Effect; + + /** The (tenant, subject) this executor acts as. Surfaced so engine-level + * machinery (e.g. execution observers) can attribute work to an owner + * without re-threading identity through every call site. */ + readonly owner: OwnerBinding; } & PluginExtensions; export interface ExecutorDb { @@ -3299,6 +3304,7 @@ export const createExecutor = => value as Executor; diff --git a/packages/core/sdk/src/index.ts b/packages/core/sdk/src/index.ts index a252febdf..7700c34be 100644 --- a/packages/core/sdk/src/index.ts +++ b/packages/core/sdk/src/index.ts @@ -168,6 +168,29 @@ export { type InvokeOptions, } from "./elicitation"; +// Execution observers: the engine lifecycle stream history/metrics/tracing build on. +export { + ExecutionId, + ExecutionToolCallId, + ExecutionInteractionId, + ExecutionStarted, + ToolCallStarted, + ToolCallFinished, + InteractionStarted, + InteractionResolved, + ExecutionFinished, + noopExecutionObserver, + composeExecutionObservers, + emitExecutionEvent, + withExecutionObserver, + type ExecutionTrigger, + type ToolCallStatus, + type InteractionStatus, + type ExecutionStatus, + type ExecutionEvent, + type ExecutionObserver, +} from "./execution-observer"; + // Blob store — the plugin-facing contract (`BlobStore`/`PluginBlobStore`) // plus the platform-neutral backends (`makeFumaBlobStore` default, // `makeInMemoryBlobStore` for tests). 
Platform-specific backends live with diff --git a/packages/core/sdk/src/plugin.ts b/packages/core/sdk/src/plugin.ts index 157002ceb..a1a9f96f2 100644 --- a/packages/core/sdk/src/plugin.ts +++ b/packages/core/sdk/src/plugin.ts @@ -43,6 +43,7 @@ import type { IntegrationRemovalNotAllowedError, InvalidConnectionInputError, } from "./errors"; +import type { ExecutionObserver } from "./execution-observer"; import type { OAuthService } from "./oauth-client"; import type { CredentialProvider, ProviderEntry } from "./provider"; import type { PluginStorageConfig, PluginStorageFacade } from "./plugin-storage"; @@ -514,6 +515,13 @@ export interface PluginSpec< | ((ctx: PluginCtx) => readonly CredentialProvider[]) | ((ctx: PluginCtx) => Effect.Effect); + /** Runtime hooks invoked while the engine executes code. `executionObserver` + * receives this plugin's extension and returns one observer that is handed + * every {@link ExecutionEvent}; it is the seam history/metrics sinks build on. */ + readonly runtime?: { + readonly executionObserver?: (self: NoInfer) => ExecutionObserver; + }; + readonly close?: () => Effect.Effect; }