Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .changeset/execution-observer-foundation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
"@executor-js/sdk": minor
"@executor-js/execution": minor
"@executor-js/api": minor
---

Add the execution-observer foundation. The execution engine now emits a typed
lifecycle stream (`ExecutionStarted`/`ExecutionFinished`,
`ToolCallStarted`/`ToolCallFinished`, `InteractionStarted`/`InteractionResolved`),
plugins can subscribe via the new `plugin.runtime.executionObserver` hook, and
`makeExecutionStack` composes every registered plugin's observer onto the
engine. Behaviour is unchanged when no plugin observes, making this the opt-in
seam the execution-history and execution-metrics plugins build on. Also exposes
`Executor.owner` and enriches the `mcp.execute` span with the run id and trigger.
10 changes: 8 additions & 2 deletions apps/local/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
} from "@executor-js/api/server";
import { createExecutionEngine } from "@executor-js/execution";
import { makeQuickJsExecutor } from "@executor-js/runtime-quickjs";
import { composeExecutionObservers, type AnyPlugin } from "@executor-js/sdk";

import { getExecutorBundle, type LocalExecutor } from "./executor";
import { makeLocalIdentityLayer } from "./identity";
Expand Down Expand Up @@ -50,12 +51,17 @@ import { ErrorCaptureLive } from "./observability";
* `HostConfig`/`CodeExecutorProvider` seams — the fixed executor is the whole
* execution model.
*/
const localFixedExecutionLayer = (executor: LocalExecutor): Layer.Layer<FixedExecutionProvider> =>
const localFixedExecutionLayer = (
executor: LocalExecutor,
plugins: readonly AnyPlugin[],
): Layer.Layer<FixedExecutionProvider> =>
Layer.succeed(FixedExecutionProvider)({
executor,
engine: createExecutionEngine({
executor,
codeExecutor: makeQuickJsExecutor(),
// Local bypasses makeExecutionStack, so compose plugin observers here.
observer: composeExecutionObservers(plugins, executor),
}),
// The executor IS its own plugin-extension map (`executor[pluginId]`); the
// fixed middleware reads `executor[id]` to satisfy each plugin's
Expand Down Expand Up @@ -86,7 +92,7 @@ export const makeLocalApiHandler = async (token: string): Promise<LocalApiHandle
// Layer is the `fixedExecution` seam declaration AND lives in `boot` so the
// fixed middleware's residual `FixedExecutionProvider` resolves there — exactly
// as self-host declares `db: SelfHostDbProvider` and puts the handle in `boot`.
const fixedExecution = localFixedExecutionLayer(executor);
const fixedExecution = localFixedExecutionLayer(executor, plugins);

// The authoritative identity gate for the typed `/api`: validates the boot
// bearer token and resolves the one local Principal. The Bun shell
Expand Down
5 changes: 4 additions & 1 deletion apps/local/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Context, Effect, Layer, ManagedRuntime } from "effect";

import { createExecutionEngine } from "@executor-js/execution";
import { makeQuickJsExecutor } from "@executor-js/runtime-quickjs";
import { composeExecutionObservers } from "@executor-js/sdk";
import { makeLocalApiHandler } from "./app";
import { getExecutorBundle } from "./executor";
import { createMcpRequestHandler, type McpRequestHandler } from "./mcp";
Expand Down Expand Up @@ -62,10 +63,12 @@ export const createServerHandlers = async (token: string): Promise<ServerHandler
// engine instance (the browser-approval + stdio surface is local-only and not
// part of the shared API). Reuse the shared boot bundle so the MCP executor is
// byte-identical to the one the API serves.
const { executor } = await getExecutorBundle();
const { executor, plugins } = await getExecutorBundle();
const engine = createExecutionEngine({
executor,
codeExecutor: makeQuickJsExecutor(),
// Local's in-process MCP also bypasses makeExecutionStack, so compose here.
observer: composeExecutionObservers(plugins, executor),
});
const mcp = createMcpRequestHandler({ engine });

Expand Down
9 changes: 8 additions & 1 deletion packages/core/api/src/server/execution-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import { Context, Effect, Layer } from "effect";
import type * as Cause from "effect/Cause";

import { composeExecutionObservers } from "@executor-js/sdk";
import type { AnyPlugin, Executor, StorageFailure } from "@executor-js/sdk";
import {
createExecutionEngine,
Expand Down Expand Up @@ -108,7 +109,13 @@ export const makeExecutionStack = <
);
const codeExecutor = yield* CodeExecutorProvider;
const { decorate } = yield* EngineDecorator;
const engine = decorate(createExecutionEngine({ executor, codeExecutor }), {

const { plugins } = yield* PluginsProvider;
// PluginsProvider erases the tuple to AnyPlugin[]; recover the caller's
// TPlugins phantom so the extensions arg (the executor) lines up.
const observer = composeExecutionObservers(plugins() as TPlugins, executor);

const engine = decorate(createExecutionEngine({ executor, codeExecutor, observer }), {
accountId,
organizationId,
organizationName,
Expand Down
141 changes: 141 additions & 0 deletions packages/core/execution/src/engine-observer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import { describe, expect, it } from "@effect/vitest";
import { Effect, Predicate, Schema } from "effect";

import { createExecutor, definePlugin, ElicitationResponse, tool } from "@executor-js/sdk";
import type { ExecutionEvent, ExecutionObserver } from "@executor-js/sdk";
import { makeTestConfig } from "@executor-js/sdk/testing";
import type { CodeExecutor, ExecuteResult } from "@executor-js/codemode-core";

import { createExecutionEngine } from "./engine";

/**
 * Bare-bones plugin fixture: an empty storage object and no static sources,
 * so an executor built from it contributes no plugin-defined tools.
 */
const emptyPlugin = definePlugin(() => {
  return {
    id: "observer-test" as const,
    storage: () => ({}),
    staticSources: () => [],
  };
});

/**
 * Plugin fixture exposing a single static source (`approval.ctl`) with one
 * tool, `run`, that is annotated `requiresApproval: true`. The tool takes an
 * empty struct as input and resolves to the string "ran".
 */
const approvalPlugin = definePlugin(() => ({
id: "observer-approval-test" as const,
storage: () => ({}),
staticSources: () => [
{
id: "approval.ctl",
kind: "control" as const,
name: "Approval Ctl",
tools: [
tool({
name: "run",
description: "Requires approval",
// Marks the tool as needing approval before it runs.
annotations: { requiresApproval: true } as const,
// Input schema: an empty struct, i.e. the tool accepts no arguments.
inputSchema: Schema.toStandardSchemaV1(Schema.toStandardJSONSchemaV1(Schema.Struct({}))),
execute: () => Effect.succeed("ran"),
}),
],
},
],
}));

/** Builds a test executor whose only plugin is the empty `observer-test` fixture. */
const makeExecutor = () => {
  const plugins = [emptyPlugin()] as const;
  return createExecutor(makeTestConfig({ plugins }));
};

/** Builds a test executor whose only plugin is the approval-requiring fixture. */
const makeApprovalExecutor = () => {
  const plugins = [approvalPlugin()] as const;
  return createExecutor(makeTestConfig({ plugins }));
};

// Stub code executor: ignores the submitted code, invokes the `search` tool
// path once, then reports a fixed "ok" result with no logs. Any failure from
// the invocation is turned into a defect via `Effect.orDie`.
const toolCallingExecutor: CodeExecutor = {
  execute: (_code, invoker) => {
    const searchCall = invoker.invoke({ path: "search", args: { query: "anything" } });
    return searchCall.pipe(
      Effect.as({ result: "ok", logs: [] } satisfies ExecuteResult),
      Effect.orDie,
    );
  },
};

// Stub code executor: ignores the submitted code, invokes the
// `approval.ctl.run` tool path once with empty args, then reports a fixed "ok"
// result with no logs. Invocation failures become defects via `Effect.orDie`.
const approvalCallingExecutor: CodeExecutor = {
  execute: (_code, invoker) => {
    const approvalCall = invoker.invoke({ path: "approval.ctl.run", args: {} });
    return approvalCall.pipe(
      Effect.as({ result: "ok", logs: [] } satisfies ExecuteResult),
      Effect.orDie,
    );
  },
};

/**
 * Creates an observer that appends every event it handles to an in-memory
 * array, and returns both so a test can inspect what was emitted.
 */
const collectingObserver = () => {
  const events: ExecutionEvent[] = [];
  const observer: ExecutionObserver = {
    handle: (event) =>
      Effect.sync(() => {
        events.push(event);
      }),
  };
  return { events, observer };
};

// Suite covering what the execution engine reports to an `observer` passed to
// `createExecutionEngine`: the run/tool-call lifecycle, inline interaction
// events, and the no-observer case.
describe("execution engine observer emission", () => {
// Runs the stub executor that makes one `search` tool call and checks the
// collected events for ordering, status, and a shared executionId.
it.effect("emits the full lifecycle for a completed run with a tool call", () =>
Effect.gen(function* () {
const executor = yield* makeExecutor();
const { events, observer } = collectingObserver();
const engine = createExecutionEngine({
executor,
codeExecutor: toolCallingExecutor,
observer,
});

const result = yield* engine.executeWithPause("noop", { trigger: { kind: "test" } });
expect(result.status).toBe("completed");

// First event opens the run, last closes it; tool calls land in between.
// `.find` with isTagged narrows each result, so the assertions read the
// typed fields directly via optional chaining (no conditional blocks).
const started = events.find((e) => Predicate.isTagged(e, "ExecutionStarted"));
const finished = events.find((e) => Predicate.isTagged(e, "ExecutionFinished"));
const toolStarted = events.find((e) => Predicate.isTagged(e, "ToolCallStarted"));
const toolFinished = events.find((e) => Predicate.isTagged(e, "ToolCallFinished"));

// Ordering: the stream must begin with ExecutionStarted and end with
// ExecutionFinished.
expect(Predicate.isTagged(events[0], "ExecutionStarted")).toBe(true);
expect(Predicate.isTagged(events[events.length - 1], "ExecutionFinished")).toBe(true);

// The start event echoes the trigger passed to executeWithPause and carries
// an owner with a tenant.
expect(started?.trigger?.kind).toBe("test");
expect(started?.owner.tenant).toBeDefined();
expect(toolStarted).toBeDefined();
expect(finished?.status).toBe("completed");

// Tool-call events share the run's executionId and carry the path.
expect(toolFinished?.path).toBe("search");
expect(toolFinished?.status).toBe("completed");
expect(toolFinished?.executionId).toBe(started?.executionId);
}),
);

// Uses `engine.execute` with an `onElicitation` handler that always accepts,
// against the approval-requiring tool, and checks the interaction events.
it.effect("emits inline interaction events when execute handles elicitation", () =>
Effect.gen(function* () {
const executor = yield* makeApprovalExecutor();
const { events, observer } = collectingObserver();
const engine = createExecutionEngine({
executor,
codeExecutor: approvalCallingExecutor,
observer,
});

const result = yield* engine.execute("noop", {
trigger: { kind: "test" },
onElicitation: () => Effect.succeed(ElicitationResponse.make({ action: "accept" })),
});
expect(result.result).toBe("ok");

const started = events.find((e) => Predicate.isTagged(e, "ExecutionStarted"));
const interactionStarted = events.find((e) => Predicate.isTagged(e, "InteractionStarted"));
const interactionResolved = events.find((e) => Predicate.isTagged(e, "InteractionResolved"));

// Both interaction events belong to the same run and refer to the same
// interaction.
expect(interactionStarted?.executionId).toBe(started?.executionId);
expect(interactionResolved?.executionId).toBe(started?.executionId);
expect(interactionResolved?.interactionId).toBe(interactionStarted?.interactionId);
// The request message mentions approval, and the resolution reflects the
// "accept" response returned by the handler above.
expect(interactionStarted?.context.request.message).toContain("approval");
expect(interactionResolved?.status).toBe("accepted");
expect(interactionResolved?.response?.action).toBe("accept");
}),
);

// No `observer` is passed here; the test only asserts that the run still
// completes.
it.effect("does nothing observable when no observer is configured", () =>
Effect.gen(function* () {
const executor = yield* makeExecutor();
const engine = createExecutionEngine({ executor, codeExecutor: toolCallingExecutor });
const result = yield* engine.executeWithPause("noop");
expect(result.status).toBe("completed");
}),
);
});
Loading
Loading