Skip to content

Commit 2c1f460

Browse files
committed
perf: optimize streaming message projection path
Cherry-picked from upstream pingdotgg#1688
1 parent 454df2f commit 2c1f460

4 files changed

Lines changed: 186 additions & 22 deletions

File tree

apps/server/src/orchestration/Layers/ProjectionPipeline.ts

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -758,37 +758,48 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
758758
)(function* (event, attachmentSideEffects) {
759759
switch (event.type) {
760760
case "thread.message-sent": {
761-
const existingMessage = yield* projectionThreadMessageRepository.getByMessageId({
762-
messageId: event.payload.messageId,
763-
});
764-
const previousMessage = Option.getOrUndefined(existingMessage);
765-
const nextText = Option.match(existingMessage, {
766-
onNone: () => event.payload.text,
767-
onSome: (message) => {
768-
if (event.payload.streaming) {
769-
return `${message.text}${event.payload.text}`;
770-
}
771-
if (event.payload.text.length === 0) {
772-
return message.text;
773-
}
774-
return event.payload.text;
775-
},
776-
});
777761
const nextAttachments =
778762
event.payload.attachments !== undefined
779763
? yield* materializeAttachmentsForProjection({
780764
attachments: event.payload.attachments,
781765
})
782-
: previousMessage?.attachments;
766+
: undefined;
767+
if (event.payload.streaming) {
768+
yield* projectionThreadMessageRepository.appendTextDelta({
769+
messageId: event.payload.messageId,
770+
threadId: event.payload.threadId,
771+
turnId: event.payload.turnId,
772+
role: event.payload.role,
773+
delta: event.payload.text,
774+
...(nextAttachments !== undefined ? { attachments: [...nextAttachments] } : {}),
775+
isStreaming: true,
776+
createdAt: event.payload.createdAt,
777+
updatedAt: event.payload.updatedAt,
778+
});
779+
return;
780+
}
781+
782+
const existingMessage = yield* projectionThreadMessageRepository
783+
.getByMessageId({
784+
messageId: event.payload.messageId,
785+
})
786+
.pipe(Effect.map(Option.getOrUndefined));
787+
const nextText =
788+
existingMessage !== undefined && event.payload.text.length === 0
789+
? existingMessage.text
790+
: event.payload.text;
791+
const persistedAttachments = nextAttachments ?? existingMessage?.attachments;
783792
yield* projectionThreadMessageRepository.upsert({
784793
messageId: event.payload.messageId,
785794
threadId: event.payload.threadId,
786795
turnId: event.payload.turnId,
787796
role: event.payload.role,
788797
text: nextText,
789-
...(nextAttachments !== undefined ? { attachments: [...nextAttachments] } : {}),
798+
...(persistedAttachments !== undefined
799+
? { attachments: [...persistedAttachments] }
800+
: {}),
790801
isStreaming: event.payload.streaming,
791-
createdAt: previousMessage?.createdAt ?? event.payload.createdAt,
802+
createdAt: existingMessage?.createdAt ?? event.payload.createdAt,
792803
updatedAt: event.payload.updatedAt,
793804
});
794805
return;

apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { MessageId, ThreadId } from "@t3tools/contracts";
22
import { assert, it } from "@effect/vitest";
3-
import { Effect, Layer } from "effect";
3+
import { Effect, Layer, Option } from "effect";
44

55
import { ProjectionThreadMessageRepository } from "../Services/ProjectionThreadMessages.ts";
66
import { ProjectionThreadMessageRepositoryLive } from "./ProjectionThreadMessages.ts";
@@ -110,4 +110,84 @@ layer("ProjectionThreadMessageRepository", (it) => {
110110
assert.deepEqual(rows[0]?.attachments, []);
111111
}),
112112
);
113+
114+
it.effect("looks up a projected message by id", () =>
115+
Effect.gen(function* () {
116+
const repository = yield* ProjectionThreadMessageRepository;
117+
const threadId = ThreadId.makeUnsafe("thread-get-by-message-id");
118+
const messageId = MessageId.makeUnsafe("message-get-by-message-id");
119+
const createdAt = "2026-02-28T19:20:00.000Z";
120+
const updatedAt = "2026-02-28T19:20:01.000Z";
121+
122+
yield* repository.upsert({
123+
messageId,
124+
threadId,
125+
turnId: null,
126+
role: "assistant",
127+
text: "lookup me",
128+
isStreaming: false,
129+
createdAt,
130+
updatedAt,
131+
});
132+
133+
const maybeRow = yield* repository.getByMessageId({ messageId });
134+
assert.isTrue(Option.isSome(maybeRow));
135+
const row = Option.getOrThrow(maybeRow);
136+
assert.equal(row.messageId, messageId);
137+
assert.equal(row.threadId, threadId);
138+
assert.equal(row.text, "lookup me");
139+
assert.equal(row.createdAt, createdAt);
140+
assert.equal(row.updatedAt, updatedAt);
141+
}),
142+
);
143+
144+
it.effect("appends streaming deltas without losing createdAt or attachments", () =>
145+
Effect.gen(function* () {
146+
const repository = yield* ProjectionThreadMessageRepository;
147+
const threadId = ThreadId.makeUnsafe("thread-append-delta");
148+
const messageId = MessageId.makeUnsafe("message-append-delta");
149+
const createdAt = "2026-02-28T19:30:00.000Z";
150+
const persistedAttachments = [
151+
{
152+
type: "image" as const,
153+
id: "thread-append-delta-att-1",
154+
name: "example.png",
155+
mimeType: "image/png",
156+
sizeBytes: 5,
157+
},
158+
];
159+
160+
yield* repository.appendTextDelta({
161+
messageId,
162+
threadId,
163+
turnId: null,
164+
role: "assistant",
165+
delta: "Hello",
166+
attachments: persistedAttachments,
167+
isStreaming: true,
168+
createdAt,
169+
updatedAt: "2026-02-28T19:30:01.000Z",
170+
});
171+
172+
yield* repository.appendTextDelta({
173+
messageId,
174+
threadId,
175+
turnId: null,
176+
role: "assistant",
177+
delta: " world",
178+
isStreaming: true,
179+
createdAt: "2026-02-28T19:30:05.000Z",
180+
updatedAt: "2026-02-28T19:30:06.000Z",
181+
});
182+
183+
const maybeRow = yield* repository.getByMessageId({ messageId });
184+
assert.isTrue(Option.isSome(maybeRow));
185+
const row = Option.getOrThrow(maybeRow);
186+
assert.equal(row.text, "Hello world");
187+
assert.equal(row.createdAt, createdAt);
188+
assert.equal(row.updatedAt, "2026-02-28T19:30:06.000Z");
189+
assert.deepEqual(row.attachments, persistedAttachments);
190+
assert.isTrue(row.isStreaming);
191+
}),
192+
);
113193
});

apps/server/src/persistence/Layers/ProjectionThreadMessages.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { ChatAttachment } from "@t3tools/contracts";
55

66
import { toPersistenceSqlError } from "../Errors.ts";
77
import {
8+
AppendProjectionThreadMessageDeltaInput,
89
GetProjectionThreadMessageInput,
910
ProjectionThreadMessageRepository,
1011
type ProjectionThreadMessageRepositoryShape,
@@ -133,6 +134,50 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () {
133134
`,
134135
});
135136

137+
const appendProjectionThreadMessageDeltaRow = SqlSchema.void({
138+
Request: AppendProjectionThreadMessageDeltaInput,
139+
execute: (row) => {
140+
const nextAttachmentsJson =
141+
row.attachments !== undefined ? JSON.stringify(row.attachments) : null;
142+
return sql`
143+
INSERT INTO projection_thread_messages (
144+
message_id,
145+
thread_id,
146+
turn_id,
147+
role,
148+
text,
149+
attachments_json,
150+
is_streaming,
151+
created_at,
152+
updated_at
153+
)
154+
VALUES (
155+
${row.messageId},
156+
${row.threadId},
157+
${row.turnId},
158+
${row.role},
159+
${row.delta},
160+
${nextAttachmentsJson},
161+
${row.isStreaming ? 1 : 0},
162+
${row.createdAt},
163+
${row.updatedAt}
164+
)
165+
ON CONFLICT (message_id)
166+
DO UPDATE SET
167+
thread_id = excluded.thread_id,
168+
turn_id = COALESCE(excluded.turn_id, projection_thread_messages.turn_id),
169+
role = excluded.role,
170+
text = projection_thread_messages.text || excluded.text,
171+
attachments_json = COALESCE(
172+
excluded.attachments_json,
173+
projection_thread_messages.attachments_json
174+
),
175+
is_streaming = excluded.is_streaming,
176+
updated_at = excluded.updated_at
177+
`;
178+
},
179+
});
180+
136181
const deleteProjectionThreadMessageRows = SqlSchema.void({
137182
Request: DeleteProjectionThreadMessagesInput,
138183
execute: ({ threadId }) =>
@@ -163,6 +208,13 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () {
163208
Effect.map((rows) => rows.map(toProjectionThreadMessage)),
164209
);
165210

211+
const appendTextDelta: ProjectionThreadMessageRepositoryShape["appendTextDelta"] = (input) =>
212+
appendProjectionThreadMessageDeltaRow(input).pipe(
213+
Effect.mapError(
214+
toPersistenceSqlError("ProjectionThreadMessageRepository.appendTextDelta:query"),
215+
),
216+
);
217+
166218
const deleteByThreadId: ProjectionThreadMessageRepositoryShape["deleteByThreadId"] = (input) =>
167219
deleteProjectionThreadMessageRows(input).pipe(
168220
Effect.mapError(
@@ -174,6 +226,7 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () {
174226
upsert,
175227
getByMessageId,
176228
listByThreadId,
229+
appendTextDelta,
177230
deleteByThreadId,
178231
} satisfies ProjectionThreadMessageRepositoryShape;
179232
});

apps/server/src/persistence/Services/ProjectionThreadMessages.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ import {
1414
TurnId,
1515
IsoDateTime,
1616
} from "@t3tools/contracts";
17-
import { Schema, Context } from "effect";
18-
import type { Option } from "effect";
17+
import { Option, Schema, Context } from "effect";
1918
import type { Effect } from "effect";
2019

2120
import type { ProjectionRepositoryError } from "../Errors.ts";
@@ -48,6 +47,20 @@ export const DeleteProjectionThreadMessagesInput = Schema.Struct({
4847
});
4948
export type DeleteProjectionThreadMessagesInput = typeof DeleteProjectionThreadMessagesInput.Type;
5049

50+
export const AppendProjectionThreadMessageDeltaInput = Schema.Struct({
51+
messageId: MessageId,
52+
threadId: ThreadId,
53+
turnId: Schema.NullOr(TurnId),
54+
role: OrchestrationMessageRole,
55+
delta: Schema.String,
56+
attachments: Schema.optional(Schema.Array(ChatAttachment)),
57+
isStreaming: Schema.Boolean,
58+
createdAt: IsoDateTime,
59+
updatedAt: IsoDateTime,
60+
});
61+
export type AppendProjectionThreadMessageDeltaInput =
62+
typeof AppendProjectionThreadMessageDeltaInput.Type;
63+
5164
/**
5265
* ProjectionThreadMessageRepositoryShape - Service API for projected thread messages.
5366
*/
@@ -77,6 +90,13 @@ export interface ProjectionThreadMessageRepositoryShape {
7790
input: ListProjectionThreadMessagesInput,
7891
) => Effect.Effect<ReadonlyArray<ProjectionThreadMessage>, ProjectionRepositoryError>;
7992

93+
/**
94+
* Append a streaming text delta to an existing projected message row, or insert it.
95+
*/
96+
readonly appendTextDelta: (
97+
input: AppendProjectionThreadMessageDeltaInput,
98+
) => Effect.Effect<void, ProjectionRepositoryError>;
99+
80100
/**
81101
* Delete projected thread messages by thread.
82102
*/

0 commit comments

Comments
 (0)