-
Notifications
You must be signed in to change notification settings - Fork 146
Expand file tree
/
Copy pathsession-durable-object.ts
More file actions
270 lines (249 loc) · 10.9 KB
/
Copy pathsession-durable-object.ts
File metadata and controls
270 lines (249 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
// ---------------------------------------------------------------------------
// Cloud MCP Session Durable Object — the cloud binding of the shared
// `McpSessionDOBase` (@executor-js/cloudflare). All session lifecycle (cold
// restore, the inactivity alarm, owner validation, transport upgrade, the
// browser-approval store, the per-request span bridge) lives in the base; cloud
// supplies ONLY its injected dependencies:
// - openSessionDb → a long-lived postgres.js handle
// - resolveSessionMeta → WorkOS/UserStore organization resolution
// - buildMcpServer → the cloud execution stack + MCP tool server
// - withTelemetry → the WebSdk tracer + W3C parent-span stitching
// - captureCause → Sentry error capture
// host-cloudflare binds the same base to D1 instead; the two stay byte-identical
// except for these seams.
// ---------------------------------------------------------------------------
import { env } from "cloudflare:workers";
import { createTraceState } from "@opentelemetry/api";
import { Data, Effect, Layer } from "effect";
import type { Cause } from "effect";
import * as OtelTracer from "@effect/opentelemetry/Tracer";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres, { type Sql } from "postgres";
import { createExecutorMcpServer } from "@executor-js/host-mcp/tool-server";
import { buildResumeApprovalUrl } from "@executor-js/host-mcp/browser-approval";
import {
McpSessionDOBase,
type BuiltMcpServer,
type IncomingTraceHeaders,
type McpSessionInit,
type SessionMeta,
} from "@executor-js/cloudflare/mcp/durable-object";
import { buildExecuteDescription } from "@executor-js/execution";
// The DO only needs the neutral boot-scoped service (WorkOSClient). It never
// bills, so it does NOT depend on any billing service — `CloudExecutionStackLayer`
// here is the no-op-decorator (Autumn-free) stack. It imports the focused
// `CoreSharedServices` root (beside `WorkOSClient`), NOT `../api/layers`, so the
// DO bundle stays small and free of the whole HTTP API assembly. (This used to
// require a dedicated `core-shared-services.ts` leaf to keep `auth/handlers.ts` →
// `@tanstack/react-start` out of the DO bundle; that coupling is gone now that
// `handlers.ts` queues cookies through `SessionAuthLive` instead.)
import { CoreSharedServices } from "../auth/workos";
import { UserStoreService } from "../auth/context";
import { resolveOrganization } from "../auth/organization";
import {
DbService,
combinedSchema,
resolveConnectionString,
type DrizzleDb,
type DbServiceShape,
} from "../db/db";
import { CloudExecutionStackLayer, makeExecutionStack } from "../engine/execution-stack";
import { DoTelemetryLive, flushTracerProvider } from "../observability/telemetry";
import { captureCause as reportCause } from "../observability";
// Re-export the shared types so existing cloud importers
// (`auth/handlers.ts`, etc.) keep their `../mcp/session-durable-object` path.
export type {
McpApprovalOwner,
McpSessionApprovalResult,
McpSessionResumeApprovalResult,
McpSessionInit,
IncomingTraceHeaders,
} from "@executor-js/cloudflare/mcp/durable-object";
// ---------------------------------------------------------------------------
// Cloud DB handle — one postgres.js client per session runtime
// ---------------------------------------------------------------------------
// postgres.js limits (in seconds) for the DB handle that lives as long as the
// session runtime; `openSessionDb` hands them to `makeDbHandle`, which maps
// them onto postgres.js `idle_timeout` / `max_lifetime`.
const LONG_LIVED_DB_IDLE_TIMEOUT_SECONDS = 5;
const LONG_LIVED_DB_MAX_LIFETIME_SECONDS = 2 * 60;
// Upper bound on the best-effort span export scheduled by `flushTelemetry`.
const TELEMETRY_FLUSH_TIMEOUT_MS = 1000;
// The DB handle a session owns: the `DbServiceShape` fields consumed by the
// session's `DbService` layer, plus the raw postgres.js client and a teardown.
// In this file `end` is built by `makeDbHandle`, which swallows close errors,
// so callers treat it as never rejecting.
type CloudSessionDbHandle = DbServiceShape & {
readonly sql: Sql;
readonly end: () => Promise<void>;
};
// Yielded by `resolveSessionMeta` when the token's organization id does not
// resolve to an organization; that method converts it into a defect (`orDie`).
class OrganizationNotFoundError extends Data.TaggedError("OrganizationNotFoundError")<{
readonly organizationId: string;
}> {}
// W3C propagation across the worker→DO boundary. The worker injects its
// `traceparent` and forwards incoming `tracestate` / `baggage`; we parse the
// context and use `OtelTracer.withSpanContext` to stitch the DO's root span
// under the worker span so the entire logical request lives in one trace.
//
// `version-traceid-parentid-flags`, plus an optional `-…` tail that future
// traceparent versions are allowed to append (W3C Trace Context versioning).
const TRACEPARENT_PATTERN = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})(-.*)?$/;
// Values the W3C spec declares invalid: all-zero ids and the `ff` version.
const INVALID_TRACE_ID = "0".repeat(32);
const INVALID_SPAN_ID = "0".repeat(16);
const FORBIDDEN_TRACEPARENT_VERSION = "ff";
// Version 00 defines exactly four fields; any trailing data is malformed.
const V00_TRACEPARENT_VERSION = "00";
type IncomingSpanContext = {
  readonly traceId: string;
  readonly spanId: string;
  readonly traceFlags: number;
  readonly traceState?: ReturnType<typeof createTraceState>;
};
/**
 * Parse a W3C `traceparent` header (and optional `tracestate`) into the span
 * context the DO's root span should be parented under.
 *
 * @param traceparent - raw `traceparent` header value, if any.
 * @param tracestate - raw `tracestate` header value, if any; attached only
 *   when non-empty.
 * @returns the parsed context, or `null` when the header is absent or invalid
 *   per the W3C spec (bad syntax, forbidden version `ff`, extra fields on
 *   version 00, or an all-zero trace-id / parent-id). `null` makes the caller
 *   start a fresh trace instead of attaching to a bogus parent.
 */
