wip: Centralize all harnesses with single websocket server #581
Conversation
- introduce shared harness schemas, events, and WS request/response contracts
- add new `@t3tools/harness-ws-server` package with adapters, connector/server/storage modules, and tests
- export new contracts and register workspace package in lockfile
```ts
  assertSessionHarness,
} from "./adapters";

export const OPENCODE_HARNESS_CAPABILITIES: HarnessCapabilitySet = {
```
🟢 Low src/opencode.ts:21
OPENCODE_HARNESS_CAPABILITIES claims support for toolLifecycle, planStream, fileArtifacts, and subagents, but parseSseEvents wraps all non-permission, non-elicitation events as native.frame without mapping the corresponding standard events like tool.call, plan.update, or file artifact events. The harness runtime will receive opaque native frames instead of structured events and fail to track tool usage, plans, or artifacts correctly.
Evidence trail:
packages/harness-ws-server/src/opencode.ts lines 20-31 (capability declaration), lines 103-214 (parseSseEvents with fallthrough to native.frame at lines 209-214); packages/contracts/src/harnessEvents.ts lines 220-259 (defined structured event types including plan.updated, item.started/updated/completed, artifact.persisted); packages/harness-ws-server/src/codex.ts (shows proper capability-to-event mapping for comparison)
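The missing mapping layer could look something like this hedged sketch — the opencode frame names (`tool.start`, `plan`) and payload fields below are assumptions for illustration, not the actual wire format:

```typescript
// Hedged sketch of a mapping layer for parseSseEvents: translate known native
// frames into the structured events the capabilities advertise, and keep
// native.frame strictly as a fallback. Frame names and payload shapes here are
// invented for the demo.
type StructuredEvent =
  | { type: "tool.call"; toolName: string }
  | { type: "plan.updated"; steps: string[] }
  | { type: "native.frame"; raw: unknown };

function mapNativeFrame(frame: { event: string; data: unknown }): StructuredEvent {
  switch (frame.event) {
    case "tool.start": {
      const data = frame.data as { name?: string };
      return { type: "tool.call", toolName: data.name ?? "unknown" };
    }
    case "plan": {
      const data = frame.data as { steps?: string[] };
      return { type: "plan.updated", steps: data.steps ?? [] };
    }
    default:
      // Only genuinely unmapped frames should land here; otherwise the
      // declared capabilities overpromise.
      return { type: "native.frame", raw: frame.data };
  }
}
```

The point is that any event family listed in the capability set should have a `case` above the fallback, so the harness runtime never has to reverse-engineer opaque frames.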
```ts
#startStreaming(sessionKey: string): void {
  const liveSession = this.#liveSessions.get(sessionKey);
  if (!liveSession || !liveSession.adapter.streamEvents) {
    return;
  }
  liveSession.streamAbort?.abort();
  const controller = new AbortController();
  liveSession.streamAbort = controller;
  void (async () => {
    try {
      for await (const event of liveSession.adapter.streamEvents!({
        session: liveSession.session,
        signal: controller.signal,
      })) {
        this.#publish(event);
      }
    } catch (error) {
```
🟠 High src/server.ts:355
In startStreaming, events from adapter.streamEvents are passed directly to #publish without updating their sequence number. Since #publish calls projectHarnessEvent which overwrites the global snapshot sequence with event.sequence, adapter-emitted events (which likely start at 0 or use their own counter) cause the service's global sequence to regress or collide. This corrupts the total event ordering and can produce duplicate EventId values. Consider re-sequencing adapter events in startStreaming using #nextSequence() before publishing.
```diff
  try {
    for await (const event of liveSession.adapter.streamEvents!({
      session: liveSession.session,
      signal: controller.signal,
    })) {
-     this.#publish(event);
+     const resequenced = {
+       ...event,
+       sequence: this.#nextSequence(),
+     };
+     this.#publish(resequenced);
    }
  } catch (error) {
```
Evidence trail:
packages/harness-ws-server/src/server.ts lines 356-370 (#startStreaming passing adapter events directly to #publish), line 293-294 (#publish calling appendEvent), line 47-48 (createEventId using sequence in ID format); packages/harness-ws-server/src/storage.ts line 156-158 (appendEvent calling projectHarnessEvent); packages/harness-ws-server/src/projector.ts lines 79-87 (projectHarnessEvent setting snapshot.sequence = event.sequence)
Also found in 1 other location(s):
- packages/contracts/src/harnessWs.ts:170 -- The `connector.publishEvent` request body schema uses `HarnessEvent`, which includes `HarnessEventBase` and thus requires a `sequence` field (`NonNegativeInt`). Sequence numbers are typically assigned by the server to ensure total ordering of the event stream. A connector publishing an event usually does not know the correct next sequence number. Requiring this field forces the connector to send a dummy value or track server state perfectly (which is often impossible). The schema for event ingestion should likely omit `sequence`.
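Both findings point at the same invariant: the server owns the global sequence. If the ingestion schema omits `sequence`, the server can stamp events at the boundary — a minimal sketch of that idea, with types simplified from the real contracts:

```typescript
// Sketch: connectors publish events without a sequence; the server stamps a
// monotonically increasing sequence at the ingestion boundary. IngestedEvent
// and SequencedEvent are simplified stand-ins for the real HarnessEvent types.
type IngestedEvent = { type: string; payload: unknown };
type SequencedEvent = IngestedEvent & { sequence: number };

function createSequencer(start = 0) {
  let next = start;
  return {
    stamp(event: IngestedEvent): SequencedEvent {
      // The server assigns total ordering; the connector never guesses.
      return { ...event, sequence: next++ };
    },
  };
}
```

Adapter-emitted events in `#startStreaming` and connector-published events would both pass through the same `stamp` before `#publish`, so the snapshot sequence can never regress.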
```ts
  }
}

function toHarnessSession(profile: HarnessProfile, session: OpenCodeSessionInfo): HarnessSession {
```
🟠 High src/opencode.ts:269
The adapter drops authentication credentials after session creation. createSession builds the Authorization header from input.profile via buildHeaders, but the returned HarnessSession does not store these credentials. Subsequent operations (sendTurn, streamEvents, resolvePermission, resolveElicitation, cancelTurn) construct requests without calling buildHeaders or including the header, so authenticated backends will return 401 for all operations after session creation. Consider persisting the headers in metadata during createSession and reusing them in downstream methods.
Evidence trail:
packages/harness-ws-server/src/opencode.ts lines 83-92 (buildHeaders creates Authorization header), lines 269-290 (toHarnessSession only stores baseUrl in metadata), line 317 (only call to buildHeaders in createSession), lines 330-342 (sendTurn - no auth header), lines 343-351 (cancelTurn - no headers), lines 352-370 (resolvePermission - no auth header), lines 371-383 (resolveElicitation - no auth header), lines 384-404 (updateSessionConfig - no auth header), lines 410-421 (streamEvents - no headers). git_grep confirmed buildHeaders is only called at line 317.
```ts
  }
}

function toHarnessSession(profile: HarnessProfile, session: OpenCodeSessionInfo): HarnessSession {
```
🟡 Medium src/opencode.ts:269
toHarnessSession hardcodes adapterKey to "opencode-direct" instead of using the actual key passed to createOpenCodeHarnessAdapter. When a custom adapterKey is configured, the returned HarnessSession has the wrong key, causing a mismatch with events produced by streamEvents which use the correct key. Consider passing the adapter key as a parameter to toHarnessSession so it matches the configured value.
Evidence trail:
packages/harness-ws-server/src/opencode.ts lines 269-291 (toHarnessSession with hardcoded adapterKey: "opencode-direct" at line 276), lines 293-299 (createOpenCodeHarnessAdapter accepting options?.adapterKey), line 324 (createSession calling toHarnessSession without passing adapterKey), lines 418-421 (streamEvents calling parseSseEvents with adapterKey closure variable), lines 116-156 (parseSseEvents using adapterKey in yielded events)
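A minimal sketch of the suggested parameter threading, with types simplified from the real contracts:

```typescript
// Sketch: thread the configured adapter key through toHarnessSession instead
// of hardcoding "opencode-direct". HarnessSession is simplified here.
type HarnessSession = { id: string; adapterKey: string };

function toHarnessSession(adapterKey: string, nativeId: string): HarnessSession {
  return { id: `harness:${nativeId}`, adapterKey };
}

function createOpenCodeHarnessAdapter(options?: { adapterKey?: string }) {
  const adapterKey = options?.adapterKey ?? "opencode-direct";
  return {
    createSession(nativeId: string): HarnessSession {
      // Same key ends up on the session and on every streamed event.
      return toHarnessSession(adapterKey, nativeId);
    },
  };
}
```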
```ts
  nativeSessionId: Schema.optional(TrimmedNonEmptyString),
  nativeThreadId: Schema.optional(ThreadId),
  nativeTurnId: Schema.optional(TurnId),
```
🟢 Low src/harness.ts:156
HarnessBinding defines nativeSessionId, nativeThreadId, and nativeTurnId using Schema.optional, which rejects explicit null values. However, HarnessSession defines nativeSessionId as Schema.NullOr(TrimmedNonEmptyString) (line 178), indicating these fields are nullable in the domain. If the backend sends null for any of these three fields, HarnessBinding validation fails. Consider using Schema.NullOr consistently to accept null values.
```diff
- nativeSessionId: Schema.optional(TrimmedNonEmptyString),
- nativeThreadId: Schema.optional(ThreadId),
- nativeTurnId: Schema.optional(TurnId),
+ nativeSessionId: Schema.NullOr(TrimmedNonEmptyString),
+ nativeThreadId: Schema.NullOr(ThreadId),
+ nativeTurnId: Schema.NullOr(TurnId),
```
Evidence trail:
packages/contracts/src/harness.ts lines 150-163 (`HarnessBinding` definition with `Schema.optional` for `nativeSessionId`, `nativeThreadId`, `nativeTurnId`), lines 165-184 (`HarnessSession` definition with `Schema.NullOr` for `nativeSessionId`). Effect Schema documentation confirms `Schema.optional` accepts undefined/absent but rejects explicit null, while `Schema.NullOr` accepts null values.
```ts
case "connector.health.changed":
  return {
    ...nextBase,
    connectors: applyConnector(nextBase.connectors, {
      id: event.payload.connectorId,
      profileId: null,
      harness: event.harness,
      adapterKey: event.adapterKey,
      health: event.payload.health,
      ...(event.payload.detail ? { description: event.payload.detail } : {}),
      lastSeenAt: event.createdAt,
    }),
  };
```
🟡 Medium src/projector.ts:296
applyConnector performs a full replacement of the connector object, so when the connector.health.changed handler (lines 296-308) calls it with a partial object containing only health and detail, fields like version and metadata are silently erased from the stored record. Consider merging the update onto the existing object instead of replacing it, or ensure the event handler constructs a complete connector object.
Evidence trail:
packages/harness-ws-server/src/projector.ts lines 66-76 (applyConnector function does `next[index] = connector;` - full replacement), lines 296-308 (connector.health.changed handler constructs object without version/metadata), lines 280-294 (connector.connected handler includes version/metadata from payload). packages/contracts/src/harness.ts lines 211-222 (HarnessConnector type definition showing version and metadata fields).
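A merge-based variant of `applyConnector` could look like this sketch (the connector shape is simplified; the real `HarnessConnector` has more fields):

```typescript
// Sketch: spread the existing record first so fields the event doesn't carry
// (version, metadata) survive a partial update. Connector is a simplified
// stand-in for HarnessConnector.
type Connector = {
  id: string;
  health: string;
  version?: string;
  metadata?: Record<string, unknown>;
};

function applyConnectorPatch(
  connectors: readonly Connector[],
  patch: Partial<Connector> & { id: string },
): Connector[] {
  const index = connectors.findIndex((c) => c.id === patch.id);
  if (index === -1) {
    // Unknown connector: insert with whatever the event carried.
    return [...connectors, { health: "unknown", ...patch } as Connector];
  }
  const next = [...connectors];
  // Merge instead of replace: untouched fields are preserved.
  next[index] = { ...next[index], ...patch };
  return next;
}
```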
```ts
async shutdownSession(sessionId: HarnessSessionId): Promise<void> {
  const liveSession = await this.#resolveLiveSession(sessionId);
  liveSession.streamAbort?.abort();
  await liveSession.adapter.shutdownSession({ session: liveSession.session });
  this.#liveSessions.delete(asSessionKey(sessionId));
}
```
🟡 Medium src/server.ts:282
shutdownSession aborts the event stream before calling adapter.shutdownSession(), causing final events like session.exited to be dropped. The store may retain stale session state (e.g., "running" instead of "stopped"). Abort the stream only after the adapter shutdown completes.
```diff
  async shutdownSession(sessionId: HarnessSessionId): Promise<void> {
    const liveSession = await this.#resolveLiveSession(sessionId);
-   liveSession.streamAbort?.abort();
    await liveSession.adapter.shutdownSession({ session: liveSession.session });
+   liveSession.streamAbort?.abort();
    this.#liveSessions.delete(asSessionKey(sessionId));
  }
```
Evidence trail:
packages/harness-ws-server/src/server.ts lines 282-287: shutdownSession method showing abort() called before adapter.shutdownSession()
packages/harness-ws-server/src/server.ts lines 355-392: #startStreaming showing how events are consumed and how abort signal causes early return
packages/harness-ws-server/src/projector.ts lines 179-184: session.exited event handler that updates session state to "stopped"
packages/harness-ws-server/src/adapters.ts lines 65-81: HarnessAdapter interface showing streamEvents and shutdownSession methods
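The ordering hazard is easy to reproduce in isolation. This self-contained demo (invented event names and a toy event source, not the real adapter API) shows that aborting the consumer before the terminal event is emitted drops it, while aborting after the drain keeps it:

```typescript
// Toy event source that emits "session.exited" only at shutdown; only the
// abort ordering is the point of this demo.
function makeSource() {
  const queue: string[] = [];
  let notify: (() => void) | null = null;
  let closed = false;
  return {
    push(event: string) {
      queue.push(event);
      notify?.();
    },
    close() {
      closed = true;
      notify?.();
    },
    async *stream(signal: AbortSignal) {
      while (!signal.aborted) {
        if (queue.length > 0) {
          yield queue.shift() as string;
          continue;
        }
        if (closed) return;
        // Park until a new event arrives, the source closes, or we abort.
        await new Promise<void>((resolve) => {
          notify = resolve;
          signal.addEventListener("abort", () => resolve(), { once: true });
        });
      }
    },
  };
}

// Consume the stream, aborting either before or after the terminal event.
async function demo(abortBeforeShutdown: boolean): Promise<string[]> {
  const source = makeSource();
  const controller = new AbortController();
  const seen: string[] = [];
  const consumer = (async () => {
    for await (const event of source.stream(controller.signal)) seen.push(event);
  })();
  source.push("turn.completed");
  await new Promise((resolve) => setTimeout(resolve, 0));
  if (abortBeforeShutdown) controller.abort(); // mimics the buggy ordering
  source.push("session.exited"); // terminal event emitted during shutdown
  source.close();
  await new Promise((resolve) => setTimeout(resolve, 0));
  controller.abort(); // aborting after the drain is harmless
  await consumer;
  return seen;
}
```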
```ts
  subagents: true,
};

export type ClaudeAgentSdkEvent = Omit<HarnessEvent, "harness" | "adapterKey" | "connectionMode">;
```
🟢 Low src/claude.ts:38
ClaudeAgentSdkEvent uses Omit<HarnessEvent, "harness" | "adapterKey" | "connectionMode">, but HarnessEvent is likely a discriminated union. TypeScript's Omit is not distributive, so keyof HarnessEvent only returns keys shared by all union members, stripping away properties unique to individual event types. This produces a broken type that allows SDKs to return incomplete events without compile-time errors, which are then cast as HarnessEvent and passed downstream. Consider a distributive omit helper — distribution only happens over a naked type parameter, so the conditional must go through a generic: `type DistributiveOmit<T, K extends PropertyKey> = T extends any ? Omit<T, K> : never`, then `type ClaudeAgentSdkEvent = DistributiveOmit<HarnessEvent, "harness" | "adapterKey" | "connectionMode">`.
```diff
-export type ClaudeAgentSdkEvent = Omit<HarnessEvent, "harness" | "adapterKey" | "connectionMode">;
+type DistributiveOmit<T, K extends PropertyKey> = T extends any ? Omit<T, K> : never;
+export type ClaudeAgentSdkEvent = DistributiveOmit<HarnessEvent, "harness" | "adapterKey" | "connectionMode">;
```
Evidence trail:
packages/harness-ws-server/src/claude.ts line 38 (`ClaudeAgentSdkEvent` definition with non-distributive `Omit`), lines 123-131 (`as HarnessEvent` cast in `streamEvents`). packages/contracts/src/harnessEvents.ts lines 223-258 (`HarnessEvent` is `Schema.Union([...])` with 30+ event types, each with unique `type` literal and `payload` schema). TypeScript standard `Omit` definition: `type Omit<T, K> = Pick<T, Exclude<keyof T, K>>` is non-distributive over unions.
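The distribution behavior can be checked in a few lines — a simplified two-member union standing in for the real `HarnessEvent`:

```typescript
// A two-member union standing in for HarnessEvent (simplified for the demo).
type DemoEvent =
  | { type: "turn.started"; harness: string; turnId: string }
  | { type: "log.emitted"; harness: string; line: string };

// Non-distributive: keyof DemoEvent is only "type" | "harness", so the
// member-specific fields turnId and line are stripped from the result.
type Broken = Omit<DemoEvent, "harness">; // ≈ { type: "turn.started" | "log.emitted" }

// Distributive: the conditional distributes over the naked type parameter T,
// applying Omit to each union member separately.
type DistributiveOmit<T, K extends PropertyKey> = T extends any ? Omit<T, K> : never;
type Fixed = DistributiveOmit<DemoEvent, "harness">;

// Member-specific fields survive under the distributive form, and the
// discriminant still narrows.
const ok: Fixed = { type: "turn.started", turnId: "t1" };
```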
```ts
async attachSession(profileId: HarnessProfile["id"]): Promise<HarnessSession> {
  const snapshot = this.#store.getSnapshot();
  const profile = findProfile(snapshot, profileId);
  const adapter = this.#resolveAdapter(profile.harness);
  const session = buildAttachedSession(profile, adapter, this.#now());
  this.#registerLiveSession(profile, adapter, session);
  this.#publish(buildSessionCreatedEvent(session, this.#nextSequence(), session.createdAt));
  return this.#requireSession(session.id);
```
🟠 High src/server.ts:215
attachSession registers the session as live but never calls #startStreaming, so adapter.streamEvents is never invoked and the session receives no events (logs, state changes, telemetry) from the underlying adapter. Consider calling #startStreaming after registering the live session to enable event streaming.
```diff
  this.#registerLiveSession(profile, adapter, session);
  this.#publish(buildSessionCreatedEvent(session, this.#nextSequence(), session.createdAt));
+ this.#startStreaming(asSessionKey(session.id));
  return this.#requireSession(session.id);
```
Evidence trail:
packages/harness-ws-server/src/server.ts lines 215-222 (attachSession method), line 211 (createSession calls #startStreaming), line 229 (resumeSession calls #startStreaming), lines 355-392 (#startStreaming implementation showing it iterates adapter.streamEvents and publishes events)
juliusmarminge left a comment:
What is different here than the current Provider* lol?
Hey @t3dotgg and everyone, I love the direction of this PR. I ended up building the exact same abstractions independently.

To be clear upfront: this is definitely not a merge request or an actual pitch to rewrite anything in T3Code. The Node approach in this PR clearly wins for shipping simplicity. Node should continue managing the intelligence layer, TS contracts, the Claude Agent SDK integration, and Julius's excellent Cursor ACP work (PR #1355). I'm sharing this purely for feedback and architectural discussion.

Observations in the schema
Since our abstractions mapped 1:1, I moved the supervision boundary into Elixir/OTP to test what happens when each concern becomes an isolated process:
Three schema elements are already OTP primitives
The crossover point

I ran stress tests comparing both setups. Node handles the load well up to about 150 concurrent sessions. Beyond that — or when managing unpredictable subagent trees — Node's event loop saturates. OTP natively handles concurrent crash recoveries with nearly zero lag and keeps memory usage flat. The key difference: in the current Node architecture, if

This aligns with what the schema was designed for — just in Node, the tables are part of a future plan. In OTP, they close the recovery loop. I documented the full architectural mapping and benchmarking here: https://ranvier-technologies.github.io/t3code-OTP/pitch

Would love your thoughts on this crossover point and how you're considering structural failure isolation as T3Code scales into the subagent workflows from PR #1199.

@juliusmarminge Your question "what's different here than the current Provider*" reminded me of what led me to this experiment. If the HarnessService boundary just reorganizes what ProviderService already does, it's hard to justify. But it becomes essential when managing subagent orchestration like in PR #1199. Your WorkUnitId model already creates execution trees inside a turn, with primary agents spawning delegated agents. The question is how to handle failures when delegated agents fail unpredictably at scale.

Below about 150 sessions, the current Node patterns work perfectly. But managing large forests of subagents requires isolating each into its own process. That way, a delegated agent's crash doesn't poison the parent turn — it just dies, and the

The HarnessService matters not because it's fundamentally different from ProviderService today, but because it creates a clear boundary for structural crash isolation — whether through OTP, worker threads, or something else.
As an example, I built a skill that leverages T3Code itself as a tool across different harness providers: a Bun script that spawns new threads in T3Code, giving your root agent subagents across Codex, Claude Code, OpenCode, and Cursor simultaneously. When you scale that pattern — orchestrating multiple subagents across different CLIs — it stops being an application concern and becomes a supervision problem.

PS: For context, I've spent the last few months building a local-first AI coding agent as a side project in Elixir/OTP with a similar architecture: agents as GenServers under a DynamicSupervisor, sessions with crash isolation, and React as a thin renderer that just surfaces the harness via CLI, REPL, web UI, or desktop app. That's where this intuition comes from. I don't consider myself an expert in either Elixir or Node — I appreciate any roasts and corrections.
Trying to make a "standard" here. Still super early wip, literally 3 prompts in lol
Note
Centralize harness adapters behind a single `HarnessService` WebSocket server and project all events into an in-memory `HarnessSnapshot`.

Add a harness WebSocket server with a `HarnessService` that emits and projects session, turn, permission, elicitation, connector, and transport events into an immutable snapshot; introduce cross-harness adapters for `claude-agent-sdk`, `codex-app-server`, and `opencode`; define runtime schemas for harness entities/events and WS contracts; and add a memory store and projector utilities. Core entry points are in server.ts and projector.ts.

📍 Where to Start
Start with `HarnessService` construction and event flow in server.ts, then review event projection logic in projector.ts.

📊 Macroscope summarized e29fcbd. 16 files reviewed, 31 issues evaluated, 4 issues filtered, 9 comments posted
🗂️ Filtered Issues
apps/server/src/provider/Layers/ProviderService.ts — 0 comments posted, 2 evaluated, 2 filtered
- The `sendTurn` function performs a `directory.upsert` that completely overwrites the persisted session state with a new `runtimePayload` object containing only `activeTurnId` and timestamps. This overwrites the `providerOptions` (and `cwd`) that were carefully persisted in `startSession`. Consequently, as soon as the first turn is processed, the provider options are lost. If the server restarts and attempts to resume the session, it will reload without the necessary options, defeating the purpose of the fix in `startSession`. [Out of scope]
- The `runStopAll` finalizer iterates all threads and calls `directory.upsert` with a freshly constructed object to mark them as "stopped". This new object omits the existing `resumeCursor` and `runtimeMode`, and replaces `runtimePayload` with a minimal status object. This action unintentionally wipes out all recovery data (resume tokens, CWD, provider options) for every session on server shutdown. This guarantees that no sessions can be successfully recovered via `recoverSessionForThread` after a server restart. [Out of scope]

packages/contracts/src/harness.ts — 1 comment posted, 6 evaluated, 1 filtered
- `Schema.Literals` does not exist in the `effect` library; the correct export is `Schema.Literal`. Furthermore, `Schema.Literal` accepts variable arguments (e.g., `Schema.Literal("a", "b")`), not a single array argument. Attempting to call `Schema.Literals` will result in a runtime `TypeError` (undefined is not a function) when the module loads. [Cross-file consolidated]

packages/contracts/src/harnessWs.ts — 0 comments posted, 3 evaluated, 1 filtered
- The `connector.publishEvent` request body schema uses `HarnessEvent`, which includes `HarnessEventBase` and thus requires a `sequence` field (`NonNegativeInt`). Sequence numbers are typically assigned by the server to ensure total ordering of the event stream. A connector publishing an event usually does not know the correct next sequence number. Requiring this field forces the connector to send a dummy value or track server state perfectly (which is often impossible). The schema for event ingestion should likely omit `sequence`. [Cross-file consolidated]