Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 119 additions & 3 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
ApprovalRequestId,
CheckpointRef,
CommandId,
CorrelationId,
Expand Down Expand Up @@ -910,11 +911,14 @@ it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-atta
Effect.gen(function* () {
const fileSystem = yield* FileSystem.FileSystem;
const path = yield* Path.Path;
const sql = yield* SqlClient.SqlClient;
const projectionPipeline = yield* OrchestrationProjectionPipeline;
const eventStore = yield* OrchestrationEventStore;
const { attachmentsDir } = yield* ServerConfig;
const now = new Date().toISOString();
const threadId = ThreadId.makeUnsafe("Thread Delete.Files");
const turnId = TurnId.makeUnsafe("turn-delete-files");
const approvalRequestId = ApprovalRequestId.makeUnsafe("approval-delete-files");
const attachmentId = "thread-delete-files-00000000-0000-4000-8000-000000000001";
const otherThreadAttachmentId =
"thread-delete-files-extra-00000000-0000-4000-8000-000000000002";
Expand Down Expand Up @@ -968,15 +972,91 @@ it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-atta
},
});

yield* appendAndProject({
type: "thread.proposed-plan-upserted",
eventId: EventId.makeUnsafe("evt-delete-files-3a"),
aggregateKind: "thread",
aggregateId: threadId,
occurredAt: now,
commandId: CommandId.makeUnsafe("cmd-delete-files-3a"),
causationEventId: null,
correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3a"),
metadata: {},
payload: {
threadId,
proposedPlan: {
id: "plan-delete-files",
turnId,
planMarkdown: "1. Delete files",
implementedAt: null,
implementationThreadId: null,
createdAt: now,
updatedAt: now,
},
},
});

yield* appendAndProject({
type: "thread.session-set",
eventId: EventId.makeUnsafe("evt-delete-files-3b"),
aggregateKind: "thread",
aggregateId: threadId,
occurredAt: now,
commandId: CommandId.makeUnsafe("cmd-delete-files-3b"),
causationEventId: null,
correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3b"),
metadata: {},
payload: {
threadId,
session: {
threadId,
status: "running",
providerName: "codex",
runtimeMode: "full-access",
activeTurnId: turnId,
lastError: null,
updatedAt: now,
},
},
});

yield* appendAndProject({
type: "thread.activity-appended",
eventId: EventId.makeUnsafe("evt-delete-files-3c"),
aggregateKind: "thread",
aggregateId: threadId,
occurredAt: now,
commandId: CommandId.makeUnsafe("cmd-delete-files-3c"),
causationEventId: null,
correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3c"),
metadata: {
requestId: approvalRequestId,
},
payload: {
threadId,
activity: {
id: EventId.makeUnsafe("activity-delete-files"),
tone: "approval",
kind: "approval.requested",
summary: "Delete files approval",
payload: {
requestId: approvalRequestId,
},
turnId,
createdAt: now,
},
},
});

yield* appendAndProject({
type: "thread.message-sent",
eventId: EventId.makeUnsafe("evt-delete-files-3"),
eventId: EventId.makeUnsafe("evt-delete-files-3d"),
aggregateKind: "thread",
aggregateId: threadId,
occurredAt: now,
commandId: CommandId.makeUnsafe("cmd-delete-files-3"),
commandId: CommandId.makeUnsafe("cmd-delete-files-3d"),
causationEventId: null,
correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3"),
correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3d"),
metadata: {},
payload: {
threadId,
Expand Down Expand Up @@ -1028,6 +1108,42 @@ it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-atta

assert.isFalse(yield* exists(threadAttachmentPath));
assert.isTrue(yield* exists(otherThreadAttachmentPath));

const childRowCounts = yield* sql<{
readonly messages: number;
readonly plans: number;
readonly activities: number;
readonly sessions: number;
readonly turns: number;
readonly approvals: number;
}>`
SELECT
(SELECT COUNT(*) FROM projection_thread_messages WHERE thread_id = ${threadId}) AS "messages",
(SELECT COUNT(*) FROM projection_thread_proposed_plans WHERE thread_id = ${threadId}) AS "plans",
(SELECT COUNT(*) FROM projection_thread_activities WHERE thread_id = ${threadId}) AS "activities",
(SELECT COUNT(*) FROM projection_thread_sessions WHERE thread_id = ${threadId}) AS "sessions",
(SELECT COUNT(*) FROM projection_turns WHERE thread_id = ${threadId}) AS "turns",
(SELECT COUNT(*) FROM projection_pending_approvals WHERE thread_id = ${threadId}) AS "approvals"
`;
assert.deepEqual(childRowCounts, [
{
messages: 0,
plans: 0,
activities: 0,
sessions: 0,
turns: 0,
approvals: 0,
},
]);

const threadRows = yield* sql<{
readonly deletedAt: string | null;
}>`
SELECT deleted_at AS "deletedAt"
FROM projection_threads
WHERE thread_id = ${threadId}
`;
assert.deepEqual(threadRows, [{ deletedAt: now }]);
}),
);
},
Expand Down
30 changes: 29 additions & 1 deletion apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { ApprovalRequestId, type ChatAttachment, type OrchestrationEvent } from "@okcode/contracts";
import {
ApprovalRequestId,
type ChatAttachment,
type OrchestrationEvent,
type ThreadId,
} from "@okcode/contracts";
import * as NodeServices from "@effect/platform-node/NodeServices";
import { Effect, FileSystem, Layer, Option, Path, Stream } from "effect";
import * as SqlClient from "effect/unstable/sql/SqlClient";
Expand Down Expand Up @@ -347,6 +352,27 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
const path = yield* Path.Path;
const serverConfig = yield* ServerConfig;

