Skip to content

Commit 0a32dc0

Browse files
authored
Archive project threads when enforcing project limits (#380)
- Cascade project deletion to active threads and clear retained thread payloads - Treat archived projects and threads as unavailable in command invariants - Add coverage for limit handling and deletion projection cleanup
1 parent fd2d349 commit 0a32dc0

9 files changed

Lines changed: 634 additions & 69 deletions

File tree

apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import {
2+
ApprovalRequestId,
23
CheckpointRef,
34
CommandId,
45
CorrelationId,
@@ -910,11 +911,14 @@ it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-atta
910911
Effect.gen(function* () {
911912
const fileSystem = yield* FileSystem.FileSystem;
912913
const path = yield* Path.Path;
914+
const sql = yield* SqlClient.SqlClient;
913915
const projectionPipeline = yield* OrchestrationProjectionPipeline;
914916
const eventStore = yield* OrchestrationEventStore;
915917
const { attachmentsDir } = yield* ServerConfig;
916918
const now = new Date().toISOString();
917919
const threadId = ThreadId.makeUnsafe("Thread Delete.Files");
920+
const turnId = TurnId.makeUnsafe("turn-delete-files");
921+
const approvalRequestId = ApprovalRequestId.makeUnsafe("approval-delete-files");
918922
const attachmentId = "thread-delete-files-00000000-0000-4000-8000-000000000001";
919923
const otherThreadAttachmentId =
920924
"thread-delete-files-extra-00000000-0000-4000-8000-000000000002";
@@ -968,15 +972,91 @@ it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-atta
968972
},
969973
});
970974

975+
yield* appendAndProject({
976+
type: "thread.proposed-plan-upserted",
977+
eventId: EventId.makeUnsafe("evt-delete-files-3a"),
978+
aggregateKind: "thread",
979+
aggregateId: threadId,
980+
occurredAt: now,
981+
commandId: CommandId.makeUnsafe("cmd-delete-files-3a"),
982+
causationEventId: null,
983+
correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3a"),
984+
metadata: {},
985+
payload: {
986+
threadId,
987+
proposedPlan: {
988+
id: "plan-delete-files",
989+
turnId,
990+
planMarkdown: "1. Delete files",
991+
implementedAt: null,
992+
implementationThreadId: null,
993+
createdAt: now,
994+
updatedAt: now,
995+
},
996+
},
997+
});
998+
999+
yield* appendAndProject({
1000+
type: "thread.session-set",
1001+
eventId: EventId.makeUnsafe("evt-delete-files-3b"),
1002+
aggregateKind: "thread",
1003+
aggregateId: threadId,
1004+
occurredAt: now,
1005+
commandId: CommandId.makeUnsafe("cmd-delete-files-3b"),
1006+
causationEventId: null,
1007+
correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3b"),
1008+
metadata: {},
1009+
payload: {
1010+
threadId,
1011+
session: {
1012+
threadId,
1013+
status: "running",
1014+
providerName: "codex",
1015+
runtimeMode: "full-access",
1016+
activeTurnId: turnId,
1017+
lastError: null,
1018+
updatedAt: now,
1019+
},
1020+
},
1021+
});
1022+
1023+
yield* appendAndProject({
1024+
type: "thread.activity-appended",
1025+
eventId: EventId.makeUnsafe("evt-delete-files-3c"),
1026+
aggregateKind: "thread",
1027+
aggregateId: threadId,
1028+
occurredAt: now,
1029+
commandId: CommandId.makeUnsafe("cmd-delete-files-3c"),
1030+
causationEventId: null,
1031+
correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3c"),
1032+
metadata: {
1033+
requestId: approvalRequestId,
1034+
},
1035+
payload: {
1036+
threadId,
1037+
activity: {
1038+
id: EventId.makeUnsafe("activity-delete-files"),
1039+
tone: "approval",
1040+
kind: "approval.requested",
1041+
summary: "Delete files approval",
1042+
payload: {
1043+
requestId: approvalRequestId,
1044+
},
1045+
turnId,
1046+
createdAt: now,
1047+
},
1048+
},
1049+
});
1050+
9711051
yield* appendAndProject({
9721052
type: "thread.message-sent",
973-
eventId: EventId.makeUnsafe("evt-delete-files-3"),
1053+
eventId: EventId.makeUnsafe("evt-delete-files-3d"),
9741054
aggregateKind: "thread",
9751055
aggregateId: threadId,
9761056
occurredAt: now,
977-
commandId: CommandId.makeUnsafe("cmd-delete-files-3"),
1057+
commandId: CommandId.makeUnsafe("cmd-delete-files-3d"),
9781058
causationEventId: null,
979-
correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3"),
1059+
correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3d"),
9801060
metadata: {},
9811061
payload: {
9821062
threadId,
@@ -1028,6 +1108,42 @@ it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-atta
10281108

10291109
assert.isFalse(yield* exists(threadAttachmentPath));
10301110
assert.isTrue(yield* exists(otherThreadAttachmentPath));
1111+
1112+
const childRowCounts = yield* sql<{
1113+
readonly messages: number;
1114+
readonly plans: number;
1115+
readonly activities: number;
1116+
readonly sessions: number;
1117+
readonly turns: number;
1118+
readonly approvals: number;
1119+
}>`
1120+
SELECT
1121+
(SELECT COUNT(*) FROM projection_thread_messages WHERE thread_id = ${threadId}) AS "messages",
1122+
(SELECT COUNT(*) FROM projection_thread_proposed_plans WHERE thread_id = ${threadId}) AS "plans",
1123+
(SELECT COUNT(*) FROM projection_thread_activities WHERE thread_id = ${threadId}) AS "activities",
1124+
(SELECT COUNT(*) FROM projection_thread_sessions WHERE thread_id = ${threadId}) AS "sessions",
1125+
(SELECT COUNT(*) FROM projection_turns WHERE thread_id = ${threadId}) AS "turns",
1126+
(SELECT COUNT(*) FROM projection_pending_approvals WHERE thread_id = ${threadId}) AS "approvals"
1127+
`;
1128+
assert.deepEqual(childRowCounts, [
1129+
{
1130+
messages: 0,
1131+
plans: 0,
1132+
activities: 0,
1133+
sessions: 0,
1134+
turns: 0,
1135+
approvals: 0,
1136+
},
1137+
]);
1138+
1139+
const threadRows = yield* sql<{
1140+
readonly deletedAt: string | null;
1141+
}>`
1142+
SELECT deleted_at AS "deletedAt"
1143+
FROM projection_threads
1144+
WHERE thread_id = ${threadId}
1145+
`;
1146+
assert.deepEqual(threadRows, [{ deletedAt: now }]);
10311147
}),
10321148
);
10331149
},

