Skip to content

Commit d87ea8f

Browse files
committed
fix(orchestration): preserve interrupts in thread deletion cleanup
1 parent 6c3dc9c commit d87ea8f

2 files changed

Lines changed: 67 additions & 16 deletions

File tree

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { ThreadId } from "@t3tools/contracts";
2+
import { Cause, Effect, Exit } from "effect";
3+
import { describe, expect, it } from "vitest";
4+
5+
import { logCleanupCauseUnlessInterrupted } from "./ThreadDeletionReactor.ts";
6+
7+
describe("logCleanupCauseUnlessInterrupted", () => {
8+
const threadId = ThreadId.makeUnsafe("thread-deletion-reactor-test");
9+
10+
it("swallows ordinary cleanup failures", async () => {
11+
const exit = await Effect.runPromiseExit(
12+
logCleanupCauseUnlessInterrupted({
13+
effect: Effect.fail("cleanup failed"),
14+
message: "thread deletion cleanup skipped provider session stop",
15+
threadId,
16+
}),
17+
);
18+
19+
expect(Exit.isSuccess(exit)).toBe(true);
20+
});
21+
22+
it("preserves interrupt causes", async () => {
23+
const exit = await Effect.runPromiseExit(
24+
logCleanupCauseUnlessInterrupted({
25+
effect: Effect.interrupt,
26+
message: "thread deletion cleanup skipped provider session stop",
27+
threadId,
28+
}),
29+
);
30+
31+
expect(Exit.isFailure(exit)).toBe(true);
32+
if (Exit.isFailure(exit)) {
33+
expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true);
34+
}
35+
});
36+
});

apps/server/src/orchestration/Layers/ThreadDeletionReactor.ts

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,30 +12,45 @@ import {
1212

1313
type ThreadDeletedEvent = Extract<OrchestrationEvent, { type: "thread.deleted" }>;
1414

15+
export const logCleanupCauseUnlessInterrupted = <R, E>({
16+
effect,
17+
message,
18+
threadId,
19+
}: {
20+
readonly effect: Effect.Effect<void, E, R>;
21+
readonly message: string;
22+
readonly threadId: ThreadDeletedEvent["payload"]["threadId"];
23+
}): Effect.Effect<void, E, R> =>
24+
effect.pipe(
25+
Effect.catchCause((cause) => {
26+
if (Cause.hasInterruptsOnly(cause)) {
27+
return Effect.failCause(cause);
28+
}
29+
return Effect.logDebug(message, {
30+
threadId,
31+
cause: Cause.pretty(cause),
32+
});
33+
}),
34+
);
35+
1536
const make = Effect.gen(function* () {
1637
const orchestrationEngine = yield* OrchestrationEngineService;
1738
const providerService = yield* ProviderService;
1839
const terminalManager = yield* TerminalManager;
1940

2041
const stopProviderSession = (threadId: ThreadDeletedEvent["payload"]["threadId"]) =>
21-
providerService.stopSession({ threadId }).pipe(
22-
Effect.catchCause((cause) =>
23-
Effect.logDebug("thread deletion cleanup skipped provider session stop", {
24-
threadId,
25-
cause: Cause.pretty(cause),
26-
}),
27-
),
28-
);
42+
logCleanupCauseUnlessInterrupted({
43+
effect: providerService.stopSession({ threadId }),
44+
message: "thread deletion cleanup skipped provider session stop",
45+
threadId,
46+
});
2947

3048
const closeThreadTerminals = (threadId: ThreadDeletedEvent["payload"]["threadId"]) =>
31-
terminalManager.close({ threadId, deleteHistory: true }).pipe(
32-
Effect.catchCause((cause) =>
33-
Effect.logDebug("thread deletion cleanup skipped terminal close", {
34-
threadId,
35-
cause: Cause.pretty(cause),
36-
}),
37-
),
38-
);
49+
logCleanupCauseUnlessInterrupted({
50+
effect: terminalManager.close({ threadId, deleteHistory: true }),
51+
message: "thread deletion cleanup skipped terminal close",
52+
threadId,
53+
});
3954

4055
const processThreadDeleted = Effect.fn("processThreadDeleted")(function* (
4156
event: ThreadDeletedEvent,

0 commit comments

Comments
 (0)