const parseTraceparent = (
  traceparent: string | null | undefined,
  tracestate: string | null | undefined,
): IncomingSpanContext | null => {
  if (!traceparent) return null;
  const match = TRACEPARENT_PATTERN.exec(traceparent.trim());
  if (!match) return null;
  const [, version, traceId, spanId, flags, extraFields] = match;
  if (version === undefined || traceId === undefined || spanId === undefined || flags === undefined) {
    return null;
  }
  if (version === FORBIDDEN_TRACEPARENT_VERSION) return null;
  if (version === V00_TRACEPARENT_VERSION && extraFields !== undefined) return null;
  if (traceId === INVALID_TRACE_ID || spanId === INVALID_SPAN_ID) return null;
  return {
    traceId,
    spanId,
    traceFlags: parseInt(flags, 16),
    ...(tracestate ? { traceState: createTraceState(tracestate) } : {}),
  };
};
/**
 * Open the single postgres.js client that backs one MCP session runtime.
 *
 * The pool is capped at one connection. postgres.js may drop an idle socket
 * after `idleTimeout` seconds and recycle it after `maxLifetime` seconds, but
 * the returned handle object survives those reconnects. That lets the MCP
 * server keep session-local protocol state across requests.
 *
 * @param options.idleTimeout - postgres.js `idle_timeout`, in seconds.
 * @param options.maxLifetime - postgres.js `max_lifetime`, in seconds.
 * @returns the raw `sql` client, a Drizzle wrapper over it, and an `end()`
 *   that closes the client immediately and never rejects.
 */
