Skip to content

Commit 01596f5

Browse files
Refactor client runtime state refresh handling
- Move archived threads state into shared client-runtime - Add stale-aware refresh/invalidation for path search, discovery, and VCS refs - Wire provider invalidations through the web runtime
1 parent 9ef8d75 commit 01596f5

15 files changed

Lines changed: 536 additions & 129 deletions

apps/mobile/src/state/use-composer-path-search.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ import {
1717
subscribeEnvironmentConnections,
1818
} from "./environment-session-registry";
1919

20+
const COMPOSER_PATH_SEARCH_STALE_TIME_MS = 15_000;
21+
2022
const composerPathSearchManager = createComposerPathSearchManager({
2123
getRegistry: () => appAtomRegistry,
2224
getClient: (environmentId) => getEnvironmentClient(environmentId)?.projects ?? null,
2325
subscribeClientChanges: subscribeEnvironmentConnections,
26+
staleTimeMs: COMPOSER_PATH_SEARCH_STALE_TIME_MS,
2427
});
2528

2629
export function useComposerPathSearch(target: ComposerPathSearchTarget): ComposerPathSearchState {

apps/web/src/environments/runtime/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@ export {
2727
resetEnvironmentServiceForTests,
2828
startEnvironmentConnectionService,
2929
subscribeEnvironmentConnections,
30+
subscribeProviderInvalidations,
3031
} from "./service";

apps/web/src/environments/runtime/service.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ const pendingSavedEnvironmentConnections = new Map<
119119
PendingSavedEnvironmentConnection
120120
>();
121121
const environmentConnectionListeners = new Set<() => void>();
122+
const providerInvalidationListeners = new Set<() => void>();
122123
const threadDetailSubscriptions = new Map<string, ThreadDetailSubscriptionEntry>();
123124
const terminalMetadataSubscriptions = new Map<EnvironmentId, () => void>();
124125
const lastAppliedProjectionVersionByEnvironment = new Map<
@@ -604,6 +605,12 @@ function emitEnvironmentConnectionRegistryChange() {
604605
}
605606
}
606607

608+
function emitProviderInvalidation() {
609+
for (const listener of providerInvalidationListeners) {
610+
listener();
611+
}
612+
}
613+
607614
function getRuntimeErrorFields(error: unknown) {
608615
return {
609616
lastError: error instanceof Error ? error.message : String(error),
@@ -1543,6 +1550,13 @@ export function subscribeEnvironmentConnections(listener: () => void): () => voi
15431550
};
15441551
}
15451552

