Skip to content

Commit f48b553

Browse files
committed
Add provider startup probing tests
1 parent 1d9dabe commit f48b553

2 files changed

Lines changed: 240 additions & 43 deletions

File tree

apps/server/src/provider/Layers/ProviderRegistry.test.ts

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import type { ProviderInstance } from "../ProviderDriver.ts";
4949
import { ProviderInstanceRegistry } from "../Services/ProviderInstanceRegistry.ts";
5050
import { ProviderRegistry } from "../Services/ProviderRegistry.ts";
5151
import { makeManualOnlyProviderMaintenanceCapabilities } from "../providerMaintenance.ts";
52+
import { makeManagedServerProvider } from "../makeManagedServerProvider.ts";
5253
const decodeServerSettings = Schema.decodeSync(ServerSettings);
5354
const encodeServerSettings = Schema.encodeSync(ServerSettings);
5455
const encodedDefaultServerSettings = encodeServerSettings(DEFAULT_SERVER_SETTINGS);
@@ -845,6 +846,108 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T
845846
}),
846847
);
847848

849+
it.effect("is the single startup refresh owner for managed provider instances", () =>
850+
Effect.scoped(
851+
Effect.gen(function* () {
852+
const codexDriver = ProviderDriverKind.make("codex");
853+
const codexInstanceId = ProviderInstanceId.make("codex");
854+
const initialProvider = {
855+
instanceId: codexInstanceId,
856+
driver: codexDriver,
857+
status: "warning",
858+
enabled: true,
859+
installed: false,
860+
auth: { status: "unknown" },
861+
checkedAt: "2026-04-29T10:00:00.000Z",
862+
version: null,
863+
message: "Checking provider availability...",
864+
models: [],
865+
slashCommands: [],
866+
skills: [],
867+
} as const satisfies ServerProvider;
868+
const { message: _message, ...initialProviderWithoutMessage } = initialProvider;
869+
const refreshedProvider = {
870+
...initialProviderWithoutMessage,
871+
status: "ready",
872+
installed: true,
873+
checkedAt: "2026-04-29T10:00:01.000Z",
874+
} as const satisfies ServerProvider;
875+
const checkCalls = yield* Ref.make(0);
876+
const managedProvider = yield* makeManagedServerProvider<{ readonly enabled: boolean }>(
877+
{
878+
maintenanceCapabilities: makeManualOnlyProviderMaintenanceCapabilities({
879+
provider: codexDriver,
880+
packageName: null,
881+
}),
882+
getSettings: Effect.succeed({ enabled: true }),
883+
streamSettings: Stream.empty,
884+
haveSettingsChanged: (previous, next) => previous.enabled !== next.enabled,
885+
initialSnapshot: () => Effect.succeed(initialProvider),
886+
checkProvider: Ref.update(checkCalls, (count) => count + 1).pipe(
887+
Effect.as(refreshedProvider),
888+
),
889+
refreshInterval: "1 hour",
890+
},
891+
);
892+
893+
yield* Effect.yieldNow;
894+
assert.strictEqual(
895+
yield* Ref.get(checkCalls),
896+
0,
897+
"managed providers must not self-start their own boot probe",
898+
);
899+
900+
const instance = {
901+
instanceId: codexInstanceId,
902+
driverKind: codexDriver,
903+
continuationIdentity: {
904+
driverKind: codexDriver,
905+
continuationKey: "codex:instance:codex",
906+
},
907+
displayName: undefined,
908+
enabled: true,
909+
snapshot: managedProvider,
910+
adapter: {} as ProviderInstance["adapter"],
911+
textGeneration: {} as ProviderInstance["textGeneration"],
912+
} satisfies ProviderInstance;
913+
const instanceRegistryLayer = Layer.succeed(ProviderInstanceRegistry, {
914+
getInstance: (instanceId) =>
915+
Effect.succeed(instanceId === codexInstanceId ? instance : undefined),
916+
listInstances: Effect.succeed([instance]),
917+
listUnavailable: Effect.succeed([]),
918+
streamChanges: Stream.empty,
919+
subscribeChanges: Effect.flatMap(PubSub.unbounded<void>(), (pubsub) =>
920+
PubSub.subscribe(pubsub),
921+
),
922+
});
923+
const scope = yield* Scope.make();
924+
yield* Effect.addFinalizer(() => Scope.close(scope, Exit.void));
925+
const runtimeServices = yield* Layer.build(
926+
ProviderRegistryLive.pipe(
927+
Layer.provideMerge(instanceRegistryLayer),
928+
Layer.provideMerge(
929+
ServerConfig.layerTest(process.cwd(), {
930+
prefix: "t3-provider-registry-single-startup-refresh-",
931+
}),
932+
),
933+
Layer.provideMerge(NodeServices.layer),
934+
),
935+
).pipe(Scope.provide(scope));
936+
937+
yield* Effect.gen(function* () {
938+
const registry = yield* ProviderRegistry;
939+
940+
assert.deepStrictEqual(yield* registry.getProviders, [refreshedProvider]);
941+
assert.strictEqual(
942+
yield* Ref.get(checkCalls),
943+
1,
944+
"ProviderRegistryLive should perform exactly one startup refresh",
945+
);
946+
}).pipe(Effect.provide(runtimeServices));
947+
}),
948+
),
949+
);
950+
848951
it.effect("keeps consuming registry changes after one sync fails", () =>
849952
Effect.gen(function* () {
850953
const codexDriver = ProviderDriverKind.make("codex");
@@ -1131,6 +1234,11 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T
11311234
const initialCheckedAt = initialCodex?.checkedAt;
11321235
assert.notStrictEqual(initialCheckedAt, undefined);
11331236

1237+
// Advance the virtual clock before driving the settings change so
1238+
// the fresh probe's `checkedAt` can distinguish it from the
1239+
// boot-time probe.
1240+
yield* TestClock.adjust("50 millis");
1241+
11341242
// Drive a settings change. The Hydration layer's
11351243
// `SettingsWatcherLive` consumes this via `streamChanges`,
11361244
// calls `reconcile`, which rebuilds the codex instance (the

apps/server/src/provider/makeManagedServerProvider.test.ts

Lines changed: 132 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import * as Fiber from "effect/Fiber";
77
import * as PubSub from "effect/PubSub";
88
import * as Ref from "effect/Ref";
99
import * as Stream from "effect/Stream";
10+
import * as TestClock from "effect/testing/TestClock";
1011

1112
import { makeManagedServerProvider } from "./makeManagedServerProvider.ts";
1213

@@ -101,45 +102,71 @@ const enrichedSnapshotSecond: ServerProvider = {
101102
};
102103

103104
describe("makeManagedServerProvider", () => {
104-
it.effect(
105-
"runs the initial provider check in the background and streams the refreshed snapshot",
106-
() =>
107-
Effect.scoped(
108-
Effect.gen(function* () {
109-
const checkCalls = yield* Ref.make(0);
110-
const releaseCheck = yield* Deferred.make<void>();
111-
const provider = yield* makeManagedServerProvider<TestSettings>({
112-
maintenanceCapabilities,
113-
getSettings: Effect.succeed({ enabled: true }),
114-
streamSettings: Stream.empty,
115-
haveSettingsChanged: (previous, next) => previous.enabled !== next.enabled,
116-
initialSnapshot: () => Effect.succeed(initialSnapshot),
117-
checkProvider: Ref.update(checkCalls, (count) => count + 1).pipe(
118-
Effect.flatMap(() => Deferred.await(releaseCheck)),
119-
Effect.as(refreshedSnapshot),
120-
),
121-
refreshInterval: "1 hour",
122-
});
105+
it.effect("does not probe during construction or unchanged snapshot reads", () =>
106+
Effect.scoped(
107+
Effect.gen(function* () {
108+
const checkCalls = yield* Ref.make(0);
109+
const provider = yield* makeManagedServerProvider<TestSettings>({
110+
maintenanceCapabilities,
111+
getSettings: Effect.succeed({ enabled: true }),
112+
streamSettings: Stream.empty,
113+
haveSettingsChanged: (previous, next) => previous.enabled !== next.enabled,
114+
initialSnapshot: () => Effect.succeed(initialSnapshot),
115+
checkProvider: Ref.update(checkCalls, (count) => count + 1).pipe(
116+
Effect.as(refreshedSnapshot),
117+
),
118+
refreshInterval: "1 hour",
119+
});
123120

124-
const initial = yield* provider.getSnapshot;
125-
assert.deepStrictEqual(initial, initialSnapshot);
121+
yield* Effect.yieldNow;
126122

127-
const updatesFiber = yield* Stream.take(provider.streamChanges, 1).pipe(
128-
Stream.runCollect,
129-
Effect.forkChild,
130-
);
131-
yield* Effect.yieldNow;
123+
assert.deepStrictEqual(yield* provider.getSnapshot, initialSnapshot);
124+
assert.deepStrictEqual(yield* provider.getSnapshot, initialSnapshot);
125+
assert.strictEqual(yield* Ref.get(checkCalls), 0);
126+
}),
127+
),
128+
);
132129

133-
yield* Deferred.succeed(releaseCheck, undefined);
130+
it.effect("streams an explicit provider refresh", () =>
131+
Effect.scoped(
132+
Effect.gen(function* () {
133+
const checkCalls = yield* Ref.make(0);
134+
const releaseCheck = yield* Deferred.make<void>();
135+
const provider = yield* makeManagedServerProvider<TestSettings>({
136+
maintenanceCapabilities,
137+
getSettings: Effect.succeed({ enabled: true }),
138+
streamSettings: Stream.empty,
139+
haveSettingsChanged: (previous, next) => previous.enabled !== next.enabled,
140+
initialSnapshot: () => Effect.succeed(initialSnapshot),
141+
checkProvider: Ref.update(checkCalls, (count) => count + 1).pipe(
142+
Effect.flatMap(() => Deferred.await(releaseCheck)),
143+
Effect.as(refreshedSnapshot),
144+
),
145+
refreshInterval: "1 hour",
146+
});
147+
148+
const initial = yield* provider.getSnapshot;
149+
assert.deepStrictEqual(initial, initialSnapshot);
150+
151+
const updatesFiber = yield* Stream.take(provider.streamChanges, 1).pipe(
152+
Stream.runCollect,
153+
Effect.forkChild,
154+
);
155+
const refreshFiber = yield* provider.refresh.pipe(Effect.forkChild);
156+
yield* Effect.yieldNow;
157+
158+
yield* Deferred.succeed(releaseCheck, undefined);
134159

135-
const updates = Array.from(yield* Fiber.join(updatesFiber));
136-
const latest = yield* provider.getSnapshot;
160+
const refreshed = yield* Fiber.join(refreshFiber);
161+
const updates = Array.from(yield* Fiber.join(updatesFiber));
162+
const latest = yield* provider.getSnapshot;
137163

138-
assert.deepStrictEqual(updates, [refreshedSnapshot]);
139-
assert.deepStrictEqual(latest, refreshedSnapshot);
140-
assert.strictEqual(yield* Ref.get(checkCalls), 1);
141-
}),
142-
),
164+
assert.deepStrictEqual(refreshed, refreshedSnapshot);
165+
assert.deepStrictEqual(updates, [refreshedSnapshot]);
166+
assert.deepStrictEqual(latest, refreshedSnapshot);
167+
assert.strictEqual(yield* Ref.get(checkCalls), 1);
168+
}),
169+
),
143170
);
144171

145172
it.effect("reruns the provider check when streamed settings change", () =>
@@ -148,7 +175,6 @@ describe("makeManagedServerProvider", () => {
148175
const settingsRef = yield* Ref.make<TestSettings>({ enabled: true });
149176
const settingsChanges = yield* PubSub.unbounded<TestSettings>();
150177
const checkCalls = yield* Ref.make(0);
151-
const releaseInitialCheck = yield* Deferred.make<void>();
152178
const releaseSettingsCheck = yield* Deferred.make<void>();
153179
const provider = yield* makeManagedServerProvider<TestSettings>({
154180
maintenanceCapabilities,
@@ -157,36 +183,95 @@ describe("makeManagedServerProvider", () => {
157183
haveSettingsChanged: (previous, next) => previous.enabled !== next.enabled,
158184
initialSnapshot: () => Effect.succeed(initialSnapshot),
159185
checkProvider: Ref.updateAndGet(checkCalls, (count) => count + 1).pipe(
160-
Effect.flatMap((count) =>
161-
count === 1
162-
? Deferred.await(releaseInitialCheck).pipe(Effect.as(refreshedSnapshot))
163-
: Deferred.await(releaseSettingsCheck).pipe(Effect.as(refreshedSnapshotSecond)),
186+
Effect.flatMap(() =>
187+
Deferred.await(releaseSettingsCheck).pipe(Effect.as(refreshedSnapshotSecond)),
164188
),
165189
),
166190
refreshInterval: "1 hour",
167191
});
168192

169-
const updatesFiber = yield* Stream.take(provider.streamChanges, 2).pipe(
193+
const updatesFiber = yield* Stream.take(provider.streamChanges, 1).pipe(
170194
Stream.runCollect,
171195
Effect.forkChild,
172196
);
173197
yield* Effect.yieldNow;
174198

175-
yield* Deferred.succeed(releaseInitialCheck, undefined);
176199
yield* Ref.set(settingsRef, { enabled: false });
177200
yield* PubSub.publish(settingsChanges, { enabled: false });
178201
yield* Deferred.succeed(releaseSettingsCheck, undefined);
179202

180203
const updates = Array.from(yield* Fiber.join(updatesFiber));
181204
const latest = yield* provider.getSnapshot;
182205

183-
assert.deepStrictEqual(updates, [refreshedSnapshot, refreshedSnapshotSecond]);
206+
assert.deepStrictEqual(updates, [refreshedSnapshotSecond]);
184207
assert.deepStrictEqual(latest, refreshedSnapshotSecond);
185-
assert.strictEqual(yield* Ref.get(checkCalls), 2);
208+
assert.strictEqual(yield* Ref.get(checkCalls), 1);
186209
}),
187210
),
188211
);
189212

213+
it.effect("ignores streamed settings updates that do not change provider settings", () =>
214+
Effect.scoped(
215+
Effect.gen(function* () {
216+
const settingsRef = yield* Ref.make<TestSettings>({ enabled: true });
217+
const settingsChanges = yield* PubSub.unbounded<TestSettings>();
218+
const checkCalls = yield* Ref.make(0);
219+
const provider = yield* makeManagedServerProvider<TestSettings>({
220+
maintenanceCapabilities,
221+
getSettings: Ref.get(settingsRef),
222+
streamSettings: Stream.fromPubSub(settingsChanges),
223+
haveSettingsChanged: (previous, next) => previous.enabled !== next.enabled,
224+
initialSnapshot: () => Effect.succeed(initialSnapshot),
225+
checkProvider: Ref.update(checkCalls, (count) => count + 1).pipe(
226+
Effect.as(refreshedSnapshot),
227+
),
228+
refreshInterval: "1 hour",
229+
});
230+
231+
yield* PubSub.publish(settingsChanges, { enabled: true });
232+
yield* Effect.yieldNow;
233+
234+
assert.deepStrictEqual(yield* provider.getSnapshot, initialSnapshot);
235+
assert.strictEqual(yield* Ref.get(checkCalls), 0);
236+
}),
237+
),
238+
);
239+
240+
it.effect("still refreshes on the configured periodic interval", () =>
241+
Effect.scoped(
242+
Effect.gen(function* () {
243+
const checkCalls = yield* Ref.make(0);
244+
const provider = yield* makeManagedServerProvider<TestSettings>({
245+
maintenanceCapabilities,
246+
getSettings: Effect.succeed({ enabled: true }),
247+
streamSettings: Stream.empty,
248+
haveSettingsChanged: (previous, next) => previous.enabled !== next.enabled,
249+
initialSnapshot: () => Effect.succeed(initialSnapshot),
250+
checkProvider: Ref.updateAndGet(checkCalls, (count) => count + 1).pipe(
251+
Effect.map((count) => ({
252+
...refreshedSnapshot,
253+
checkedAt: `2026-04-10T00:00:0${count}.000Z`,
254+
})),
255+
),
256+
refreshInterval: "1 minute",
257+
});
258+
259+
const updatesFiber = yield* Stream.take(provider.streamChanges, 1).pipe(
260+
Stream.runCollect,
261+
Effect.forkChild,
262+
);
263+
264+
yield* TestClock.adjust("1 minute");
265+
yield* Effect.yieldNow;
266+
267+
const updates = Array.from(yield* Fiber.join(updatesFiber));
268+
assert.deepStrictEqual(updates, [refreshedSnapshot]);
269+
assert.deepStrictEqual(yield* provider.getSnapshot, refreshedSnapshot);
270+
assert.strictEqual(yield* Ref.get(checkCalls), 1);
271+
}),
272+
).pipe(Effect.provide(TestClock.layer())),
273+
);
274+
190275
it.effect("streams supplemental snapshot updates after the base provider check completes", () =>
191276
Effect.scoped(
192277
Effect.gen(function* () {
@@ -210,9 +295,11 @@ describe("makeManagedServerProvider", () => {
210295
Stream.runCollect,
211296
Effect.forkChild,
212297
);
298+
const refreshFiber = yield* provider.refresh.pipe(Effect.forkChild);
213299
yield* Effect.yieldNow;
214300

215301
yield* Deferred.succeed(releaseCheck, undefined);
302+
yield* Fiber.join(refreshFiber);
216303

217304
yield* Deferred.succeed(releaseEnrichment, undefined);
218305

@@ -262,10 +349,12 @@ describe("makeManagedServerProvider", () => {
262349
Stream.runCollect,
263350
Effect.forkChild,
264351
);
352+
const firstRefreshFiber = yield* provider.refresh.pipe(Effect.forkChild);
265353
yield* Effect.yieldNow;
266354

267355
yield* Deferred.succeed(allowFirstRefresh, undefined);
268356
yield* Deferred.await(firstCallbackReady);
357+
yield* Fiber.join(firstRefreshFiber);
269358

270359
yield* provider.refresh;
271360
yield* Deferred.await(secondCallbackReady);

0 commit comments

Comments
 (0)