Skip to content

Commit 68c3c3e

Browse files
committed
fix(orchestration): settle turns and retain task activities
- settle the latest turn when a session transitions out of running - preserve task.started/progress/completed activities when trimming history - emit and dedupe subagent task events for OpenCode and Cursor
1 parent f8ce332 commit 68c3c3e

15 files changed

Lines changed: 1426 additions & 119 deletions

apps/server/src/orchestration/Layers/ProjectionPipeline.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {
22
ApprovalRequestId,
33
type ChatAttachment,
44
type OrchestrationEvent,
5+
type OrchestrationSessionStatus,
56
ThreadId,
67
} from "@marcode/contracts";
78
import { Effect, FileSystem, Layer, Option, Path, Stream } from "effect";
@@ -82,6 +83,23 @@ const materializeAttachmentsForProjection = Effect.fn("materializeAttachmentsFor
8283
Effect.succeed(input.attachments.length === 0 ? [] : input.attachments),
8384
);
8485

86+
function completedSessionStatusToTurnState(
87+
status: OrchestrationSessionStatus,
88+
): ProjectionTurn["state"] | null {
89+
switch (status) {
90+
case "ready":
91+
case "idle":
92+
return "completed";
93+
case "error":
94+
return "error";
95+
case "interrupted":
96+
case "stopped":
97+
return "interrupted";
98+
default:
99+
return null;
100+
}
101+
}
102+
85103
function extractActivityRequestId(payload: unknown): ApprovalRequestId | null {
86104
if (typeof payload !== "object" || payload === null) {
87105
return null;
@@ -982,6 +1000,33 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
9821000
case "thread.session-set": {
9831001
const turnId = event.payload.session.activeTurnId;
9841002
if (turnId === null || event.payload.session.status !== "running") {
1003+
const nextState = completedSessionStatusToTurnState(event.payload.session.status);
1004+
if (nextState === null) {
1005+
return;
1006+
}
1007+
const existingTurns = yield* projectionTurnRepository.listByThreadId({
1008+
threadId: event.payload.threadId,
1009+
});
1010+
const latestIncompleteTurn = existingTurns
1011+
.toReversed()
1012+
.find(
1013+
(turn) =>
1014+
turn.turnId !== null && turn.state === "running" && turn.completedAt === null,
1015+
);
1016+
if (latestIncompleteTurn === undefined) {
1017+
return;
1018+
}
1019+
const latestIncompleteTurnId = latestIncompleteTurn.turnId;
1020+
if (latestIncompleteTurnId === null) {
1021+
return;
1022+
}
1023+
yield* projectionTurnRepository.upsertByTurnId({
1024+
...latestIncompleteTurn,
1025+
turnId: latestIncompleteTurnId,
1026+
state: nextState,
1027+
startedAt: latestIncompleteTurn.startedAt ?? event.payload.session.updatedAt,
1028+
completedAt: event.payload.session.updatedAt,
1029+
});
9851030
return;
9861031
}
9871032

apps/server/src/orchestration/projector.test.ts

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ function makeEvent(input: {
3737
} as OrchestrationEvent;
3838
}
3939

40+
function timestamp(index: number): string {
41+
return new Date(Date.UTC(2026, 1, 27, 0, 0, index)).toISOString();
42+
}
43+
4044
describe("orchestration projector", () => {
4145
it("applies thread.created events", async () => {
4246
const now = new Date().toISOString();
@@ -301,6 +305,103 @@ describe("orchestration projector", () => {
301305
expect(thread?.session?.status).toBe("running");
302306
});
303307

308+
it("settles latest turn when Codex session becomes ready without a message or checkpoint", async () => {
309+
const createdAt = "2026-02-23T08:00:00.000Z";
310+
const startedAt = "2026-02-23T08:00:01.000Z";
311+
const completedAt = "2026-02-23T08:00:03.000Z";
312+
const model = createEmptyReadModel(createdAt);
313+
314+
const afterCreate = await Effect.runPromise(
315+
projectEvent(
316+
model,
317+
makeEvent({
318+
sequence: 1,
319+
type: "thread.created",
320+
aggregateKind: "thread",
321+
aggregateId: "thread-1",
322+
occurredAt: createdAt,
323+
commandId: "cmd-create",
324+
payload: {
325+
threadId: "thread-1",
326+
projectId: "project-1",
327+
title: "demo",
328+
modelSelection: {
329+
provider: "codex",
330+
model: "gpt-5.3-codex",
331+
},
332+
runtimeMode: "full-access",
333+
branch: null,
334+
worktreePath: null,
335+
createdAt,
336+
updatedAt: createdAt,
337+
},
338+
}),
339+
),
340+
);
341+
342+
const afterRunning = await Effect.runPromise(
343+
projectEvent(
344+
afterCreate,
345+
makeEvent({
346+
sequence: 2,
347+
type: "thread.session-set",
348+
aggregateKind: "thread",
349+
aggregateId: "thread-1",
350+
occurredAt: startedAt,
351+
commandId: "cmd-running",
352+
payload: {
353+
threadId: "thread-1",
354+
session: {
355+
threadId: "thread-1",
356+
status: "running",
357+
providerName: "codex",
358+
providerSessionId: "session-1",
359+
providerThreadId: "provider-thread-1",
360+
runtimeMode: "full-access",
361+
activeTurnId: "turn-1",
362+
lastError: null,
363+
updatedAt: startedAt,
364+
},
365+
},
366+
}),
367+
),
368+
);
369+
370+
const afterReady = await Effect.runPromise(
371+
projectEvent(
372+
afterRunning,
373+
makeEvent({
374+
sequence: 3,
375+
type: "thread.session-set",
376+
aggregateKind: "thread",
377+
aggregateId: "thread-1",
378+
occurredAt: completedAt,
379+
commandId: "cmd-ready",
380+
payload: {
381+
threadId: "thread-1",
382+
session: {
383+
threadId: "thread-1",
384+
status: "ready",
385+
providerName: "codex",
386+
providerSessionId: "session-1",
387+
providerThreadId: "provider-thread-1",
388+
runtimeMode: "full-access",
389+
activeTurnId: null,
390+
lastError: null,
391+
updatedAt: completedAt,
392+
},
393+
},
394+
}),
395+
),
396+
);
397+
398+
expect(afterReady.threads[0]?.latestTurn).toMatchObject({
399+
turnId: "turn-1",
400+
state: "completed",
401+
completedAt,
402+
});
403+
});
404+
304405
it("updates canonical thread runtime mode from thread.runtime-mode-set", async () => {
305406
const createdAt = "2026-02-23T08:00:00.000Z";
306407
const updatedAt = "2026-02-23T08:00:05.000Z";
@@ -914,4 +1015,154 @@ describe("orchestration projector", () => {
9141015
expect(thread?.checkpoints[0]?.turnId).toBe("turn-100");
9151016
expect(thread?.checkpoints.at(-1)?.turnId).toBe("turn-599");
9161017
});
1018+
1019+
it("retains task lifecycle activities when the activity window is capped", async () => {
1020+
const createdAt = timestamp(0);
1021+
const afterCreate = await Effect.runPromise(
1022+
projectEvent(
1023+
createEmptyReadModel(createdAt),
1024+
makeEvent({
1025+
sequence: 1,
1026+
type: "thread.created",
1027+
aggregateKind: "thread",
1028+
aggregateId: "thread-activity-cap",
1029+
occurredAt: createdAt,
1030+
commandId: "cmd-create",
1031+
payload: {
1032+
threadId: "thread-activity-cap",
1033+
projectId: "project-1",
1034+
title: "activity cap",
1035+
modelSelection: {
1036+
provider: "opencode",
1037+
model: "gpt-5.5",
1038+
},
1039+
runtimeMode: "full-access",
1040+
interactionMode: "default",
1041+
branch: null,
1042+
worktreePath: null,
1043+
additionalDirectories: [],
1044+
implementingJiraTicketKeys: [],
1045+
createdAt,
1046+
updatedAt: createdAt,
1047+
},
1048+
}),
1049+
),
1050+
);
1051+
1052+
const taskEvents: ReadonlyArray<OrchestrationEvent> = [
1053+
makeEvent({
1054+
sequence: 2,
1055+
type: "thread.activity-appended",
1056+
aggregateKind: "thread",
1057+
aggregateId: "thread-activity-cap",
1058+
occurredAt: timestamp(1),
1059+
commandId: "cmd-task-start",
1060+
payload: {
1061+
threadId: "thread-activity-cap",
1062+
activity: {
1063+
id: "old-task-start",
1064+
tone: "info",
1065+
kind: "task.started",
1066+
summary: "subagent task started",
1067+
payload: {
1068+
taskId: "task-1",
1069+
taskType: "subagent",
1070+
},
1071+
turnId: "turn-1",
1072+
createdAt: timestamp(1),
1073+
},
1074+
},
1075+
}),
1076+
makeEvent({
1077+
sequence: 3,
1078+
type: "thread.activity-appended",
1079+
aggregateKind: "thread",
1080+
aggregateId: "thread-activity-cap",
1081+
occurredAt: timestamp(2),
1082+
commandId: "cmd-task-progress",
1083+
payload: {
1084+
threadId: "thread-activity-cap",
1085+
activity: {
1086+
id: "old-task-progress",
1087+
tone: "info",
1088+
kind: "task.progress",
1089+
summary: "subagent task progress",
1090+
payload: {
1091+
taskId: "task-1",
1092+
detail: "Reading files",
1093+
},
1094+
turnId: "turn-1",
1095+
createdAt: timestamp(2),
1096+
},
1097+
},
1098+
}),
1099+
makeEvent({
1100+
sequence: 4,
1101+
type: "thread.activity-appended",
1102+
aggregateKind: "thread",
1103+
aggregateId: "thread-activity-cap",
1104+
occurredAt: timestamp(3),
1105+
commandId: "cmd-task-complete",
1106+
payload: {
1107+
threadId: "thread-activity-cap",
1108+
activity: {
1109+
id: "old-task-completed",
1110+
tone: "info",
1111+
kind: "task.completed",
1112+
summary: "subagent task completed",
1113+
payload: {
1114+
taskId: "task-1",
1115+
status: "completed",
1116+
},
1117+
turnId: "turn-1",
1118+
createdAt: timestamp(3),
1119+
},
1120+
},
1121+
}),
1122+
];
1123+
const noiseEvents: ReadonlyArray<OrchestrationEvent> = Array.from({ length: 501 }, (_, index) =>
1124+
makeEvent({
1125+
sequence: index + 5,
1126+
type: "thread.activity-appended",
1127+
aggregateKind: "thread",
1128+
aggregateId: "thread-activity-cap",
1129+
occurredAt: timestamp(index + 4),
1130+
commandId: `cmd-noise-${index}`,
1131+
payload: {
1132+
threadId: "thread-activity-cap",
1133+
activity: {
1134+
id: `noise-${index}`,
1135+
tone: "info",
1136+
kind: "tool.completed",
1137+
summary: `Noise ${index}`,
1138+
payload: {
1139+
itemId: `noise-${index}`,
1140+
itemType: "dynamic_tool_call",
1141+
},
1142+
turnId: "turn-1",
1143+
createdAt: timestamp(index + 4),
1144+
},
1145+
},
1146+
}),
1147+
);
1148+
1149+
const finalState = await [...taskEvents, ...noiseEvents].reduce<
1150+
Promise<ReturnType<typeof createEmptyReadModel>>
1151+
>(
1152+
(statePromise, event) =>
1153+
statePromise.then((state) => Effect.runPromise(projectEvent(state, event))),
1154+
Promise.resolve(afterCreate),
1155+
);
1156+
1157+
const retainedIds = finalState.threads[0]?.activities.map((activity) => activity.id);
1158+
expect(retainedIds).toHaveLength(500);
1159+
expect(retainedIds).toContain("old-task-start");
1160+
expect(retainedIds).toContain("old-task-progress");
1161+
expect(retainedIds).toContain("old-task-completed");
1162+
expect(retainedIds).toContain("noise-500");
1163+
expect(retainedIds).not.toContain("noise-0");
1164+
expect(retainedIds).not.toContain("noise-1");
1165+
expect(retainedIds).not.toContain("noise-2");
1166+
expect(retainedIds).not.toContain("noise-3");
1167+
});
9171168
});

0 commit comments

Comments
 (0)