Skip to content

Commit a60daa1

Browse files
Wrap orchestration reactor starts in Effect.fn (#1513)
1 parent bc1024c commit a60daa1

15 files changed

+448
-450
lines changed

apps/server/integration/OrchestrationEngineHarness.integration.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ export const makeOrchestrationIntegrationHarness = (
360360

361361
const scope = yield* Scope.make("sequential");
362362
yield* tryRuntimePromise("start OrchestrationReactor", () =>
363-
runtime.runPromise(reactor.start.pipe(Scope.provide(scope))),
363+
runtime.runPromise(reactor.start().pipe(Scope.provide(scope))),
364364
).pipe(Effect.orDie);
365365
const receiptHistory = yield* Ref.make<ReadonlyArray<OrchestrationRuntimeReceipt>>([]);
366366
yield* Stream.runForEach(runtimeReceiptBus.stream, (receipt) =>

apps/server/src/orchestration/Layers/CheckpointReactor.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ describe("CheckpointReactor", () => {
271271
const reactor = await runtime.runPromise(Effect.service(CheckpointReactor));
272272
const checkpointStore = await runtime.runPromise(Effect.service(CheckpointStore));
273273
scope = await Effect.runPromise(Scope.make("sequential"));
274-
await Effect.runPromise(reactor.start.pipe(Scope.provide(scope)));
274+
await Effect.runPromise(reactor.start().pipe(Scope.provide(scope)));
275275
const drain = () => Effect.runPromise(reactor.drain);
276276

277277
const createdAt = new Date().toISOString();

apps/server/src/orchestration/Layers/CheckpointReactor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -766,7 +766,7 @@ const make = Effect.gen(function* () {
766766

767767
const worker = yield* makeDrainableWorker(processInputSafely);
768768

769-
const start: CheckpointReactorShape["start"] = Effect.gen(function* () {
769+
const start: CheckpointReactorShape["start"] = Effect.fn("start")(function* () {
770770
yield* Effect.forkScoped(
771771
Stream.runForEach(orchestrationEngine.streamDomainEvents, (event) => {
772772
if (

apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,28 @@ describe("OrchestrationReactor", () => {
2424
Layer.effect(OrchestrationReactor, makeOrchestrationReactor).pipe(
2525
Layer.provideMerge(
2626
Layer.succeed(ProviderRuntimeIngestionService, {
27-
start: Effect.sync(() => {
27+
start: () => {
2828
started.push("provider-runtime-ingestion");
29-
}),
29+
return Effect.void;
30+
},
3031
drain: Effect.void,
3132
}),
3233
),
3334
Layer.provideMerge(
3435
Layer.succeed(ProviderCommandReactor, {
35-
start: Effect.sync(() => {
36+
start: () => {
3637
started.push("provider-command-reactor");
37-
}),
38+
return Effect.void;
39+
},
3840
drain: Effect.void,
3941
}),
4042
),
4143
Layer.provideMerge(
4244
Layer.succeed(CheckpointReactor, {
43-
start: Effect.sync(() => {
45+
start: () => {
4446
started.push("checkpoint-reactor");
45-
}),
47+
return Effect.void;
48+
},
4649
drain: Effect.void,
4750
}),
4851
),
@@ -51,7 +54,7 @@ describe("OrchestrationReactor", () => {
5154

5255
const reactor = await runtime.runPromise(Effect.service(OrchestrationReactor));
5356
const scope = await Effect.runPromise(Scope.make("sequential"));
54-
await Effect.runPromise(reactor.start.pipe(Scope.provide(scope)));
57+
await Effect.runPromise(reactor.start().pipe(Scope.provide(scope)));
5558

5659
expect(started).toEqual([
5760
"provider-runtime-ingestion",

apps/server/src/orchestration/Layers/OrchestrationReactor.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ export const makeOrchestrationReactor = Effect.gen(function* () {
1313
const providerCommandReactor = yield* ProviderCommandReactor;
1414
const checkpointReactor = yield* CheckpointReactor;
1515

16-
const start: OrchestrationReactorShape["start"] = Effect.gen(function* () {
17-
yield* providerRuntimeIngestion.start;
18-
yield* providerCommandReactor.start;
19-
yield* checkpointReactor.start;
16+
const start: OrchestrationReactorShape["start"] = Effect.fn("start")(function* () {
17+
yield* providerRuntimeIngestion.start();
18+
yield* providerCommandReactor.start();
19+
yield* checkpointReactor.start();
2020
});
2121

2222
return {

apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ describe("ProviderCommandReactor", () => {
235235
const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService));
236236
const reactor = await runtime.runPromise(Effect.service(ProviderCommandReactor));
237237
scope = await Effect.runPromise(Scope.make("sequential"));
238-
await Effect.runPromise(reactor.start.pipe(Scope.provide(scope)));
238+
await Effect.runPromise(reactor.start().pipe(Scope.provide(scope)));
239239
const drain = () => Effect.runPromise(reactor.drain);
240240

241241
await Effect.runPromise(

apps/server/src/orchestration/Layers/ProviderCommandReactor.ts

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -773,22 +773,24 @@ const make = Effect.gen(function* () {
773773

774774
const worker = yield* makeDrainableWorker(processDomainEventSafely);
775775

776-
const start: ProviderCommandReactorShape["start"] = Effect.forkScoped(
777-
Stream.runForEach(orchestrationEngine.streamDomainEvents, (event) => {
776+
const start: ProviderCommandReactorShape["start"] = Effect.fn("start")(function* () {
777+
const processEvent = Effect.fn("processEvent")(function* (event: OrchestrationEvent) {
778778
if (
779-
event.type !== "thread.runtime-mode-set" &&
780-
event.type !== "thread.turn-start-requested" &&
781-
event.type !== "thread.turn-interrupt-requested" &&
782-
event.type !== "thread.approval-response-requested" &&
783-
event.type !== "thread.user-input-response-requested" &&
784-
event.type !== "thread.session-stop-requested"
779+
event.type === "thread.runtime-mode-set" ||
780+
event.type === "thread.turn-start-requested" ||
781+
event.type === "thread.turn-interrupt-requested" ||
782+
event.type === "thread.approval-response-requested" ||
783+
event.type === "thread.user-input-response-requested" ||
784+
event.type === "thread.session-stop-requested"
785785
) {
786-
return Effect.void;
786+
return yield* worker.enqueue(event);
787787
}
788+
});
788789

789-
return worker.enqueue(event);
790-
}),
791-
).pipe(Effect.asVoid);
790+
yield* Effect.forkScoped(
791+
Stream.runForEach(orchestrationEngine.streamDomainEvents, processEvent),
792+
);
793+
});
792794

793795
return {
794796
start,

apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ describe("ProviderRuntimeIngestion", () => {
214214
const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService));
215215
const ingestion = await runtime.runPromise(Effect.service(ProviderRuntimeIngestionService));
216216
scope = await Effect.runPromise(Scope.make("sequential"));
217-
await Effect.runPromise(ingestion.start.pipe(Scope.provide(scope)));
217+
await Effect.runPromise(ingestion.start().pipe(Scope.provide(scope)));
218218
const drain = () => Effect.runPromise(ingestion.drain);
219219

220220
const createdAt = new Date().toISOString();

0 commit comments

Comments
 (0)