const makeDbHandle = ({
  idleTimeout,
  maxLifetime,
}: {
  readonly idleTimeout: number;
  readonly maxLifetime: number;
}): CloudSessionDbHandle => {
  const sql = postgres(resolveConnectionString(), {
    onnotice: () => undefined,
    prepare: true,
    fetch_types: false,
    connect_timeout: 10,
    max_lifetime: maxLifetime,
    idle_timeout: idleTimeout,
    max: 1,
  });
  const db = drizzle(sql, { schema: combinedSchema }) as DrizzleDb;
  const end = (): Promise<void> =>
    // oxlint-disable-next-line executor/no-promise-catch -- boundary: postgres.js close is best-effort during DO/runtime cleanup
    sql.end({ timeout: 0 }).catch(() => undefined);
  return { sql, db, end };
};
/**
 * Open a short-lived DB handle for one-off work outside the session runtime.
 * It is opened with idleTimeout 0 and maxLifetime 60 seconds. The caller owns
 * the handle and must `end()` it.
 */
const makeEphemeralDb = (): CloudSessionDbHandle => {
  const ephemeralLimits = { maxLifetime: 60, idleTimeout: 0 } as const;
  return makeDbHandle(ephemeralLimits);
};
// Layer for org resolution and the session runtime. It merges three parts:
// `DbService` over the given handle, `UserStoreService` wired onto that same
// DB layer, and the shared `CoreSharedServices`. It deliberately does NOT
// re-provide `DoTelemetryLive`. That would install a second WebSdk tracer in
// the nested Effect scope and detach every child span from the outer
// DO-method trace. The tracer comes from the outermost `withTelemetry` at the
// DO method boundary.
const makeSessionServices = (dbHandle: CloudSessionDbHandle) => {
  // Only the service-shape fields go into the layer; `end` stays with whoever
  // owns the handle.
  const { sql, db } = dbHandle;
  const dbLayer = Layer.succeed(DbService)({ sql, db });
  return Layer.mergeAll(
    dbLayer,
    UserStoreService.Live.pipe(Layer.provide(dbLayer)),
    CoreSharedServices,
  );
};
// ---------------------------------------------------------------------------
// Durable Object
// ---------------------------------------------------------------------------
/**
 * Cloud MCP session Durable Object. Session lifecycle is inherited from
 * `McpSessionDOBase`. This subclass only fills in the cloud seams: the Postgres
 * handle, organization resolution, the cloud MCP tool server, telemetry
 * wiring and error capture.
 */
export class McpSessionDO extends McpSessionDOBase<CloudSessionDbHandle> {
  /** Open the DB handle that lives as long as the session runtime. */
  protected override openSessionDb(): CloudSessionDbHandle {
    return makeDbHandle({
      idleTimeout: LONG_LIVED_DB_IDLE_TIMEOUT_SECONDS,
      maxLifetime: LONG_LIVED_DB_MAX_LIFETIME_SECONDS,
    });
  }

  /**
   * Resolve the token's organization and combine it with the token's user and
   * mode fields into `SessionMeta`.
   *
   * Each run of the returned effect opens its own ephemeral DB handle and
   * closes it via `ensuring`. A missing organization, or any other failure,
   * becomes a defect (`orDie`).
   */
  protected override resolveSessionMeta(token: McpSessionInit): Effect.Effect<SessionMeta> {
    return Effect.suspend(() => {
      // Acquire per run rather than when the effect is built. An eagerly
      // created handle would be left ended after the first run, so a re-run
      // or retry would query a closed client. An effect that was never run
      // would also still have constructed a client.
      const dbHandle = makeEphemeralDb();
      return Effect.gen(function* () {
        const org = yield* resolveOrganization(token.organizationId);
        if (!org) {
          return yield* new OrganizationNotFoundError({ organizationId: token.organizationId });
        }
        return {
          organizationId: org.id,
          organizationName: org.name,
          organizationSlug: org.slug,
          userId: token.userId,
          elicitationMode: token.elicitationMode,
          codeMode: token.codeMode,
        } satisfies SessionMeta;
      }).pipe(
        Effect.withSpan("McpSessionDO.resolveSessionMeta"),
        Effect.provide(makeSessionServices(dbHandle)),
        Effect.ensuring(Effect.promise(() => dbHandle.end())),
        // oxlint-disable-next-line executor/no-effect-escape-hatch -- boundary: a vanished org is a defect; the worker already verified the bearer
        Effect.orDie,
      );
    });
  }

