Skip to content

Commit 55ceb9b

Browse files
authored
Add pending user input projections and thread queries (#404)
- project pending user inputs into persistence - add overview and thread detail projection queries - include pending user input counts in orchestration snapshot
1 parent a8d9c8e commit 55ceb9b

11 files changed

Lines changed: 1223 additions & 3 deletions

File tree

apps/server/src/orchestration/Layers/ProjectionOverviewQuery.ts

Lines changed: 428 additions & 0 deletions
Large diffs are not rendered by default.

apps/server/src/orchestration/Layers/ProjectionPipeline.ts

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import * as SqlClient from "effect/unstable/sql/SqlClient";
1111
import { toPersistenceSqlError, type ProjectionRepositoryError } from "../../persistence/Errors.ts";
1212
import { OrchestrationEventStore } from "../../persistence/Services/OrchestrationEventStore.ts";
1313
import { ProjectionPendingApprovalRepository } from "../../persistence/Services/ProjectionPendingApprovals.ts";
14+
import { ProjectionPendingUserInputRepository } from "../../persistence/Services/ProjectionPendingUserInputs.ts";
1415
import { ProjectionProjectRepository } from "../../persistence/Services/ProjectionProjects.ts";
1516
import { ProjectionStateRepository } from "../../persistence/Services/ProjectionState.ts";
1617
import { ProjectionThreadActivityRepository } from "../../persistence/Services/ProjectionThreadActivities.ts";
@@ -30,6 +31,7 @@ import {
3031
} from "../../persistence/Services/ProjectionTurns.ts";
3132
import { ProjectionThreadRepository } from "../../persistence/Services/ProjectionThreads.ts";
3233
import { ProjectionPendingApprovalRepositoryLive } from "../../persistence/Layers/ProjectionPendingApprovals.ts";
34+
import { ProjectionPendingUserInputRepositoryLive } from "../../persistence/Layers/ProjectionPendingUserInputs.ts";
3335
import { ProjectionProjectRepositoryLive } from "../../persistence/Layers/ProjectionProjects.ts";
3436
import { ProjectionStateRepositoryLive } from "../../persistence/Layers/ProjectionState.ts";
3537
import { ProjectionThreadActivityRepositoryLive } from "../../persistence/Layers/ProjectionThreadActivities.ts";
@@ -60,6 +62,7 @@ export const ORCHESTRATION_PROJECTOR_NAMES = {
6062
threadTurns: "projection.thread-turns",
6163
checkpoints: "projection.checkpoints",
6264
pendingApprovals: "projection.pending-approvals",
65+
pendingUserInputs: "projection.pending-user-inputs",
6366
} as const;
6467

6568
type ProjectorName =
@@ -347,6 +350,7 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
347350
const projectionThreadSessionRepository = yield* ProjectionThreadSessionRepository;
348351
const projectionTurnRepository = yield* ProjectionTurnRepository;
349352
const projectionPendingApprovalRepository = yield* ProjectionPendingApprovalRepository;
353+
const projectionPendingUserInputRepository = yield* ProjectionPendingUserInputRepository;
350354

351355
const fileSystem = yield* FileSystem.FileSystem;
352356
const path = yield* Path.Path;
@@ -371,6 +375,18 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
371375
}),
372376
{ concurrency: 1 },
373377
).pipe(Effect.asVoid);
378+
379+
const pendingUserInputs = yield* projectionPendingUserInputRepository.listByThreadId({
380+
threadId,
381+
});
382+
yield* Effect.forEach(
383+
pendingUserInputs,
384+
(userInput) =>
385+
projectionPendingUserInputRepository.deleteByRequestId({
386+
requestId: userInput.requestId,
387+
}),
388+
{ concurrency: 1 },
389+
).pipe(Effect.asVoid);
374390
});
375391

376392
const applyProjectsProjection: ProjectorDefinition["apply"] = (event, _attachmentSideEffects) =>
@@ -1145,6 +1161,83 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
11451161
}
11461162
});
11471163

