Skip to content

Commit 4e0c003

Browse files
maria-rcksjuliusmarmingecodex
authored
fix(web): allow deleting non-empty projects from the warning toast (#1264)
Co-authored-by: Julius Marminge <julius0216@outlook.com> Co-authored-by: codex <codex@users.noreply.github.com>
1 parent 60387f6 commit 4e0c003

15 files changed

Lines changed: 679 additions & 52 deletions

apps/server/integration/OrchestrationEngineHarness.integration.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ import {
5858
OrchestrationEngineService,
5959
type OrchestrationEngineShape,
6060
} from "../src/orchestration/Services/OrchestrationEngine.ts";
61+
import { ThreadDeletionReactor } from "../src/orchestration/Services/ThreadDeletionReactor.ts";
6162
import { OrchestrationReactor } from "../src/orchestration/Services/OrchestrationReactor.ts";
6263
import { ProjectionSnapshotQuery } from "../src/orchestration/Services/ProjectionSnapshotQuery.ts";
6364
import {
@@ -351,6 +352,12 @@ export const makeOrchestrationIntegrationHarness = (
351352
Layer.provideMerge(runtimeIngestionLayer),
352353
Layer.provideMerge(providerCommandReactorLayer),
353354
Layer.provideMerge(checkpointReactorLayer),
355+
Layer.provideMerge(
356+
Layer.succeed(ThreadDeletionReactor, {
357+
start: () => Effect.void,
358+
drain: Effect.void,
359+
}),
360+
),
354361
);
355362
const layer = Layer.empty.pipe(
356363
Layer.provideMerge(runtimeServicesLayer),

apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { afterEach, describe, expect, it } from "vitest";
44
import { CheckpointReactor } from "../Services/CheckpointReactor.ts";
55
import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts";
66
import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts";
7+
import { ThreadDeletionReactor } from "../Services/ThreadDeletionReactor.ts";
78
import { OrchestrationReactor } from "../Services/OrchestrationReactor.ts";
89
import { makeOrchestrationReactor } from "./OrchestrationReactor.ts";
910

@@ -17,7 +18,7 @@ describe("OrchestrationReactor", () => {
1718
runtime = null;
1819
});
1920

20-
it("starts provider ingestion, provider command, and checkpoint reactors", async () => {
21+
it("starts provider ingestion, provider command, checkpoint, and thread deletion reactors", async () => {
2122
const started: string[] = [];
2223

2324
runtime = ManagedRuntime.make(
@@ -49,17 +50,27 @@ describe("OrchestrationReactor", () => {
4950
drain: Effect.void,
5051
}),
5152
),
53+
Layer.provideMerge(
54+
Layer.succeed(ThreadDeletionReactor, {
55+
start: () => {
56+
started.push("thread-deletion-reactor");
57+
return Effect.void;
58+
},
59+
drain: Effect.void,
60+
}),
61+
),
5262
),
5363
);
5464

55-
const reactor = await runtime.runPromise(Effect.service(OrchestrationReactor));
65+
const reactor = await runtime!.runPromise(Effect.service(OrchestrationReactor));
5666
const scope = await Effect.runPromise(Scope.make("sequential"));
5767
await Effect.runPromise(reactor.start().pipe(Scope.provide(scope)));
5868

5969
expect(started).toEqual([
6070
"provider-runtime-ingestion",
6171
"provider-command-reactor",
6272
"checkpoint-reactor",
73+
"thread-deletion-reactor",
6374
]);
6475

6576
await Effect.runPromise(Scope.close(scope, Exit.void));

apps/server/src/orchestration/Layers/OrchestrationReactor.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,19 @@ import {
77
import { CheckpointReactor } from "../Services/CheckpointReactor.ts";
88
import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts";
99
import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts";
10+
import { ThreadDeletionReactor } from "../Services/ThreadDeletionReactor.ts";
1011

1112
export const makeOrchestrationReactor = Effect.gen(function* () {
1213
const providerRuntimeIngestion = yield* ProviderRuntimeIngestionService;
1314
const providerCommandReactor = yield* ProviderCommandReactor;
1415
const checkpointReactor = yield* CheckpointReactor;
16+
const threadDeletionReactor = yield* ThreadDeletionReactor;
1517

1618
const start: OrchestrationReactorShape["start"] = Effect.fn("start")(function* () {
1719
yield* providerRuntimeIngestion.start();
1820
yield* providerCommandReactor.start();
1921
yield* checkpointReactor.start();
22+
yield* threadDeletionReactor.start();
2023
});
2124

2225
return {
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { ThreadId } from "@t3tools/contracts";
2+
import { Cause, Effect, Exit } from "effect";
3+
import { describe, expect, it } from "vitest";
4+
5+
import { logCleanupCauseUnlessInterrupted } from "./ThreadDeletionReactor.ts";
6+
7+
describe("logCleanupCauseUnlessInterrupted", () => {
8+
const threadId = ThreadId.make("thread-deletion-reactor-test");
9+
10+
it("swallows ordinary cleanup failures", async () => {
11+
const exit = await Effect.runPromiseExit(
12+
logCleanupCauseUnlessInterrupted({
13+
effect: Effect.fail("cleanup failed"),
14+
message: "thread deletion cleanup skipped provider session stop",
15+
threadId,
16+
}),
17+
);
18+
19+
expect(Exit.isSuccess(exit)).toBe(true);
20+
});
21+
22+
it("preserves interrupt causes", async () => {
23+
const exit = await Effect.runPromiseExit(
24+
logCleanupCauseUnlessInterrupted({
25+
effect: Effect.interrupt,
26+
message: "thread deletion cleanup skipped provider session stop",
27+
threadId,
28+
}),
29+
);
30+
31+
expect(Exit.isFailure(exit)).toBe(true);
32+
if (Exit.isFailure(exit)) {
33+
expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true);
34+
}
35+
});
36+
});
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import type { OrchestrationEvent } from "@t3tools/contracts";
2+
import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker";
3+
import { Cause, Effect, Layer, Stream } from "effect";
4+
5+
import { ProviderService } from "../../provider/Services/ProviderService.ts";
6+
import { TerminalManager } from "../../terminal/Services/Manager.ts";
7+
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
8+
import {
9+
ThreadDeletionReactor,
10+
type ThreadDeletionReactorShape,
11+
} from "../Services/ThreadDeletionReactor.ts";
12+
13+
type ThreadDeletedEvent = Extract<OrchestrationEvent, { type: "thread.deleted" }>;
14+
15+
export const logCleanupCauseUnlessInterrupted = <R, E>({
16+
effect,
17+
message,
18+
threadId,
19+
}: {
20+
readonly effect: Effect.Effect<void, E, R>;
21+
readonly message: string;
22+
readonly threadId: ThreadDeletedEvent["payload"]["threadId"];
23+
}): Effect.Effect<void, E, R> =>
24+
effect.pipe(
25+
Effect.catchCause((cause) => {
26+
if (Cause.hasInterruptsOnly(cause)) {
27+
return Effect.failCause(cause);
28+
}
29+
return Effect.logDebug(message, {
30+
threadId,
31+
cause: Cause.pretty(cause),
32+
});
33+
}),
34+
);
35+
36+
const make = Effect.gen(function* () {
37+
const orchestrationEngine = yield* OrchestrationEngineService;
38+
const providerService = yield* ProviderService;
39+
const terminalManager = yield* TerminalManager;
40+
41+
const stopProviderSession = (threadId: ThreadDeletedEvent["payload"]["threadId"]) =>
42+
logCleanupCauseUnlessInterrupted({
43+
effect: providerService.stopSession({ threadId }),
44+
message: "thread deletion cleanup skipped provider session stop",
45+
threadId,
46+
});
47+
48+
const closeThreadTerminals = (threadId: ThreadDeletedEvent["payload"]["threadId"]) =>
49+
logCleanupCauseUnlessInterrupted({
50+
effect: terminalManager.close({ threadId, deleteHistory: true }),
51+
message: "thread deletion cleanup skipped terminal close",
52+
threadId,
53+
});
54+
55+
const processThreadDeleted = Effect.fn("processThreadDeleted")(function* (
56+
event: ThreadDeletedEvent,
57+
) {
58+
const { threadId } = event.payload;
59+
yield* stopProviderSession(threadId);
60+
yield* closeThreadTerminals(threadId);
61+
});
62+
63+
const processThreadDeletedSafely = (event: ThreadDeletedEvent) =>
64+
processThreadDeleted(event).pipe(
65+
Effect.catchCause((cause) => {
66+
if (Cause.hasInterruptsOnly(cause)) {
67+
return Effect.failCause(cause);
68+
}
69+
return Effect.logWarning("thread deletion reactor failed to process event", {
70+
eventType: event.type,
71+
threadId: event.payload.threadId,
72+
cause: Cause.pretty(cause),
73+
});
74+
}),
75+
);
76+
77+
const worker = yield* makeDrainableWorker(processThreadDeletedSafely);
78+
79+
const start: ThreadDeletionReactorShape["start"] = Effect.fn("start")(function* () {
80+
yield* Effect.forkScoped(
81+
Stream.runForEach(orchestrationEngine.streamDomainEvents, (event) => {
82+
if (event.type !== "thread.deleted") {
83+
return Effect.void;
84+
}
85+
return worker.enqueue(event);
86+
}),
87+
);
88+
});
89+
90+
return {
91+
start,
92+
drain: worker.drain,
93+
} satisfies ThreadDeletionReactorShape;
94+
});
95+
96+
export const ThreadDeletionReactorLive = Layer.effect(ThreadDeletionReactor, make);
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* ThreadDeletionReactor - Thread deletion cleanup reactor service interface.
3+
*
4+
* Owns background workers that react to thread deletion domain events and
5+
* perform best-effort runtime cleanup for provider sessions and terminals.
6+
*
7+
* @module ThreadDeletionReactor
8+
*/
9+
import { Context } from "effect";
10+
import type { Effect, Scope } from "effect";
11+
12+
/**
13+
* ThreadDeletionReactorShape - Service API for thread deletion cleanup.
14+
*/
15+
export interface ThreadDeletionReactorShape {
16+
/**
17+
* Start reacting to thread.deleted orchestration domain events.
18+
*
19+
* The returned effect must be run in a scope so all worker fibers can be
20+
* finalized on shutdown.
21+
*/
22+
readonly start: () => Effect.Effect<void, never, Scope.Scope>;
23+
24+
/**
25+
* Resolves when the internal processing queue is empty and idle.
26+
* Intended for test use to replace timing-sensitive sleeps.
27+
*/
28+
readonly drain: Effect.Effect<void>;
29+
}
30+
31+
/**
32+
* ThreadDeletionReactor - Service tag for thread deletion cleanup workers.
33+
*/
34+
export class ThreadDeletionReactor extends Context.Service<
35+
ThreadDeletionReactor,
36+
ThreadDeletionReactorShape
37+
>()("t3/orchestration/Services/ThreadDeletionReactor") {}

0 commit comments

Comments
 (0)