apps/server/src/orchestration/Layers/ProjectionPipeline.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
import { ApprovalRequestId, type ChatAttachment, type OrchestrationEvent } from "@okcode/contracts";
1+
import {
2+
ApprovalRequestId,
3+
type ChatAttachment,
4+
type OrchestrationEvent,
5+
type ThreadId,
6+
} from "@okcode/contracts";
27
import * as NodeServices from "@effect/platform-node/NodeServices";
38
import { Effect, FileSystem, Layer, Option, Path, Stream } from "effect";
49
import * as SqlClient from "effect/unstable/sql/SqlClient";
@@ -347,6 +352,27 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
347352
const path = yield* Path.Path;
348353
const serverConfig = yield* ServerConfig;
349354

355+
const deleteProjectedThreadChildren = (threadId: ThreadId) =>
356+
Effect.gen(function* () {
357+
yield* projectionThreadMessageRepository.deleteByThreadId({ threadId });
358+
yield* projectionThreadProposedPlanRepository.deleteByThreadId({ threadId });
359+
yield* projectionThreadActivityRepository.deleteByThreadId({ threadId });
360+
yield* projectionThreadSessionRepository.deleteByThreadId({ threadId });
361+
yield* projectionTurnRepository.deleteByThreadId({ threadId });
362+
363+
const pendingApprovals = yield* projectionPendingApprovalRepository.listByThreadId({
364+
threadId,
365+
});
366+
yield* Effect.forEach(
367+
pendingApprovals,
368+
(approval) =>
369+
projectionPendingApprovalRepository.deleteByRequestId({
370+
requestId: approval.requestId,
371+
}),
372+
{ concurrency: 1 },
373+
).pipe(Effect.asVoid);
374+
});
375+
350376
const applyProjectsProjection: ProjectorDefinition["apply"] = (event, _attachmentSideEffects) =>
351377
Effect.gen(function* () {
352378
switch (event.type) {
@@ -489,13 +515,15 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
489515
threadId: event.payload.threadId,
490516
});
491517
if (Option.isNone(existingRow)) {
518+
yield* deleteProjectedThreadChildren(event.payload.threadId);
492519
return;
493520
}
494521
yield* projectionThreadRepository.upsert({
495522
...existingRow.value,
496523
deletedAt: event.payload.deletedAt,
497524
updatedAt: event.payload.deletedAt,
498525
});
526+
yield* deleteProjectedThreadChildren(event.payload.threadId);
499527
return;
500528
}
501529