1164+
const applyPendingUserInputsProjection: ProjectorDefinition["apply"] = (
1165+
event,
1166+
_attachmentSideEffects,
1167+
) =>
1168+
Effect.gen(function* () {
1169+
switch (event.type) {
1170+
case "thread.activity-appended": {
1171+
const requestId =
1172+
extractActivityRequestId(event.payload.activity.payload) ??
1173+
event.metadata.requestId ??
1174+
null;
1175+
if (requestId === null) {
1176+
return;
1177+
}
1178+
const existingRow = yield* projectionPendingUserInputRepository.getByRequestId({
1179+
requestId,
1180+
});
1181+
if (event.payload.activity.kind === "user-input.resolved") {
1182+
yield* projectionPendingUserInputRepository.upsert({
1183+
requestId,
1184+
threadId: Option.isSome(existingRow)
1185+
? existingRow.value.threadId
1186+
: event.payload.threadId,
1187+
turnId: Option.isSome(existingRow)
1188+
? existingRow.value.turnId
1189+
: event.payload.activity.turnId,
1190+
status: "resolved",
1191+
createdAt: Option.isSome(existingRow)
1192+
? existingRow.value.createdAt
1193+
: event.payload.activity.createdAt,
1194+
resolvedAt: event.payload.activity.createdAt,
1195+
});
1196+
return;
1197+
}
1198+
if (event.payload.activity.kind !== "user-input.requested") {
1199+
return;
1200+
}
1201+
if (Option.isSome(existingRow) && existingRow.value.status === "resolved") {
1202+
return;
1203+
}
1204+
yield* projectionPendingUserInputRepository.upsert({
1205+
requestId,
1206+
threadId: event.payload.threadId,
1207+
turnId: event.payload.activity.turnId,
1208+
status: "pending",
1209+
createdAt: Option.isSome(existingRow)
1210+
? existingRow.value.createdAt
1211+
: event.payload.activity.createdAt,
1212+
resolvedAt: null,
1213+
});
1214+
return;
1215+
}
1216+
1217+
case "thread.user-input-response-requested": {
1218+
const existingRow = yield* projectionPendingUserInputRepository.getByRequestId({
1219+
requestId: event.payload.requestId,
1220+
});
1221+
yield* projectionPendingUserInputRepository.upsert({
1222+
requestId: event.payload.requestId,
1223+
threadId: Option.isSome(existingRow)
1224+
? existingRow.value.threadId
1225+
: event.payload.threadId,
1226+
turnId: Option.isSome(existingRow) ? existingRow.value.turnId : null,
1227+
status: "resolved",
1228+
createdAt: Option.isSome(existingRow)
1229+
? existingRow.value.createdAt
1230+
: event.payload.createdAt,
1231+
resolvedAt: event.payload.createdAt,
1232+
});
1233+
return;
1234+
}
1235+
1236+
default:
1237+
return;
1238+
}
1239+
});
1240+
11481241
const projectors: ReadonlyArray<ProjectorDefinition> = [
11491242
{
11501243
name: ORCHESTRATION_PROJECTOR_NAMES.projects,
@@ -1178,6 +1271,10 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
11781271
name: ORCHESTRATION_PROJECTOR_NAMES.pendingApprovals,
11791272
apply: applyPendingApprovalsProjection,
11801273
},
1274+
{
1275+
name: ORCHESTRATION_PROJECTOR_NAMES.pendingUserInputs,
1276+
apply: applyPendingUserInputsProjection,
1277+
},
11811278
{
11821279
name: ORCHESTRATION_PROJECTOR_NAMES.threads,
11831280
apply: applyThreadsProjection,
@@ -1282,5 +1379,6 @@ export const OrchestrationProjectionPipelineLive = Layer.effect(
12821379
Layer.provideMerge(ProjectionThreadSessionRepositoryLive),
12831380
Layer.provideMerge(ProjectionTurnRepositoryLive),
12841381
Layer.provideMerge(ProjectionPendingApprovalRepositoryLive),
1382+
Layer.provideMerge(ProjectionPendingUserInputRepositoryLive),
12851383
Layer.provideMerge(ProjectionStateRepositoryLive),
12861384
);

0 commit comments

Comments
 (0)