Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 25 additions & 23 deletions packages/core/execution/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import {
InteractionStarted,
ToolCallFinished,
ToolCallStarted,
ignoreExecutionObserverErrors,
emitExecutionEvent,
noopExecutionObserver,
withExecutionObserver,
} from "@executor-js/sdk/core";
import { CodeExecutionError } from "@executor-js/codemode-core";
import type { CodeExecutor, ExecuteResult, SandboxToolInvoker } from "@executor-js/codemode-core";
Expand Down Expand Up @@ -462,10 +463,9 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
}
};

// Observer wiring. `emit` never fails (errors are swallowed per-observer) and
// resolves to a no-op when no plugin registered one — the opt-out default.
const observer = ignoreExecutionObserverErrors(config.observer ?? noopExecutionObserver);
const emit = observer.handle;
// Observer wiring is scoped around public execution methods so detached
// execution fibers inherit the observer context for pause/resume events.
const observeExecution = withExecutionObserver(config.observer ?? noopExecutionObserver);
const owner = executor.owner;

const makeExecutionId = (): ExecutionId => ExecutionId.make(`exec_${crypto.randomUUID()}`);
Expand Down Expand Up @@ -505,7 +505,7 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
invoke: (call) =>
Effect.gen(function* () {
const toolCallId = makeToolCallId();
yield* emit(
yield* emitExecutionEvent(
new ToolCallStarted({
executionId,
toolCallId,
Expand All @@ -517,7 +517,7 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
);
return yield* inner.invoke(call).pipe(
Effect.tap((result) =>
emit(
emitExecutionEvent(
new ToolCallFinished({
executionId,
toolCallId,
Expand All @@ -530,7 +530,7 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
),
),
Effect.tapCause((cause) =>
emit(
emitExecutionEvent(
new ToolCallFinished({
executionId,
toolCallId,
Expand All @@ -553,7 +553,7 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
(ctx) =>
Effect.gen(function* () {
const interactionId = makeInteractionId();
yield* emit(
yield* emitExecutionEvent(
new InteractionStarted({
executionId,
interactionId,
Expand All @@ -564,7 +564,7 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
);
return yield* handler(ctx).pipe(
Effect.tap((response) =>
emit(
emitExecutionEvent(
new InteractionResolved({
executionId,
interactionId,
Expand All @@ -576,7 +576,7 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
),
),
Effect.tapCause((cause) =>
emit(
emitExecutionEvent(
new InteractionResolved({
executionId,
interactionId,
Expand Down Expand Up @@ -635,7 +635,7 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
// resume handle below so a run with N interactions is one execution with N
// interaction events.
const executionId = makeExecutionId();
yield* emit(
yield* emitExecutionEvent(
new ExecutionStarted({
executionId,
owner,
Expand Down Expand Up @@ -675,7 +675,7 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
};
pausedExecutions.set(id, paused);

yield* emit(
yield* emitExecutionEvent(
new InteractionStarted({
executionId,
interactionId,
Expand All @@ -692,7 +692,7 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
// pair — mirroring observeInlineElicitation on the inline path.
return yield* Deferred.await(responseDeferred).pipe(
Effect.tap((response) =>
emit(
emitExecutionEvent(
new InteractionResolved({
executionId,
interactionId,
Expand All @@ -704,7 +704,7 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
),
),
Effect.tapCause((cause) =>
emit(
emitExecutionEvent(
new InteractionResolved({
executionId,
interactionId,
Expand All @@ -725,8 +725,8 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
fiber = yield* Effect.forkDetach(
codeExecutor.execute(code, invoker).pipe(
Effect.withSpan("executor.code.exec"),
Effect.tap((result) => emit(finishFromResult(executionId, result))),
Effect.tapCause((cause) => emit(finishFromCause(executionId, cause))),
Effect.tap((result) => emitExecutionEvent(finishFromResult(executionId, result))),
Effect.tapCause((cause) => emitExecutionEvent(finishFromCause(executionId, cause))),
),
);

Expand Down Expand Up @@ -823,7 +823,7 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
});

const executionId = makeExecutionId();
yield* emit(
yield* emitExecutionEvent(
new ExecutionStarted({
executionId,
owner,
Expand All @@ -847,15 +847,17 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
);
return yield* codeExecutor.execute(code, invoker).pipe(
Effect.withSpan("executor.code.exec"),
Effect.tap((result) => emit(finishFromResult(executionId, result))),
Effect.tapCause((cause) => emit(finishFromCause(executionId, cause))),
Effect.tap((result) => emitExecutionEvent(finishFromResult(executionId, result))),
Effect.tapCause((cause) => emitExecutionEvent(finishFromCause(executionId, cause))),
);
});

return {
execute: runInlineExecution,
executeWithPause: startPausableExecution,
resume: resumeExecution,
execute: (code, options) => runInlineExecution(code, options).pipe(observeExecution),
executeWithPause: (code, options) =>
startPausableExecution(code, options).pipe(observeExecution),
resume: (executionId, response) =>
resumeExecution(executionId, response).pipe(observeExecution),
getPausedExecution: (executionId) =>
Effect.sync(() => pausedExecutions.get(executionId) ?? null),
pausedExecutionCount: () => Effect.sync(() => pausedExecutions.size),
Expand Down
85 changes: 77 additions & 8 deletions packages/core/sdk/src/execution-observer.test.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
import { describe, expect, it } from "@effect/vitest";
import { Effect } from "effect";
import { Cause, Effect, Exit } from "effect";

import { Subject, Tenant } from "./ids";
import { ExecutionFinished, ExecutionId, composeExecutionObservers, definePlugin } from "./index";
import {
ExecutionFinished,
ExecutionId,
composeExecutionObservers,
definePlugin,
emitExecutionEvent,
withExecutionObserver,
} from "./index";

const owner = { tenant: Tenant.make("tenant_test"), subject: Subject.make("subject_test") };

let calls: string[] = [];

const observingPlugin = (id: string) =>
const observingPlugin = (id: string, asyncBoundary = false) =>
definePlugin(() => ({
id,
storage: () => ({}),
extension: () => ({ label: id }),
runtime: {
executionObserver: (self: { label: string }) => ({
handle: () => Effect.sync(() => calls.push(self.label)),
handle: () =>
(asyncBoundary ? Effect.promise(() => Promise.resolve()) : Effect.void).pipe(
Effect.flatMap(() => Effect.sync(() => calls.push(self.label))),
),
}),
},
}));
Expand All @@ -31,6 +41,17 @@ const failingPlugin = definePlugin(() => ({
},
}));

const interruptingPlugin = definePlugin(() => ({
id: "interrupting" as const,
storage: () => ({}),
extension: () => ({ label: "interrupting" }),
runtime: {
executionObserver: () => ({
handle: () => Effect.interrupt,
}),
},
}));

const finishedEvent = () =>
new ExecutionFinished({
executionId: ExecutionId.make("exec_test"),
Expand All @@ -41,10 +62,23 @@ const finishedEvent = () =>
});

describe("composeExecutionObservers", () => {
it.effect("fans an event to every plugin observer and isolates failures", () =>
it.effect("emits events to the scoped observer", () =>
Effect.gen(function* () {
calls = [];
const first = observingPlugin("first")();
yield* emitExecutionEvent(finishedEvent()).pipe(
withExecutionObserver({
handle: () => Effect.sync(() => calls.push("observed")),
}),
);

expect(calls).toEqual(["observed"]);
}),
);

it.effect("dispatches observers sequentially and isolates failures", () =>
Effect.gen(function* () {
calls = [];
const first = observingPlugin("first", true)();
const failing = failingPlugin();
const last = observingPlugin("last")();
const observer = composeExecutionObservers([first, failing, last] as const, {
Expand All @@ -53,19 +87,54 @@ describe("composeExecutionObservers", () => {
last: { label: "last" },
});

// The failing plugin dies mid-fan; the others must still observe.
// The failing plugin dies mid-dispatch; the others must still observe.
yield* observer.handle(finishedEvent());

expect(calls).toEqual(["first", "last"]);
}),
);

it.effect("preserves interrupts from scoped observers", () =>
Effect.gen(function* () {
const exit = yield* Effect.exit(
emitExecutionEvent(finishedEvent()).pipe(
withExecutionObserver({
handle: () => Effect.interrupt,
}),
),
);

expect(Exit.isFailure(exit)).toBe(true);
if (!Exit.isFailure(exit)) return;
expect(Cause.hasInterrupts(exit.cause)).toBe(true);
}),
);

it.effect("preserves interrupts from composed plugin observers", () =>
Effect.gen(function* () {
calls = [];
const interrupting = interruptingPlugin();
const last = observingPlugin("last")();
const observer = composeExecutionObservers([interrupting, last] as const, {
interrupting: { label: "interrupting" },
last: { label: "last" },
});

const exit = yield* Effect.exit(observer.handle(finishedEvent()));

expect(Exit.isFailure(exit)).toBe(true);
if (!Exit.isFailure(exit)) return;
expect(Cause.hasInterrupts(exit.cause)).toBe(true);
expect(calls).toEqual([]);
}),
);

it.effect("returns a no-op observer when no plugin registers one", () =>
Effect.gen(function* () {
const plain = definePlugin(() => ({ id: "plain", storage: () => ({}) }))();
const observer = composeExecutionObservers([plain] as const, { plain: {} });

// No observer registeredhandling is a silent no-op, never throws.
// No observer registered: handling is a no-op and never throws.
yield* observer.handle(finishedEvent());
}),
);
Expand Down
Loading
Loading