Skip to content

Commit a6b72b4

Browse files
committed
feat(execution): add execution observer foundation
1 parent 1329cca commit a6b72b4

11 files changed

Lines changed: 655 additions & 23 deletions

File tree

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
---
2+
"@executor-js/sdk": minor
3+
"@executor-js/execution": minor
4+
"@executor-js/api": minor
5+
---
6+
7+
Add the execution-observer foundation. The execution engine now emits a typed
8+
lifecycle stream (`ExecutionStarted`/`Finished`, `ToolCallStarted`/`Finished`,
9+
`InteractionStarted`/`Resolved`), plugins can subscribe via the new
10+
`plugin.runtime.executionObserver` hook, and `makeExecutionStack` composes every
11+
registered plugin's observer onto the engine. Behaviour is unchanged when no
12+
plugin observes, making this the opt-in seam the execution-history and
13+
execution-metrics plugins build on. Also exposes `Executor.owner` and enriches
14+
the `mcp.execute` span with the run id and trigger.

apps/local/src/app.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
} from "@executor-js/api/server";
1010
import { createExecutionEngine } from "@executor-js/execution";
1111
import { makeQuickJsExecutor } from "@executor-js/runtime-quickjs";
12+
import { composeExecutionObservers, type AnyPlugin } from "@executor-js/sdk";
1213