apps/server/src/orchestration/commandInvariants.test.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { Effect } from "effect";
1212

1313
import {
1414
findThreadById,
15+
requireProject,
1516
listThreadsByProjectId,
1617
requireNonNegativeInteger,
1718
requireThread,
@@ -44,6 +45,16 @@ const readModel: OrchestrationReadModel = {
4445
updatedAt: now,
4546
deletedAt: null,
4647
},
48+
{
49+
id: ProjectId.makeUnsafe("project-archived"),
50+
title: "Project Archived",
51+
workspaceRoot: "/tmp/project-archived",
52+
defaultModel: "gpt-5-codex",
53+
scripts: [],
54+
createdAt: now,
55+
updatedAt: now,
56+
deletedAt: now,
57+
},
4758
],
4859
threads: [
4960
{
@@ -84,6 +95,25 @@ const readModel: OrchestrationReadModel = {
8495
checkpoints: [],
8596
deletedAt: null,
8697
},
98+
{
99+
id: ThreadId.makeUnsafe("thread-archived"),
100+
projectId: ProjectId.makeUnsafe("project-archived"),
101+
title: "Thread Archived",
102+
model: "gpt-5-codex",
103+
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
104+
runtimeMode: "full-access",
105+
branch: null,
106+
worktreePath: null,
107+
createdAt: now,
108+
updatedAt: now,
109+
latestTurn: null,
110+
messages: [],
111+
session: null,
112+
activities: [],
113+
proposedPlans: [],
114+
checkpoints: [],
115+
deletedAt: now,
116+
},
87117
],
88118
};
89119

@@ -123,6 +153,16 @@ describe("commandInvariants", () => {
123153
);
124154
expect(thread.id).toBe(ThreadId.makeUnsafe("thread-1"));
125155

156+
await expect(
157+
Effect.runPromise(
158+
requireThread({
159+
readModel,
160+
command: messageSendCommand,
161+
threadId: ThreadId.makeUnsafe("thread-archived"),
162+
}),
163+
),
164+
).rejects.toThrow("has been archived");
165+
126166
await expect(
127167
Effect.runPromise(
128168
requireThread({
@@ -134,6 +174,34 @@ describe("commandInvariants", () => {
134174
).rejects.toThrow("does not exist");
135175
});
136176

177+
it("requires active projects for non-create flows", async () => {
178+
await Effect.runPromise(
179+
requireProject({
180+
readModel,
181+
command: {
182+
type: "project.meta.update",
183+
commandId: CommandId.makeUnsafe("cmd-project-update"),
184+
projectId: ProjectId.makeUnsafe("project-a"),
185+
},
186+
projectId: ProjectId.makeUnsafe("project-a"),
187+
}),
188+
);
189+
190+
await expect(
191+
Effect.runPromise(
192+
requireProject({
193+
readModel,
194+
command: {
195+
type: "project.meta.update",
196+
commandId: CommandId.makeUnsafe("cmd-project-update-archived"),
197+
projectId: ProjectId.makeUnsafe("project-archived"),
198+
},
199+
projectId: ProjectId.makeUnsafe("project-archived"),
200+
}),
201+
),
202+
).rejects.toThrow("has been archived");
203+
});
204+
137205
it("requires missing thread for create flows", async () => {
138206
await Effect.runPromise(
139207
requireThreadAbsent({

0 commit comments

Comments
 (0)