Skip to content

Commit 10bf9a6

Browse files
committed
Refine websocket runtime event test coverage
- Inject a mock provider runtime event feed into the test server - Verify orchestration pushes for both streaming and completed assistant messages - Adjust provider health test timing to match repeated status probes
1 parent 76ad1bf commit 10bf9a6

File tree

1 file changed

+98
-26
lines changed

1 file changed

+98
-26
lines changed

apps/server/src/wsServer.test.ts

Lines changed: 98 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ import { SqlClient, SqlError } from "effect/unstable/sql";
4949
import { ProviderService, type ProviderServiceShape } from "./provider/Services/ProviderService";
5050
import { ProviderHealth, type ProviderHealthShape } from "./provider/Services/ProviderHealth";
5151
import { ProviderRuntimeEventFeedLive } from "./provider/Layers/ProviderRuntimeEventFeed";
52+
import {
53+
ProviderRuntimeEventFeed,
54+
type ProviderRuntimeEventFeedShape,
55+
} from "./provider/Services/ProviderRuntimeEventFeed";
5256
import { Open, type OpenShape } from "./open";
5357
import { GitManager, type GitManagerShape } from "./git/Services/GitManager.ts";
5458
import type { GitCoreShape } from "./git/Services/GitCore.ts";
@@ -508,6 +512,7 @@ describe("WebSocket Server", () => {
508512
baseDir?: string;
509513
staticDir?: string;
510514
providerLayer?: Layer.Layer<ProviderService, never>;
515+
providerRuntimeEventFeed?: ProviderRuntimeEventFeedShape;
511516
providerHealth?: ProviderHealthShape;
512517
open?: OpenShape;
513518
gitManager?: GitManagerShape;
@@ -528,11 +533,9 @@ describe("WebSocket Server", () => {
528533
const scope = await Effect.runPromise(Scope.make("sequential"));
529534
const persistenceLayer = options.persistenceLayer ?? SqlitePersistenceMemory;
530535
const providerLayer = options.providerLayer ?? makeServerProviderLayer();
531-
const providerRuntimeEventFeedLayer = ProviderRuntimeEventFeedLive;
532-
const providerHealthLayer = Layer.succeed(
533-
ProviderHealth,
534-
options.providerHealth ?? defaultProviderHealthService,
535-
);
536+
const providerRuntimeEventFeedLayer = options.providerRuntimeEventFeed
537+
? Layer.succeed(ProviderRuntimeEventFeed, options.providerRuntimeEventFeed)
538+
: ProviderRuntimeEventFeedLive;
536539
const openLayer = Layer.succeed(Open, options.open ?? defaultOpenService);
537540
const serverConfigLayer = Layer.succeed(ServerConfig, {
538541
mode: "web",
@@ -556,6 +559,7 @@ describe("WebSocket Server", () => {
556559
? Layer.succeed(ProjectionSnapshotQuery, options.projectionSnapshotQuery)
557560
: OrchestrationProjectionSnapshotQueryLive;
558561
const runtimeOverrides = Layer.mergeAll(
562+
Layer.succeed(ProviderHealth, options.providerHealth ?? defaultProviderHealthService),
559563
options.gitManager ? Layer.succeed(GitManager, options.gitManager) : Layer.empty,
560564
options.gitCore
561565
? Layer.succeed(GitCore, options.gitCore as unknown as GitCoreShape)
@@ -575,7 +579,6 @@ describe("WebSocket Server", () => {
575579
);
576580
const dependenciesLayer = Layer.empty.pipe(
577581
Layer.provideMerge(runtimeLayer),
578-
Layer.provideMerge(providerHealthLayer),
579582
Layer.provideMerge(providerRuntimeEventFeedLayer),
580583
Layer.provideMerge(openLayer),
581584
Layer.provideMerge(serverConfigLayer),
@@ -908,9 +911,9 @@ describe("WebSocket Server", () => {
908911
return [
909912
{
910913
provider: "codex",
911-
status: callCount === 1 ? "ready" : "error",
912-
available: callCount === 1,
913-
authStatus: callCount === 1 ? "authenticated" : "unauthenticated",
914+
status: callCount <= 2 ? "ready" : "error",
915+
available: callCount <= 2,
916+
authStatus: callCount <= 2 ? "authenticated" : "unauthenticated",
914917
checkedAt: `2026-01-01T00:00:0${callCount}.000Z`,
915918
},
916919
] satisfies ReadonlyArray<ServerProviderStatus>;
@@ -935,7 +938,7 @@ describe("WebSocket Server", () => {
935938
status: "ready",
936939
available: true,
937940
authStatus: "authenticated",
938-
checkedAt: "2026-01-01T00:00:01.000Z",
941+
checkedAt: "2026-01-01T00:00:02.000Z",
939942
},
940943
],
941944
}),
@@ -948,7 +951,7 @@ describe("WebSocket Server", () => {
948951
status: "error",
949952
available: false,
950953
authStatus: "unauthenticated",
951-
checkedAt: "2026-01-01T00:00:02.000Z",
954+
checkedAt: "2026-01-01T00:00:03.000Z",
952955
},
953956
],
954957
}),
@@ -1135,12 +1138,15 @@ describe("WebSocket Server", () => {
11351138
},
11361139
];
11371140
const providerHealth: ProviderHealthShape = {
1138-
getStatuses: Effect.sync(() => {
1141+
getStatuses: Effect.suspend(() => {
11391142
callCount += 1;
1140-
if (callCount === 1) {
1141-
return liveStatuses;
1143+
if (callCount <= 2) {
1144+
return Effect.succeed(liveStatuses);
11421145
}
1143-
throw new Error("provider health probe failed");
1146+
return Effect.fail(new Error("provider health probe failed")) as unknown as Effect.Effect<
1147+
ReadonlyArray<ServerProviderStatus>,
1148+
never
1149+
>;
11441150
}),
11451151
};
11461152

@@ -1458,9 +1464,13 @@ describe("WebSocket Server", () => {
14581464

14591465
it("keeps orchestration domain push behavior for provider runtime events", async () => {
14601466
const runtimeEventPubSub = Effect.runSync(PubSub.unbounded<ProviderRuntimeEvent>());
1467+
const providerRuntimeEventFeed: ProviderRuntimeEventFeedShape = {
1468+
publish: (event) => PubSub.publish(runtimeEventPubSub, event).pipe(Effect.asVoid),
1469+
subscribeWithReplay: () => Stream.fromPubSub(runtimeEventPubSub),
1470+
};
14611471
const { cwd } = makeWorkspaceFixture("test");
14621472
const emitRuntimeEvent = (event: ProviderRuntimeEvent) => {
1463-
Effect.runSync(PubSub.publish(runtimeEventPubSub, event));
1473+
Effect.runSync(providerRuntimeEventFeed.publish(event));
14641474
};
14651475
const unsupported = () => Effect.die(new Error("Unsupported provider call in test")) as never;
14661476
const providerService: ProviderServiceShape = {
@@ -1485,13 +1495,14 @@ describe("WebSocket Server", () => {
14851495
listSessions: () => Effect.succeed([]),
14861496
getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }),
14871497
rollbackConversation: () => unsupported(),
1488-
streamEvents: Stream.fromPubSub(runtimeEventPubSub),
1498+
streamEvents: Stream.empty,
14891499
};
14901500
const providerLayer = Layer.succeed(ProviderService, providerService);
14911501

14921502
server = await createTestServer({
14931503
cwd,
14941504
providerLayer,
1505+
providerRuntimeEventFeed: providerRuntimeEventFeed,
14951506
});
14961507
const addr = server.address();
14971508
const port = typeof addr === "object" && addr !== null ? addr.port : 0;
@@ -1548,34 +1559,95 @@ describe("WebSocket Server", () => {
15481559
return event.type === "thread.session-set";
15491560
});
15501561

1562+
emitRuntimeEvent({
1563+
type: "turn.started",
1564+
eventId: asEventId("evt-ws-runtime-turn-started"),
1565+
provider: "codex",
1566+
threadId: asThreadId("thread-1"),
1567+
createdAt: new Date().toISOString(),
1568+
turnId: asTurnId("provider-turn-1"),
1569+
} as unknown as ProviderRuntimeEvent);
1570+
1571+
await waitForPush(ws, ORCHESTRATION_WS_CHANNELS.domainEvent, (push) => {
1572+
const event = push.data as {
1573+
type?: string;
1574+
payload?: { session?: { activeTurnId?: string } };
1575+
};
1576+
return (
1577+
event.type === "thread.session-set" &&
1578+
event.payload?.session?.activeTurnId === "provider-turn-1"
1579+
);
1580+
});
1581+
15511582
emitRuntimeEvent({
15521583
type: "content.delta",
15531584
eventId: asEventId("evt-ws-runtime-message-delta"),
15541585
provider: "codex",
15551586
threadId: asThreadId("thread-1"),
15561587
createdAt: new Date().toISOString(),
1557-
turnId: asTurnId("turn-1"),
1588+
turnId: asTurnId("provider-turn-1"),
15581589
itemId: asProviderItemId("item-1"),
15591590
payload: {
15601591
streamKind: "assistant_text",
15611592
delta: "hello from runtime",
15621593
},
15631594
} as unknown as ProviderRuntimeEvent);
15641595

1565-
const domainPush = await waitForPush(ws, ORCHESTRATION_WS_CHANNELS.domainEvent, (push) => {
1566-
const event = push.data as { type?: string; payload?: { messageId?: string; text?: string } };
1596+
const streamedPush = await waitForPush(ws, ORCHESTRATION_WS_CHANNELS.domainEvent, (push) => {
1597+
const event = push.data as {
1598+
type?: string;
1599+
payload?: { messageId?: string; text?: string; streaming?: boolean };
1600+
};
1601+
return (
1602+
event.type === "thread.message-sent" &&
1603+
event.payload?.messageId === "assistant:item-1" &&
1604+
event.payload?.streaming === true
1605+
);
1606+
});
1607+
1608+
const streamedEvent = streamedPush.data as {
1609+
type: string;
1610+
payload: { messageId: string; text: string; streaming: boolean };
1611+
};
1612+
expect(streamedEvent.type).toBe("thread.message-sent");
1613+
expect(streamedEvent.payload.messageId).toBe("assistant:item-1");
1614+
expect(streamedEvent.payload.text).toBe("hello from runtime");
1615+
expect(streamedEvent.payload.streaming).toBe(true);
1616+
1617+
emitRuntimeEvent({
1618+
type: "item.completed",
1619+
eventId: asEventId("evt-ws-runtime-message-complete"),
1620+
provider: "codex",
1621+
threadId: asThreadId("thread-1"),
1622+
createdAt: new Date().toISOString(),
1623+
turnId: asTurnId("provider-turn-1"),
1624+
itemId: asProviderItemId("item-1"),
1625+
payload: {
1626+
itemType: "assistant_message",
1627+
status: "completed",
1628+
},
1629+
} as unknown as ProviderRuntimeEvent);
1630+
1631+
const completedPush = await waitForPush(ws, ORCHESTRATION_WS_CHANNELS.domainEvent, (push) => {
1632+
const event = push.data as {
1633+
type?: string;
1634+
payload?: { messageId?: string; streaming?: boolean };
1635+
};
15671636
return (
1568-
event.type === "thread.message-sent" && event.payload?.messageId === "assistant:item-1"
1637+
event.type === "thread.message-sent" &&
1638+
event.payload?.messageId === "assistant:item-1" &&
1639+
event.payload?.streaming === false
15691640
);
15701641
});
15711642

1572-
const domainEvent = domainPush.data as {
1643+
const completedEvent = completedPush.data as {
15731644
type: string;
1574-
payload: { messageId: string; text: string };
1645+
payload: { messageId: string; text: string; streaming: boolean };
15751646
};
1576-
expect(domainEvent.type).toBe("thread.message-sent");
1577-
expect(domainEvent.payload.messageId).toBe("assistant:item-1");
1578-
expect(domainEvent.payload.text).toBe("hello from runtime");
1647+
expect(completedEvent.type).toBe("thread.message-sent");
1648+
expect(completedEvent.payload.messageId).toBe("assistant:item-1");
1649+
expect(completedEvent.payload.text).toBe("");
1650+
expect(completedEvent.payload.streaming).toBe(false);
15791651
});
15801652

15811653
it("routes terminal RPC methods and broadcasts terminal events", async () => {

0 commit comments

Comments
 (0)