1314
import { getExecutorBundle, type LocalExecutor } from "./executor";
1415
import { makeLocalIdentityLayer } from "./identity";
@@ -50,12 +51,17 @@ import { ErrorCaptureLive } from "./observability";
5051
* `HostConfig`/`CodeExecutorProvider` seams — the fixed executor is the whole
5152
* execution model.
5253
*/
53-
const localFixedExecutionLayer = (executor: LocalExecutor): Layer.Layer<FixedExecutionProvider> =>
54+
const localFixedExecutionLayer = (
55+
executor: LocalExecutor,
56+
plugins: readonly AnyPlugin[],
57+
): Layer.Layer<FixedExecutionProvider> =>
5458
Layer.succeed(FixedExecutionProvider)({
5559
executor,
5660
engine: createExecutionEngine({
5761
executor,
5862
codeExecutor: makeQuickJsExecutor(),
63+
// Local bypasses makeExecutionStack, so compose plugin observers here.
64+
observer: composeExecutionObservers(plugins, executor),
5965
}),
6066
// The executor IS its own plugin-extension map (`executor[pluginId]`); the
6167
// fixed middleware reads `executor[id]` to satisfy each plugin's
@@ -86,7 +92,7 @@ export const makeLocalApiHandler = async (token: string): Promise<LocalApiHandle
8692
// Layer is the `fixedExecution` seam declaration AND lives in `boot` so the
8793
// fixed middleware's residual `FixedExecutionProvider` resolves there — exactly
8894
// as self-host declares `db: SelfHostDbProvider` and puts the handle in `boot`.
89-
const fixedExecution = localFixedExecutionLayer(executor);
95+
const fixedExecution = localFixedExecutionLayer(executor, plugins);
9096

9197
// The authoritative identity gate for the typed `/api`: validates the boot
9298
// bearer token and resolves the one local Principal. The Bun shell

apps/local/src/main.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Context, Effect, Layer, ManagedRuntime } from "effect";
22

33
import { createExecutionEngine } from "@executor-js/execution";
44
import { makeQuickJsExecutor } from "@executor-js/runtime-quickjs";
5+
import { composeExecutionObservers } from "@executor-js/sdk";
56
import { makeLocalApiHandler } from "./app";
67
import { getExecutorBundle } from "./executor";
78
import { createMcpRequestHandler, type McpRequestHandler } from "./mcp";
@@ -62,10 +63,12 @@ export const createServerHandlers = async (token: string): Promise<ServerHandler
6263
// engine instance (the browser-approval + stdio surface is local-only and not
6364
// part of the shared API). Reuse the shared boot bundle so the MCP executor is
6465
// byte-identical to the one the API serves.
65-
const { executor } = await getExecutorBundle();
66+
const { executor, plugins } = await getExecutorBundle();
6667
const engine = createExecutionEngine({
6768
executor,
6869
codeExecutor: makeQuickJsExecutor(),
70+
// Local's in-process MCP also bypasses makeExecutionStack, so compose plugin observers here.
71+
observer: composeExecutionObservers(plugins, executor),
6972
});
7073
const mcp = createMcpRequestHandler({ engine });
7174

packages/core/api/src/server/execution-stack.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import { Context, Effect, Layer } from "effect";
2727
import type * as Cause from "effect/Cause";
2828

29+
import { composeExecutionObservers } from "@executor-js/sdk";
2930
import type { AnyPlugin, Executor, StorageFailure } from "@executor-js/sdk";
3031
import {
3132
createExecutionEngine,
@@ -108,7 +109,17 @@ export const makeExecutionStack = <
108109
);
109110
const codeExecutor = yield* CodeExecutorProvider;
110111
const { decorate } = yield* EngineDecorator;
111-
const engine = decorate(createExecutionEngine({ executor, codeExecutor }), {
112+
113+
// Compose every registered plugin's runtime.executionObserver, bound to the
114+
// executor's own extensions. Resolves to a no-op when no plugin observes,
115+
// so a stack with no history/metrics plugin is unaffected. `plugins()`
116+
// returns the erased AnyPlugin[]; the cast to TPlugins restores the tuple type.
117+
const { plugins } = yield* PluginsProvider;
118+
// PluginsProvider erases the tuple to AnyPlugin[]; cast back to the caller's
119+
// TPlugins phantom so the extensions arg (the executor) lines up.
120+
const observer = composeExecutionObservers(plugins() as TPlugins, executor);
121+
122+
const engine = decorate(createExecutionEngine({ executor, codeExecutor, observer }), {
112123
accountId,
113124
organizationId,
114125
organizationName,
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import { describe, expect, it } from "@effect/vitest";
2+
import { Effect, Predicate } from "effect";
3+
4+
import { createExecutor, definePlugin } from "@executor-js/sdk";
5+
import type { ExecutionEvent, ExecutionObserver } from "@executor-js/sdk";
6+
import { makeTestConfig } from "@executor-js/sdk/testing";
7+
import type { CodeExecutor, ExecuteResult } from "@executor-js/codemode-core";
8+
9+
import { createExecutionEngine } from "./engine";
10+
11+
const emptyPlugin = definePlugin(() => ({
12+
id: "observer-test" as const,
13+
storage: () => ({}),
14+
staticSources: () => [],
15+
}));
16+
17+
const makeExecutor = () => createExecutor(makeTestConfig({ plugins: [emptyPlugin()] as const }));
18+
19+
// A code executor that issues one builtin tool call (tools.search) and then
20+
// completes, enough to exercise the full event sequence.
21+
const toolCallingExecutor: CodeExecutor = {
22+
execute: (_code, invoker) =>
23+
invoker
24+
.invoke({ path: "search", args: { query: "anything" } })
25+
.pipe(Effect.as({ result: "ok", logs: [] } satisfies ExecuteResult), Effect.orDie),
26+
};
27+
28+
const collectingObserver = () => {
29+
const events: ExecutionEvent[] = [];
30+
const observer: ExecutionObserver = {
31+
handle: (event) => Effect.sync(() => void events.push(event)),
32+
};
33+
return { events, observer };
34+
};
35+
36+
describe("execution engine observer emission", () => {
37+
it.effect("emits the full lifecycle for a completed run with a tool call", () =>
38+
Effect.gen(function* () {
39+
const executor = yield* makeExecutor();
40+
const { events, observer } = collectingObserver();
41+
const engine = createExecutionEngine({
42+
executor,
43+
codeExecutor: toolCallingExecutor,
44+
observer,
45+
});
46+
47+
const result = yield* engine.executeWithPause("noop", { trigger: { kind: "test" } });
48+
expect(result.status).toBe("completed");
49+
50+
// First event opens the run, last closes it; tool calls land in between.
51+
// `.find` with isTagged narrows each result, so the assertions read the
52+
// typed fields directly via optional chaining (no conditional blocks).
53+
const started = events.find((e) => Predicate.isTagged(e, "ExecutionStarted"));
54+
const finished = events.find((e) => Predicate.isTagged(e, "ExecutionFinished"));
55+
const toolStarted = events.find((e) => Predicate.isTagged(e, "ToolCallStarted"));
56+
const toolFinished = events.find((e) => Predicate.isTagged(e, "ToolCallFinished"));
57+
58+
expect(Predicate.isTagged(events[0], "ExecutionStarted")).toBe(true);
59+
expect(Predicate.isTagged(events[events.length - 1], "ExecutionFinished")).toBe(true);
60+
61+
expect(started?.trigger?.kind).toBe("test");
62+
expect(started?.owner.tenant).toBeDefined();
63+
expect(toolStarted).toBeDefined();
64+
expect(finished?.status).toBe("completed");
65+
66+
// Tool-call events share the run's executionId and carry the path.
67+
expect(toolFinished?.path).toBe("search");
68+
expect(toolFinished?.status).toBe("completed");
69+
expect(toolFinished?.executionId).toBe(started?.executionId);
70+
}),
71+
);
72+
73+
it.effect("does nothing observable when no observer is configured", () =>
74+
Effect.gen(function* () {
75+
const executor = yield* makeExecutor();
76+
const engine = createExecutionEngine({ executor, codeExecutor: toolCallingExecutor });
77+
const result = yield* engine.executeWithPause("noop");
78+
expect(result.status).toBe("completed");
79+
}),
80+
);
81+
});

0 commit comments

Comments
 (0)