  /**
   * Build the session's execution stack and MCP tool server. The session
   * services are provided over `dbHandle`.
   *
   * Elicitation defaults to `"model"`. In `"browser"` mode, approval links
   * point at `VITE_PUBLIC_SITE_URL` and fall back to `https://executor.sh`.
   */
  protected override buildMcpServer(
    sessionMeta: SessionMeta,
    dbHandle: CloudSessionDbHandle,
  ): Effect.Effect<BuiltMcpServer> {
    const self = this;
    return Effect.gen(function* () {
      const { executor, engine } = yield* makeExecutionStack(
        sessionMeta.userId,
        sessionMeta.organizationId,
        sessionMeta.organizationName,
      ).pipe(
        Effect.provide(CloudExecutionStackLayer),
        Effect.withSpan("McpSessionDO.makeExecutionStack"),
      );
      // Build the description here so `executor.connections.list()` stays under
      // the DO startup span and the MCP SDK receives a concrete string instead
      // of invoking `engine.getDescription` across its async boundary.
      const description = yield* buildExecuteDescription(executor);
      const sessionElicitationMode = sessionMeta.elicitationMode ?? "model";
      const mcpServer = yield* createExecutorMcpServer({
        engine,
        description,
        parentSpan: () => self.currentParentSpan(),
        debug: env.EXECUTOR_MCP_DEBUG === "true",
        browserApprovalStore: self.browserApprovalStore,
        codeMode: sessionMeta.codeMode,
        elicitationMode:
          sessionElicitationMode === "browser"
            ? {
                mode: "browser" as const,
                // `self.sessionId` is read when a URL is built, not when the server is built.
                approvalUrl: (executionId) =>
                  buildResumeApprovalUrl({
                    origin: env.VITE_PUBLIC_SITE_URL ?? "https://executor.sh",
                    executionId,
                    sessionId: self.sessionId,
                  }),
              }
            : { mode: sessionElicitationMode },
      }).pipe(Effect.withSpan("McpSessionDO.createExecutorMcpServer"));
      return { mcpServer, engine } satisfies BuiltMcpServer;
    }).pipe(
      Effect.withSpan("McpSessionDO.buildMcpServer"),
      Effect.provide(makeSessionServices(dbHandle)),
      // oxlint-disable-next-line executor/no-effect-escape-hatch -- boundary: runtime-build failures surface as the base's tapCause/cleanup defect
      Effect.orDie,
    );
  }

  /**
   * Run `effect` under the DO's telemetry layer. When the worker forwarded a
   * valid `traceparent`, the effect is parented under the worker's span.
   * Otherwise it starts a fresh trace.
   */
  protected override withTelemetry<A, E>(
    effect: Effect.Effect<A, E>,
    incoming?: IncomingTraceHeaders,
  ): Effect.Effect<A, E> {
    const parsed = parseTraceparent(incoming?.traceparent, incoming?.tracestate);
    const traced = parsed ? OtelTracer.withSpanContext(effect, parsed) : effect;
    return traced.pipe(Effect.provide(DoTelemetryLive));
  }

  /** Report a failure cause through the shared observability `captureCause`. */
  protected override captureCause(cause: Cause.Cause<unknown>): void {
    reportCause(cause);
  }

  // Best-effort export the DO isolate's buffered spans after the RPC settles,
  // so a dying init/handleRequest can ship its own spans (and the exception +
  // stack recorded on them) — not just the worker-side `mcp.do.*` span. Keep it
  // off the response path and bounded: telemetry export must not hold a
  // successful MCP response open.
  protected override flushTelemetry(): Promise<void> {
    this.ctx.waitUntil(
      Effect.runPromise(
        Effect.tryPromise({
          try: () => flushTracerProvider(),
          catch: () => undefined,
        }).pipe(
          Effect.ignore,
          Effect.timeoutOrElse({
            duration: `${TELEMETRY_FLUSH_TIMEOUT_MS} millis`,
            orElse: () => Effect.void,
          }),
        ),
      ),
    );
    // Resolves immediately; the flush itself is tracked by `waitUntil`.
    return Promise.resolve();
  }
}