1553+
export function subscribeProviderInvalidations(listener: () => void): () => void {
1554+
providerInvalidationListeners.add(listener);
1555+
return () => {
1556+
providerInvalidationListeners.delete(listener);
1557+
};
1558+
}
1559+
15461560
export function listEnvironmentConnections(): ReadonlyArray<EnvironmentConnection> {
15471561
return [...environmentConnections.values()];
15481562
}
@@ -1769,6 +1783,7 @@ export function startEnvironmentConnectionService(queryClient: QueryClient): ()
17691783
}
17701784
needsProviderInvalidation = false;
17711785
void queryClient.invalidateQueries({ queryKey: providerQueryKeys.all });
1786+
emitProviderInvalidation();
17721787
},
17731788
{
17741789
wait: 100,
Lines changed: 16 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,83 +1,23 @@
11
import { useAtomValue } from "@effect/atom-react";
2-
import { EnvironmentId, type OrchestrationShellSnapshot } from "@t3tools/contracts";
3-
import { AsyncResult, Atom } from "effect/unstable/reactivity";
2+
import {
3+
type ArchivedSnapshotEntry,
4+
createArchivedThreadsManager,
5+
makeArchivedThreadsEnvironmentKey,
6+
readArchivedThreadsSnapshotState,
7+
} from "@t3tools/client-runtime";
8+
import type { EnvironmentId } from "@t3tools/contracts";
49
import { useCallback, useMemo } from "react";
5-
import * as Cause from "effect/Cause";
6-
import * as Effect from "effect/Effect";
7-
import * as Option from "effect/Option";
10+
811
import { readEnvironmentApi } from "../environmentApi";
912
import { appAtomRegistry } from "../rpc/atomRegistry";
1013

11-
const ARCHIVED_THREADS_STALE_TIME_MS = 5_000;
12-
const ARCHIVED_THREADS_IDLE_TTL_MS = 5 * 60_000;
13-
const ARCHIVED_THREADS_ENVIRONMENT_KEY_SEPARATOR = "\u001f";
14-
15-
export type ArchivedSnapshotEntry = {
16-
readonly environmentId: EnvironmentId;
17-
readonly snapshot: OrchestrationShellSnapshot;
18-
};
19-
20-
const knownArchivedThreadEnvironmentKeys = new Set<string>();
21-
22-
function makeArchivedThreadsEnvironmentKey(environmentIds: ReadonlyArray<EnvironmentId>): string {
23-
return environmentIds.toSorted().join(ARCHIVED_THREADS_ENVIRONMENT_KEY_SEPARATOR);
24-
}
25-
26-
function parseArchivedThreadsEnvironmentKey(key: string): ReadonlyArray<EnvironmentId> {
27-
if (key.length === 0) {
28-
return [];
29-
}
30-
return key
31-
.split(ARCHIVED_THREADS_ENVIRONMENT_KEY_SEPARATOR)
32-
.map((environmentId) => EnvironmentId.make(environmentId));
33-
}
34-
35-
const archivedThreadSnapshotsAtom = Atom.family((environmentKey: string) => {
36-
knownArchivedThreadEnvironmentKeys.add(environmentKey);
37-
return Atom.make(
38-
Effect.promise(async (): Promise<ReadonlyArray<ArchivedSnapshotEntry>> => {
39-
const environmentIds = parseArchivedThreadsEnvironmentKey(environmentKey);
40-
const snapshots = await Promise.all(
41-
environmentIds.map(async (environmentId) => {
42-
const api = readEnvironmentApi(environmentId);
43-
if (!api) {
44-
return null;
45-
}
46-
return {
47-
environmentId,
48-
snapshot: await api.orchestration.getArchivedShellSnapshot(),
49-
};
50-
}),
51-
);
52-
return snapshots.filter((snapshot) => snapshot !== null);
53-
}),
54-
).pipe(
55-
Atom.swr({
56-
staleTime: ARCHIVED_THREADS_STALE_TIME_MS,
57-
revalidateOnMount: true,
58-
}),
59-
Atom.setIdleTTL(ARCHIVED_THREADS_IDLE_TTL_MS),
60-
Atom.withLabel(`archived-thread-snapshots:${environmentKey}`),
61-
);
14+
const archivedThreadsManager = createArchivedThreadsManager({
15+
getRegistry: () => appAtomRegistry,
16+
getClient: (environmentId) => readEnvironmentApi(environmentId)?.orchestration ?? null,
6217
});
6318

64-
function readArchivedThreadsError(
65-
result: AsyncResult.AsyncResult<ReadonlyArray<ArchivedSnapshotEntry>, unknown>,
66-
): string | null {
67-
if (result._tag !== "Failure") {
68-
return null;
69-
}
70-
71-
const error = Cause.squash(result.cause);
72-
return error instanceof Error ? error.message : "Failed to load archived threads.";
73-
}
74-
7519
export function refreshArchivedThreadsForEnvironment(environmentId: EnvironmentId): void {
76-
for (const key of knownArchivedThreadEnvironmentKeys) {
77-
if (parseArchivedThreadsEnvironmentKey(key).includes(environmentId)) {
78-
appAtomRegistry.refresh(archivedThreadSnapshotsAtom(key));
79-
}
80-
}
20+
archivedThreadsManager.refreshForEnvironment(environmentId);
8121
}
8222

8323
export function useArchivedThreadSnapshots(environmentIds: ReadonlyArray<EnvironmentId>): {
@@ -90,17 +30,14 @@ export function useArchivedThreadSnapshots(environmentIds: ReadonlyArray<Environ
9030
() => makeArchivedThreadsEnvironmentKey(environmentIds),
9131
[environmentIds],
9232
);
93-
const atom = archivedThreadSnapshotsAtom(environmentKey);
33+
const atom = archivedThreadsManager.getAtom(environmentKey);
9434
const result = useAtomValue(atom);
95-
const snapshots = Option.getOrElse(AsyncResult.value(result), () => []);
9635
const refresh = useCallback(() => {
97-
appAtomRegistry.refresh(atom);
98-
}, [atom]);
36+
archivedThreadsManager.refresh(environmentIds);
37+
}, [environmentIds]);
9938

10039
return {
101-
snapshots,
102-
error: readArchivedThreadsError(result),
103-
isLoading: result.waiting,
40+
...readArchivedThreadsSnapshotState(result),
10441
refresh,
10542
};
10643
}

apps/web/src/lib/composerPathSearchState.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,29 @@ import { useEffect, useMemo } from "react";
1414
import {
1515
readEnvironmentConnection,
1616
subscribeEnvironmentConnections,
17+
subscribeProviderInvalidations,
1718
} from "../environments/runtime";
1819
import { appAtomRegistry } from "../rpc/atomRegistry";
1920

2021
const COMPOSER_PATH_SEARCH_LIMIT = 80;
2122
const COMPOSER_PATH_SEARCH_DEBOUNCE_MS = 120;
23+
const COMPOSER_PATH_SEARCH_STALE_TIME_MS = 15_000;
2224

2325
const composerPathSearchManager = createComposerPathSearchManager({
2426
getRegistry: () => appAtomRegistry,
2527
getClient: (environmentId) => readEnvironmentConnection(environmentId)?.client.projects ?? null,
2628
subscribeClientChanges: subscribeEnvironmentConnections,
2729
limit: COMPOSER_PATH_SEARCH_LIMIT,
2830
debounceMs: COMPOSER_PATH_SEARCH_DEBOUNCE_MS,
31+
staleTimeMs: COMPOSER_PATH_SEARCH_STALE_TIME_MS,
2932
});
3033

34+
export function invalidateComposerPathSearches(): void {
35+
composerPathSearchManager.invalidate();
36+
}
37+
38+
subscribeProviderInvalidations(invalidateComposerPathSearches);
39+
3140
export function useComposerPathSearch(target: ComposerPathSearchTarget): ComposerPathSearchState {
3241
const stableTarget = useMemo(
3342
() => ({

apps/web/src/lib/sourceControlDiscoveryState.ts

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ import {
77
sourceControlDiscoveryStateAtom,
88
} from "@t3tools/client-runtime";
99
import { EnvironmentId, type SourceControlDiscoveryResult } from "@t3tools/contracts";
10-
import * as Effect from "effect/Effect";
11-
import { Atom } from "effect/unstable/reactivity";
10+
import { useEffect } from "react";
1211

1312
import { readPrimaryEnvironmentDescriptor } from "../environments/primary";
14-
import { readEnvironmentConnection } from "../environments/runtime";
13+
import {
14+
readEnvironmentConnection,
15+
subscribeEnvironmentConnections,
16+
} from "../environments/runtime";
1517
import { readLocalApi } from "../localApi";
1618
import { appAtomRegistry } from "../rpc/atomRegistry";
1719

@@ -59,21 +61,11 @@ const sourceControlDiscoveryManager = createSourceControlDiscoveryManager({
5961
}
6062
return null;
6163
},
64+
subscribeClientChanges: subscribeEnvironmentConnections,
65+
staleTimeMs: SOURCE_CONTROL_DISCOVERY_STALE_TIME_MS,
66+
idleTtlMs: SOURCE_CONTROL_DISCOVERY_IDLE_TTL_MS,
6267
});
6368

64-
const sourceControlDiscoveryAutoRefreshAtom = Atom.family((targetKey: string) =>
65-
Atom.make(() =>
66-
Effect.promise(() => sourceControlDiscoveryManager.refresh({ key: targetKey })),
67-
).pipe(
68-
Atom.swr({
69-
staleTime: SOURCE_CONTROL_DISCOVERY_STALE_TIME_MS,
70-
revalidateOnMount: true,
71-
}),
72-
Atom.setIdleTTL(SOURCE_CONTROL_DISCOVERY_IDLE_TTL_MS),
73-
Atom.withLabel(`source-control-discovery:auto-refresh:${targetKey}`),
74-
),
75-
);
76-
7769
export function refreshSourceControlDiscovery(
7870
input?: SourceControlDiscoveryTargetInput,
7971
): Promise<SourceControlDiscoveryResult | null> {
@@ -97,7 +89,7 @@ export function useSourceControlDiscovery(
9789
getSourceControlDiscoveryTargetKey(sourceControlDiscoveryTarget(input)) ??
9890
SOURCE_CONTROL_DISCOVERY_TARGET.key;
9991

100-
useAtomValue(sourceControlDiscoveryAutoRefreshAtom(targetKey));
92+
useEffect(() => sourceControlDiscoveryManager.watch({ key: targetKey }), [targetKey]);
10193

10294
return useAtomValue(sourceControlDiscoveryStateAtom(targetKey));
10395
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import { EnvironmentId, type OrchestrationShellSnapshot } from "@t3tools/contracts";
2+
import { AtomRegistry } from "effect/unstable/reactivity";
3+
import { afterEach, describe, expect, it, vi } from "vitest";
4+
5+
import {
6+
type ArchivedThreadsClient,
7+
createArchivedThreadsManager,
8+
makeArchivedThreadsEnvironmentKey,
9+
parseArchivedThreadsEnvironmentKey,
10+
readArchivedThreadsSnapshotState,
11+
} from "./archivedThreadsState.ts";
12+
13+
let registry = AtomRegistry.make();
14+
15+
function resetAtomRegistry() {
16+
registry.dispose();
17+
registry = AtomRegistry.make();
18+
}
19+
20+
function createSnapshot(id: string): OrchestrationShellSnapshot {
21+
return {
22+
snapshotSequence: 1,
23+
projects: [],
24+
threads: [],
25+
updatedAt: `2026-05-08T00:00:00.000Z`,
26+
id,
27+
} as OrchestrationShellSnapshot;
28+
}
29+
30+
describe("createArchivedThreadsManager", () => {
31+
afterEach(() => {
32+
resetAtomRegistry();
33+
});
34+
35+
it("loads archived snapshots for configured environment clients", async () => {
36+
const envA = EnvironmentId.make("env-a");
37+
const envB = EnvironmentId.make("env-b");
38+
const clients = new Map<EnvironmentId, ArchivedThreadsClient>([
39+
[
40+
envA,
41+
{
42+
getArchivedShellSnapshot: vi.fn(async () => createSnapshot("a")),
43+
},
44+
],
45+
[
46+
envB,
47+
{
48+
getArchivedShellSnapshot: vi.fn(async () => createSnapshot("b")),
49+
},
50+
],
51+
]);
52+
const manager = createArchivedThreadsManager({
53+
getRegistry: () => registry,
54+
getClient: (environmentId) => clients.get(environmentId) ?? null,
55+
});
56+
57+
const result = registry.get(manager.getAtom(makeArchivedThreadsEnvironmentKey([envB, envA])));
58+
59+
await vi.waitFor(() => {
60+
const state = readArchivedThreadsSnapshotState(
61+
registry.get(manager.getAtom(makeArchivedThreadsEnvironmentKey([envA, envB]))),
62+
);
63+
expect(state.snapshots.map((snapshot) => snapshot.environmentId)).toEqual([envA, envB]);
64+
});
65+
expect(readArchivedThreadsSnapshotState(result).isLoading).toBe(true);
66+
});
67+
68+
it("refreshes known snapshot groups that include an environment", async () => {
69+
const envA = EnvironmentId.make("env-a");
70+
const envB = EnvironmentId.make("env-b");
71+
const getArchivedShellSnapshot = vi.fn(async () =>
72+
createSnapshot(`a-${getArchivedShellSnapshot.mock.calls.length}`),
73+
);
74+
const manager = createArchivedThreadsManager({
75+
getRegistry: () => registry,
76+
getClient: (environmentId) => (environmentId === envA ? { getArchivedShellSnapshot } : null),
77+
staleTimeMs: 60_000,
78+
});
79+
80+
const atom = manager.getAtom(makeArchivedThreadsEnvironmentKey([envA, envB]));
81+
registry.get(atom);
82+
await vi.waitFor(() => expect(getArchivedShellSnapshot).toHaveBeenCalledTimes(1));
83+
84+
manager.refreshForEnvironment(envA);
85+
86+
await vi.waitFor(() => expect(getArchivedShellSnapshot).toHaveBeenCalledTimes(2));
87+
});
88+
89+
it("round-trips environment keys in sorted order", () => {
90+
const envA = EnvironmentId.make("env-a");
91+
const envB = EnvironmentId.make("env-b");
92+
const key = makeArchivedThreadsEnvironmentKey([envB, envA]);
93+
94+
expect(parseArchivedThreadsEnvironmentKey(key)).toEqual([envA, envB]);
95+
});
96+
});

0 commit comments

Comments
 (0)