Skip to content

Commit 57f6bf7

Browse files
Fix turn fold proejctions (pingdotgg#3041)
Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
1 parent 0baf198 commit 57f6bf7

24 files changed

Lines changed: 1646 additions & 434 deletions

apps/server/scripts/acp-mock-agent.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const emitXAiAskUserQuestion = process.env.T3_ACP_EMIT_XAI_ASK_USER_QUESTION ===
2222
const failSetConfigOption = process.env.T3_ACP_FAIL_SET_CONFIG_OPTION === "1";
2323
const exitOnSetConfigOption = process.env.T3_ACP_EXIT_ON_SET_CONFIG_OPTION === "1";
2424
const promptResponseText = process.env.T3_ACP_PROMPT_RESPONSE_TEXT;
25+
const promptDelayMs = Number(process.env.T3_ACP_PROMPT_DELAY_MS ?? "0");
2526
const permissionOptionIds = {
2627
allowOnce: process.env.T3_ACP_ALLOW_ONCE_OPTION_ID ?? "allow-once",
2728
allowAlways: process.env.T3_ACP_ALLOW_ALWAYS_OPTION_ID ?? "allow-always",
@@ -364,6 +365,10 @@ const program = Effect.gen(function* () {
364365
Effect.gen(function* () {
365366
const requestedSessionId = String(request.sessionId ?? sessionId);
366367

368+
if (Number.isFinite(promptDelayMs) && promptDelayMs > 0) {
369+
yield* Effect.sleep(`${promptDelayMs} millis`);
370+
}
371+
367372
if (emitInterleavedAssistantToolCalls) {
368373
const toolCallId = "tool-call-1";
369374

apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,6 +1232,227 @@ it.layer(BaseTestLayer)("OrchestrationProjectionPipeline", (it) => {
12321232
}),
12331233
);
12341234

1235+
it.effect("keeps the turn running across interim assistant messages until the session ends", () =>
1236+
Effect.gen(function* () {
1237+
const projectionPipeline = yield* OrchestrationProjectionPipeline;
1238+
const eventStore = yield* OrchestrationEventStore;
1239+
const sql = yield* SqlClient.SqlClient;
1240+
const now = "2026-01-01T00:00:00.000Z";
1241+
const threadId = ThreadId.make("thread-turn-lifecycle");
1242+
const turnId = TurnId.make("turn-lifecycle-1");
1243+
1244+
yield* eventStore.append({
1245+
type: "thread.created",
1246+
eventId: EventId.make("evt-tl1"),
1247+
aggregateKind: "thread",
1248+
aggregateId: threadId,
1249+
occurredAt: now,
1250+
commandId: CommandId.make("cmd-tl1"),
1251+
causationEventId: null,
1252+
correlationId: CorrelationId.make("cmd-tl1"),
1253+
metadata: {},
1254+
payload: {
1255+
threadId,
1256+
projectId: ProjectId.make("project-turn-lifecycle"),
1257+
title: "Turn lifecycle",
1258+
modelSelection: {
1259+
instanceId: ProviderInstanceId.make("claude"),
1260+
model: "claude-opus",
1261+
},
1262+
runtimeMode: "full-access",
1263+
branch: null,
1264+
worktreePath: null,
1265+
createdAt: now,
1266+
updatedAt: now,
1267+
},
1268+
});
1269+
1270+
yield* eventStore.append({
1271+
type: "thread.session-set",
1272+
eventId: EventId.make("evt-tl2"),
1273+
aggregateKind: "thread",
1274+
aggregateId: threadId,
1275+
occurredAt: "2026-01-01T00:00:01.000Z",
1276+
commandId: CommandId.make("cmd-tl2"),
1277+
causationEventId: null,
1278+
correlationId: CorrelationId.make("cmd-tl2"),
1279+
metadata: {},
1280+
payload: {
1281+
threadId,
1282+
session: {
1283+
threadId,
1284+
status: "running",
1285+
providerName: "claude",
1286+
runtimeMode: "full-access",
1287+
activeTurnId: turnId,
1288+
lastError: null,
1289+
updatedAt: "2026-01-01T00:00:01.000Z",
1290+
},
1291+
},
1292+
});
1293+
1294+
// Interim assistant message completes mid-turn (commentary between
1295+
// tool calls) — the turn must stay running and unsettled.
1296+
yield* eventStore.append({
1297+
type: "thread.message-sent",
1298+
eventId: EventId.make("evt-tl3"),
1299+
aggregateKind: "thread",
1300+
aggregateId: threadId,
1301+
occurredAt: "2026-01-01T00:00:05.000Z",
1302+
commandId: CommandId.make("cmd-tl3"),
1303+
causationEventId: null,
1304+
correlationId: CorrelationId.make("cmd-tl3"),
1305+
metadata: {},
1306+
payload: {
1307+
threadId,
1308+
messageId: MessageId.make("message-tl-interim"),
1309+
role: "assistant",
1310+
text: "interim commentary",
1311+
turnId,
1312+
streaming: false,
1313+
createdAt: "2026-01-01T00:00:05.000Z",
1314+
updatedAt: "2026-01-01T00:00:05.000Z",
1315+
},
1316+
});
1317+
1318+
yield* projectionPipeline.bootstrap;
1319+
1320+
const runningRows = yield* sql<{
1321+
readonly state: string;
1322+
readonly completedAt: string | null;
1323+
}>`
1324+
SELECT state, completed_at AS "completedAt"
1325+
FROM projection_turns
1326+
WHERE thread_id = ${threadId} AND turn_id = ${turnId}
1327+
`;
1328+
assert.deepEqual(runningRows, [{ state: "running", completedAt: null }]);
1329+
1330+
// The session leaving "running" is the turn-end signal.
1331+
yield* eventStore.append({
1332+
type: "thread.session-set",
1333+
eventId: EventId.make("evt-tl4"),
1334+
aggregateKind: "thread",
1335+
aggregateId: threadId,
1336+
occurredAt: "2026-01-01T00:01:00.000Z",
1337+
commandId: CommandId.make("cmd-tl4"),
1338+
causationEventId: null,
1339+
correlationId: CorrelationId.make("cmd-tl4"),
1340+
metadata: {},
1341+
payload: {
1342+
threadId,
1343+
session: {
1344+
threadId,
1345+
status: "ready",
1346+
providerName: "claude",
1347+
runtimeMode: "full-access",
1348+
activeTurnId: null,
1349+
lastError: null,
1350+
updatedAt: "2026-01-01T00:01:00.000Z",
1351+
},
1352+
},
1353+
});
1354+
1355+
yield* projectionPipeline.bootstrap;
1356+
1357+
const settledRows = yield* sql<{
1358+
readonly state: string;
1359+
readonly completedAt: string | null;
1360+
}>`
1361+
SELECT state, completed_at AS "completedAt"
1362+
FROM projection_turns
1363+
WHERE thread_id = ${threadId} AND turn_id = ${turnId}
1364+
`;
1365+
assert.deepEqual(settledRows, [
1366+
{ state: "completed", completedAt: "2026-01-01T00:01:00.000Z" },
1367+
]);
1368+
}),
1369+
);
1370+
1371+
it.effect("settles a superseded running turn when a new turn becomes active", () =>
1372+
Effect.gen(function* () {
1373+
const projectionPipeline = yield* OrchestrationProjectionPipeline;
1374+
const eventStore = yield* OrchestrationEventStore;
1375+
const sql = yield* SqlClient.SqlClient;
1376+
const now = "2026-01-01T00:00:00.000Z";
1377+
const threadId = ThreadId.make("thread-turn-supersede");
1378+
const oldTurnId = TurnId.make("turn-superseded");
1379+
const newTurnId = TurnId.make("turn-steer");
1380+
1381+
yield* eventStore.append({
1382+
type: "thread.created",
1383+
eventId: EventId.make("evt-ts1"),
1384+
aggregateKind: "thread",
1385+
aggregateId: threadId,
1386+
occurredAt: now,
1387+
commandId: CommandId.make("cmd-ts1"),
1388+
causationEventId: null,
1389+
correlationId: CorrelationId.make("cmd-ts1"),
1390+
metadata: {},
1391+
payload: {
1392+
threadId,
1393+
projectId: ProjectId.make("project-turn-supersede"),
1394+
title: "Turn supersede",
1395+
modelSelection: {
1396+
instanceId: ProviderInstanceId.make("opencode"),
1397+
model: "big-pickle",
1398+
},
1399+
runtimeMode: "full-access",
1400+
branch: null,
1401+
worktreePath: null,
1402+
createdAt: now,
1403+
updatedAt: now,
1404+
},
1405+
});
1406+
1407+
const appendRunningSessionSet = (eventId: string, turnId: TurnId, updatedAt: string) =>
1408+
eventStore.append({
1409+
type: "thread.session-set",
1410+
eventId: EventId.make(eventId),
1411+
aggregateKind: "thread",
1412+
aggregateId: threadId,
1413+
occurredAt: updatedAt,
1414+
commandId: CommandId.make(`cmd-${eventId}`),
1415+
causationEventId: null,
1416+
correlationId: CorrelationId.make(`cmd-${eventId}`),
1417+
metadata: {},
1418+
payload: {
1419+
threadId,
1420+
session: {
1421+
threadId,
1422+
status: "running",
1423+
providerName: "opencode",
1424+
runtimeMode: "full-access",
1425+
activeTurnId: turnId,
1426+
lastError: null,
1427+
updatedAt,
1428+
},
1429+
},
1430+
});
1431+
1432+
yield* appendRunningSessionSet("evt-ts2", oldTurnId, "2026-01-01T00:00:01.000Z");
1433+
// A steer: a new turn becomes active without the provider ever
1434+
// completing the previous one.
1435+
yield* appendRunningSessionSet("evt-ts3", newTurnId, "2026-01-01T00:00:30.000Z");
1436+
1437+
yield* projectionPipeline.bootstrap;
1438+
1439+
const rows = yield* sql<{
1440+
readonly turnId: string;
1441+
readonly state: string;
1442+
readonly completedAt: string | null;
1443+
}>`
1444+
SELECT turn_id AS "turnId", state, completed_at AS "completedAt"
1445+
FROM projection_turns
1446+
WHERE thread_id = ${threadId}
1447+
ORDER BY requested_at
1448+
`;
1449+
assert.deepEqual(rows, [
1450+
{ turnId: oldTurnId, state: "completed", completedAt: "2026-01-01T00:00:30.000Z" },
1451+
{ turnId: newTurnId, state: "running", completedAt: null },
1452+
]);
1453+
}),
1454+
);
1455+
12351456
it.effect("keeps accumulated assistant text when completion payload text is empty", () =>
12361457
Effect.gen(function* () {
12371458
const projectionPipeline = yield* OrchestrationProjectionPipeline;

0 commit comments

Comments
 (0)