const deleteProjectedThreadChildren = (threadId: ThreadId) =>
Effect.gen(function* () {
yield* projectionThreadMessageRepository.deleteByThreadId({ threadId });
yield* projectionThreadProposedPlanRepository.deleteByThreadId({ threadId });
yield* projectionThreadActivityRepository.deleteByThreadId({ threadId });
yield* projectionThreadSessionRepository.deleteByThreadId({ threadId });
yield* projectionTurnRepository.deleteByThreadId({ threadId });

const pendingApprovals = yield* projectionPendingApprovalRepository.listByThreadId({
threadId,
});
yield* Effect.forEach(
pendingApprovals,
(approval) =>
projectionPendingApprovalRepository.deleteByRequestId({
requestId: approval.requestId,
}),
{ concurrency: 1 },
).pipe(Effect.asVoid);
});

const applyProjectsProjection: ProjectorDefinition["apply"] = (event, _attachmentSideEffects) =>
Effect.gen(function* () {
switch (event.type) {
Expand Down Expand Up @@ -489,13 +515,15 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
threadId: event.payload.threadId,
});
if (Option.isNone(existingRow)) {
yield* deleteProjectedThreadChildren(event.payload.threadId);
return;
}
yield* projectionThreadRepository.upsert({
...existingRow.value,
deletedAt: event.payload.deletedAt,
updatedAt: event.payload.deletedAt,
});
yield* deleteProjectedThreadChildren(event.payload.threadId);
return;
}

Expand Down
68 changes: 68 additions & 0 deletions apps/server/src/orchestration/commandInvariants.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Effect } from "effect";

import {
findThreadById,
requireProject,
listThreadsByProjectId,
requireNonNegativeInteger,
requireThread,
Expand Down Expand Up @@ -44,6 +45,16 @@ const readModel: OrchestrationReadModel = {
updatedAt: now,
deletedAt: null,
},
{
id: ProjectId.makeUnsafe("project-archived"),
title: "Project Archived",
workspaceRoot: "/tmp/project-archived",
defaultModel: "gpt-5-codex",
scripts: [],
createdAt: now,
updatedAt: now,
deletedAt: now,
},
],
threads: [
{
Expand Down Expand Up @@ -84,6 +95,25 @@ const readModel: OrchestrationReadModel = {
checkpoints: [],
deletedAt: null,
},
{
id: ThreadId.makeUnsafe("thread-archived"),
projectId: ProjectId.makeUnsafe("project-archived"),
title: "Thread Archived",
model: "gpt-5-codex",
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "full-access",
branch: null,
worktreePath: null,
createdAt: now,
updatedAt: now,
latestTurn: null,
messages: [],
session: null,
activities: [],
proposedPlans: [],
checkpoints: [],
deletedAt: now,
},
],
};

Expand Down Expand Up @@ -123,6 +153,16 @@ describe("commandInvariants", () => {
);
expect(thread.id).toBe(ThreadId.makeUnsafe("thread-1"));

await expect(
Effect.runPromise(
requireThread({
readModel,
command: messageSendCommand,
threadId: ThreadId.makeUnsafe("thread-archived"),
}),
),
).rejects.toThrow("has been archived");

await expect(
Effect.runPromise(
requireThread({
Expand All @@ -134,6 +174,34 @@ describe("commandInvariants", () => {
).rejects.toThrow("does not exist");
});

it("requires active projects for non-create flows", async () => {
await Effect.runPromise(
requireProject({
readModel,
command: {
type: "project.meta.update",
commandId: CommandId.makeUnsafe("cmd-project-update"),
projectId: ProjectId.makeUnsafe("project-a"),
},
projectId: ProjectId.makeUnsafe("project-a"),
}),
);

await expect(
Effect.runPromise(
requireProject({
readModel,
command: {
type: "project.meta.update",
commandId: CommandId.makeUnsafe("cmd-project-update-archived"),
projectId: ProjectId.makeUnsafe("project-archived"),
},
projectId: ProjectId.makeUnsafe("project-archived"),
}),
),
).rejects.toThrow("has been archived");
});

it("requires missing thread for create flows", async () => {
await Effect.runPromise(
requireThreadAbsent({
Expand Down
Loading
Loading