Skip to content

Commit eb6d8aa

Browse files
committed
Trace tool calls from code cells
1 parent 0d0215d commit eb6d8aa

2 files changed

Lines changed: 159 additions & 1 deletion

File tree

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
import { randomBytes } from "node:crypto";
2+
import { createServer } from "node:http";
3+
4+
import { expect } from "@effect/vitest";
5+
import { Effect } from "effect";
6+
import { composePluginApi } from "@executor-js/api/server";
7+
import { openApiHttpPlugin } from "@executor-js/plugin-openapi/api";
8+
import { AuthTemplateSlug, ConnectionName, IntegrationSlug } from "@executor-js/sdk/shared";
9+
10+
import { scenario } from "../src/scenario";
11+
import { Api, Target, Telemetry } from "../src/services";
12+
13+
const api = composePluginApi([openApiHttpPlugin()] as const);
14+
15+
const upstreamSpec = (baseUrl: string): string =>
16+
JSON.stringify({
17+
openapi: "3.0.3",
18+
info: { title: "Cell Telemetry Upstream", version: "1.0.0" },
19+
servers: [{ url: baseUrl }],
20+
paths: {
21+
"/ok": {
22+
get: {
23+
operationId: "ok",
24+
summary: "Succeeds",
25+
tags: ["probe"],
26+
responses: { "200": { description: "" } },
27+
},
28+
},
29+
},
30+
});
31+
32+
const serveUpstream = Effect.acquireRelease(
33+
Effect.callback<{ readonly baseUrl: string; readonly close: () => void }>((resume) => {
34+
const server = createServer((_request, response) => {
35+
response.writeHead(200, { "content-type": "application/json" });
36+
response.end('{"fine":true}');
37+
});
38+
server.listen(0, "127.0.0.1", () => {
39+
const address = server.address();
40+
const port = typeof address === "object" && address ? address.port : 0;
41+
resume(
42+
Effect.succeed({
43+
baseUrl: `http://127.0.0.1:${port}`,
44+
close: () => {
45+
server.close();
46+
server.closeAllConnections();
47+
},
48+
}),
49+
);
50+
});
51+
}),
52+
(server) => Effect.sync(server.close),
53+
);
54+
55+
scenario(
56+
"Code mode cell tool calls carry trace correlation metadata",
57+
{ timeout: 180_000 },
58+
Effect.scoped(
59+
Effect.gen(function* () {
60+
const target = yield* Target;
61+
const { client: apiClient } = yield* Api;
62+
const telemetry = yield* Telemetry;
63+
const identity = yield* target.newIdentity();
64+
const client = yield* apiClient(api, identity);
65+
const upstream = yield* serveUpstream;
66+
67+
const slug = IntegrationSlug.make(`celltrace${randomBytes(4).toString("hex")}`);
68+
yield* client.openapi.addSpec({
69+
payload: {
70+
spec: { kind: "blob", value: upstreamSpec(upstream.baseUrl) },
71+
slug,
72+
baseUrl: upstream.baseUrl,
73+
authenticationTemplate: [
74+
{
75+
slug: "apiKey",
76+
type: "apiKey",
77+
headers: { Authorization: ["Bearer ", { type: "variable", name: "token" }] },
78+
},
79+
],
80+
},
81+
});
82+
yield* client.connections.create({
83+
payload: {
84+
owner: "org",
85+
name: ConnectionName.make("main"),
86+
integration: slug,
87+
template: AuthTemplateSlug.make("apiKey"),
88+
value: "cell-telemetry-token",
89+
},
90+
});
91+
92+
const tools = yield* client.tools.list({ query: {} });
93+
const tool = tools.find(
94+
(entry) =>
95+
String(entry.integration) === String(slug) && String(entry.address).endsWith(".ok"),
96+
);
97+
expect(tool, "the OpenAPI operation is in the tool catalog").toBeDefined();
98+
const address = String(tool!.address);
99+
const path = address.startsWith("tools.") ? address.slice("tools.".length) : address;
100+
101+
const specResponse = yield* Effect.promise(() =>
102+
fetch(new URL("/api/openapi.json", target.baseUrl)),
103+
);
104+
const spec = (yield* Effect.promise(() => specResponse.json())) as {
105+
readonly paths?: Record<string, unknown>;
106+
};
107+
expect(Object.keys(spec.paths ?? {}), "the cell API is documented").toContain(
108+
"/api/execution-cells",
109+
);
110+
111+
const cellResponse = yield* Effect.promise(() =>
112+
fetch(new URL("/api/execution-cells", target.baseUrl), {
113+
method: "POST",
114+
headers: {
115+
...(identity.headers ?? {}),
116+
"content-type": "application/json",
117+
origin: new URL(target.baseUrl).origin,
118+
},
119+
body: JSON.stringify({
120+
yieldAfterMs: 5_000,
121+
code: `return await tools.${path}({});`,
122+
}),
123+
}),
124+
);
125+
const cellBody = yield* Effect.promise(() => cellResponse.text());
126+
expect(cellResponse.status, `startCell response body: ${cellBody.slice(0, 500)}`).toBe(200);
127+
const cell = JSON.parse(cellBody) as {
128+
readonly status?: unknown;
129+
readonly cellId?: unknown;
130+
};
131+
expect(cell.status, "a one-shot tool call cell completes").toBe("completed");
132+
expect(cell.cellId, "the completed cell response includes a cell id").toEqual(
133+
expect.any(String),
134+
);
135+
136+
const span = yield* telemetry.expectSpan({
137+
operation: "executor.code.cell.tool",
138+
attributes: {
139+
"executor.tool.source": "code_cell",
140+
"executor.tool.path": path,
141+
},
142+
});
143+
expect(
144+
span.span.tags["executor.code.cell_id"],
145+
"the cell tool span carries a cell id",
146+
).toBeTruthy();
147+
}),
148+
),
149+
);

packages/core/execution/src/engine.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,16 @@ export const createExecutionEngine = <E extends Cause.YieldableError = CodeExecu
654654
toolDiscoveryProvider,
655655
);
656656
const invoker: SandboxToolInvoker = {
657-
invoke: (input) => baseInvoker.invoke(input),
657+
invoke: (input) =>
658+
baseInvoker.invoke(input).pipe(
659+
Effect.withSpan("executor.code.cell.tool", {
660+
attributes: {
661+
"executor.code.cell_id": cell.id,
662+
"executor.tool.source": "code_cell",
663+
"executor.tool.path": input.path,
664+
},
665+
}),
666+
),
658667
};
659668

660669
const fiber = yield* Effect.forkDetach(

0 commit comments

Comments
 (0)