diff --git a/.changeset/clean-inbound-sync.md b/.changeset/clean-inbound-sync.md new file mode 100644 index 00000000..c009c723 --- /dev/null +++ b/.changeset/clean-inbound-sync.md @@ -0,0 +1,5 @@ +--- +'@treecrdt/sync': minor +--- + +Add an inbound sync helper for one-shot reconciliation and live filter subscriptions. diff --git a/examples/playground/src/playground/hooks/usePlaygroundLiveSubscriptions.ts b/examples/playground/src/playground/hooks/usePlaygroundLiveSubscriptions.ts index 729d6617..7eb38e82 100644 --- a/examples/playground/src/playground/hooks/usePlaygroundLiveSubscriptions.ts +++ b/examples/playground/src/playground/hooks/usePlaygroundLiveSubscriptions.ts @@ -7,7 +7,8 @@ import { type SetStateAction, } from 'react'; import type { Operation } from '@treecrdt/interface'; -import type { SyncPeer, SyncSubscription } from '@treecrdt/sync-protocol'; +import { createInboundSync, type InboundSync, type InboundSyncOnceOptions } from '@treecrdt/sync'; +import type { Filter, SyncMessage, SyncPeer } from '@treecrdt/sync-protocol'; import type { DuplexTransport } from '@treecrdt/sync-protocol/transport'; import { hexToBytes16 } from '../../sync-v0'; @@ -18,156 +19,83 @@ export type PlaygroundSyncConnection = { detach: () => void; }; +function subscriptionFilterLabel(filter: Filter): string { + return 'all' in filter ? 'all' : 'children'; +} + +function childrenFilter(parentId: string): Filter { + return { children: { parent: hexToBytes16(parentId) } }; +} + export function usePlaygroundLiveSubscriptions(opts: { syncPeerRef: MutableRefObject | null>; - syncConnRef: MutableRefObject>; setSyncError: Dispatch>; authCanSyncAll: boolean; }) { - const { syncPeerRef, syncConnRef, setSyncError, authCanSyncAll } = opts; + const { syncPeerRef, setSyncError, authCanSyncAll } = opts; const [liveBusy, setLiveBusy] = useState(false); const [liveChildrenParents, setLiveChildrenParents] = useState>(() => new Set()); const [liveAllEnabled, setLiveAllEnabled] = useState(false); const liveChildrenParentsRef = useRef>(new Set()); - const liveChildSubsRef = useRef>>(new Map()); const liveAllEnabledRef = useRef(false); - const liveAllSubsRef = useRef>(new Map()); - const liveAllStartingRef = useRef>(new Set()); - const liveChildrenStartingRef = useRef>(new Set()); - const liveBusyCountRef = useRef(0); - - const beginLiveWork = () => { - liveBusyCountRef.current += 1; - setLiveBusy(true); - }; - - const endLiveWork = () => { - liveBusyCountRef.current = Math.max(0, liveBusyCountRef.current - 1); - setLiveBusy(liveBusyCountRef.current > 0); - }; - - const stopLiveAllForPeer = (peerId: string) => { - const existing = liveAllSubsRef.current.get(peerId); - if (!existing) return; - existing.stop(); - liveAllSubsRef.current.delete(peerId); - }; + const inboundSyncRef = useRef | null>(null); + const inboundSyncPeerRef = useRef | null>(null); - const stopAllLiveAll = () => { - for (const sub of liveAllSubsRef.current.values()) sub.stop(); - liveAllSubsRef.current.clear(); + const currentSubscriptionFilters = () => { + if (liveAllEnabledRef.current) return [{ all: {} } satisfies Filter]; + return Array.from(liveChildrenParentsRef.current).map(childrenFilter); }; - const startLiveAll = (peerId: string) => { - const conn = syncConnRef.current.get(peerId); + const ensureInboundSync = () => { const peer = syncPeerRef.current; - if (!conn || !peer) return; - - if (liveAllSubsRef.current.has(peerId)) return; - if (liveAllStartingRef.current.has(peerId)) return; - liveAllStartingRef.current.add(peerId); - beginLiveWork(); - - void (async () => { - let started = false; - const sub = peer.subscribe( - conn.transport, - { all: {} }, - { - immediate: true, - intervalMs: 0, - ...syncOnceOptionsForPeer(peerId, 1024), - }, - ); - liveAllSubsRef.current.set(peerId, sub); - void sub.done.catch((err) => { - if (!started) return; - console.error('Live sync(all) failed', err); - stopLiveAllForPeer(peerId); - setSyncError(formatSyncError(err)); - }); - - try { - await sub.ready; - started = true; - } catch (err) { - console.error('Live sync(all) initial catch-up failed', err); - stopLiveAllForPeer(peerId); - setSyncError(formatSyncError(err)); - } - })().finally(() => { - liveAllStartingRef.current.delete(peerId); - endLiveWork(); + if (!peer) return null; + if (inboundSyncRef.current && inboundSyncPeerRef.current === peer) { + return inboundSyncRef.current; + } + + inboundSyncRef.current?.close(); + + const inbound = createInboundSync({ + localPeer: peer, + syncOptions: (peerId) => syncOnceOptionsForPeer(peerId, 2048), + subscribeOptions: (peerId) => syncOnceOptionsForPeer(peerId, 1024), + onStatus: (status) => setLiveBusy(status.busy), + onError: ({ peerId, filter, error, phase }) => { + console.error( + `Inbound sync(${subscriptionFilterLabel(filter)}) ${phase} failed`, + peerId, + error, + ); + setSyncError(formatSyncError(error)); + }, }); + inboundSyncRef.current = inbound; + inboundSyncPeerRef.current = peer; + return inbound; }; - const stopLiveChildrenForPeer = (peerId: string) => { - const byParent = liveChildSubsRef.current.get(peerId); - if (!byParent) return; - for (const sub of byParent.values()) sub.stop(); - liveChildSubsRef.current.delete(peerId); + const applySubscriptions = () => { + ensureInboundSync()?.subscribe(currentSubscriptionFilters()); }; - const stopLiveChildren = (peerId: string, parentId: string) => { - const byParent = liveChildSubsRef.current.get(peerId); - if (!byParent) return; - const sub = byParent.get(parentId); - if (!sub) return; - sub.stop(); - byParent.delete(parentId); - if (byParent.size === 0) liveChildSubsRef.current.delete(peerId); + const addInboundPeer = (peerId: string, conn: PlaygroundSyncConnection) => { + const inbound = ensureInboundSync(); + if (!inbound) return; + inbound.addPeer(peerId, conn.transport as DuplexTransport>); + inbound.subscribe(currentSubscriptionFilters()); }; - const stopAllLiveChildren = () => { - for (const peerId of Array.from(liveChildSubsRef.current.keys())) - stopLiveChildrenForPeer(peerId); + const removeLivePeer = (peerId: string) => { + inboundSyncRef.current?.removePeer(peerId); }; - const startLiveChildren = (peerId: string, parentId: string) => { - const conn = syncConnRef.current.get(peerId); - const peer = syncPeerRef.current; - if (!conn || !peer) return; - - const existing = liveChildSubsRef.current.get(peerId); - if (existing?.has(parentId)) return; - const startKey = `${peerId}\u0000${parentId}`; - if (liveChildrenStartingRef.current.has(startKey)) return; - liveChildrenStartingRef.current.add(startKey); - beginLiveWork(); - - const byParent = existing ?? new Map(); - void (async () => { - let started = false; - const sub = peer.subscribe( - conn.transport, - { children: { parent: hexToBytes16(parentId) } }, - { - immediate: true, - intervalMs: 0, - ...syncOnceOptionsForPeer(peerId, 1024), - }, - ); - byParent.set(parentId, sub); - liveChildSubsRef.current.set(peerId, byParent); - void sub.done.catch((err) => { - if (!started) return; - console.error('Live sync failed', err); - stopLiveChildren(peerId, parentId); - setSyncError(formatSyncError(err)); - }); - - try { - await sub.ready; - started = true; - } catch (err) { - console.error('Live sync(children) initial catch-up failed', err); - stopLiveChildren(peerId, parentId); - setSyncError(formatSyncError(err)); - } - })().finally(() => { - liveChildrenStartingRef.current.delete(startKey); - endLiveWork(); - }); + const syncInboundOnce = async ( + filters: Filter | readonly Filter[], + opts?: InboundSyncOnceOptions, + ) => { + const inbound = ensureInboundSync(); + if (!inbound) throw new Error('Inbound sync is not ready yet.'); + await inbound.syncOnce(filters, opts); }; const toggleLiveChildren = (parentId: string) => { @@ -180,42 +108,21 @@ export function usePlaygroundLiveSubscriptions(opts: { }; const resetLiveWork = () => { - liveAllStartingRef.current.clear(); - liveChildrenStartingRef.current.clear(); - liveBusyCountRef.current = 0; + inboundSyncRef.current?.close(); + inboundSyncRef.current = null; + inboundSyncPeerRef.current = null; setLiveBusy(false); }; useEffect(() => { liveChildrenParentsRef.current = liveChildrenParents; - - const connections = syncConnRef.current; - for (const peerId of connections.keys()) { - for (const parentId of liveChildrenParents) startLiveChildren(peerId, parentId); - } - - for (const peerId of Array.from(liveChildSubsRef.current.keys())) { - if (!connections.has(peerId)) { - stopLiveChildrenForPeer(peerId); - continue; - } - const byParent = liveChildSubsRef.current.get(peerId); - if (!byParent) continue; - for (const parentId of Array.from(byParent.keys())) { - if (!liveChildrenParents.has(parentId)) stopLiveChildren(peerId, parentId); - } - } + applySubscriptions(); // eslint-disable-next-line react-hooks/exhaustive-deps }, [liveChildrenParents]); useEffect(() => { liveAllEnabledRef.current = liveAllEnabled; - const connections = syncConnRef.current; - if (liveAllEnabled) { - for (const peerId of connections.keys()) startLiveAll(peerId); - } else { - stopAllLiveAll(); - } + applySubscriptions(); // eslint-disable-next-line react-hooks/exhaustive-deps }, [liveAllEnabled]); @@ -230,16 +137,9 @@ export function usePlaygroundLiveSubscriptions(opts: { liveAllEnabled, setLiveAllEnabled, toggleLiveChildren, - liveChildrenParentsRef, - liveAllEnabledRef, - beginLiveWork, - endLiveWork, - startLiveAll, - stopLiveAllForPeer, - stopAllLiveAll, - startLiveChildren, - stopLiveChildrenForPeer, - stopAllLiveChildren, + addInboundPeer, + removeLivePeer, + syncInboundOnce, resetLiveWork, }; } diff --git a/examples/playground/src/playground/hooks/usePlaygroundSync.ts b/examples/playground/src/playground/hooks/usePlaygroundSync.ts index f4f44d1f..96459bfa 100644 --- a/examples/playground/src/playground/hooks/usePlaygroundSync.ts +++ b/examples/playground/src/playground/hooks/usePlaygroundSync.ts @@ -49,9 +49,7 @@ import { isTransientRemoteConnectError, normalizeSyncServerUrl, previewDiscoveryHost, - syncOnceOptionsForPeer, syncTimeoutMsForPeer, - withTimeout, } from '../syncHelpers'; const RECENT_SYNC_TARGET_MS = 5_000; @@ -65,38 +63,6 @@ function childrenFilter(parentId: string): Filter { return { children: { parent: hexToBytes16(parentId) } }; } -function syncFilterLabel(filter: Filter, action = 'sync'): string { - return 'all' in filter - ? action - : `${action}(children ${bytesToHex(filter.children.parent).slice(0, 8)}…)`; -} - -async function syncFiltersWithTransport( - peer: SyncPeer, - peerId: string, - transport: DuplexTransport, - filters: readonly Filter[], - opts: { - autoSync?: boolean; - multipleTargets?: boolean; - codewordsPerMessage?: number; - label?: string; - } = {}, -) { - const perPeerTimeoutMs = syncTimeoutMsForPeer(peerId, { - autoSync: opts.autoSync, - multipleTargets: opts.multipleTargets, - }); - const codewordsPerMessage = opts.codewordsPerMessage ?? 2048; - for (const filter of filters) { - await withTimeout( - peer.syncOnce(transport, filter, syncOnceOptionsForPeer(peerId, codewordsPerMessage)), - perPeerTimeoutMs, - `${syncFilterLabel(filter, opts.label)} with ${peerId.slice(0, 8)}… timed out`, - ); - } -} - type PlaygroundSyncApi = { peers: PeerInfo[]; remoteSyncStatus: RemoteSyncStatus; @@ -196,18 +162,12 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn liveAllEnabled, setLiveAllEnabled, toggleLiveChildren, - liveChildrenParentsRef, - liveAllEnabledRef, - startLiveAll, - stopLiveAllForPeer, - stopAllLiveAll, - startLiveChildren, - stopLiveChildrenForPeer, - stopAllLiveChildren, + addInboundPeer, + removeLivePeer, + syncInboundOnce, resetLiveWork, } = usePlaygroundLiveSubscriptions({ syncPeerRef, - syncConnRef, setSyncError, authCanSyncAll, }); @@ -220,6 +180,36 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn outboundSyncRef.current?.queueOps(ops); }; + const removePeerConnection = ( + peerId: string, + opts: { detach?: boolean; closeTransport?: boolean } = {}, + ) => { + const connections = syncConnRef.current; + const conn = connections.get(peerId); + if (conn) { + if (opts.detach) { + try { + conn.detach(); + } catch { + // ignore + } + } + if (opts.closeTransport) { + try { + (conn.transport as any).close?.(); + } catch { + // ignore + } + } + connections.delete(peerId); + } + + removeLivePeer(peerId); + + if (isRemotePeerId(peerId)) setRemotePeer(null); + else removeMeshPeer(peerId); + }; + const dropPeerConnection = (peerId: string) => { const mesh = presenceMeshRef.current; if (mesh && !isRemotePeerId(peerId)) { @@ -227,25 +217,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn return; } - const connections = syncConnRef.current; - const conn = connections.get(peerId); - if (!conn) return; - try { - conn.detach(); - } catch { - // ignore - } - try { - (conn.transport as any).close?.(); - } catch { - // ignore - } - connections.delete(peerId); - stopLiveAllForPeer(peerId); - stopLiveChildrenForPeer(peerId); - - if (isRemotePeerId(peerId)) setRemotePeer(null); - else removeMeshPeer(peerId); + removePeerConnection(peerId, { detach: true, closeTransport: true }); }; const selectSyncTargetIds = (connections: ReadonlyMap) => { @@ -276,27 +248,16 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn setSyncBusy(true); setSyncError(null); try { - const targets = selectSyncTargetIds(connections); - let successes = 0; - let lastErr: unknown = null; + const targets = selectSyncTargetIds(connections).filter((peerId) => connections.has(peerId)); for (const peerId of targets) { const conn = connections.get(peerId); - if (!conn) continue; - try { - await syncFiltersWithTransport(peer, peerId, conn.transport, filters, { - multipleTargets: targets.length > 1, - }); - successes += 1; - } catch (err) { - lastErr = err; - console.error(`${label} failed for peer`, peerId, err); - if (!isCapabilityRevokedError(err)) dropPeerConnection(peerId); - } - } - if (successes === 0) { - if (lastErr) throw lastErr; - throw new Error('No peers responded to sync.'); + if (conn) addInboundPeer(peerId, conn); } + await syncInboundOnce(filters, { + peerIds: targets, + syncTimeoutMs: (peerId) => + syncTimeoutMsForPeer(peerId, { multipleTargets: targets.length > 1 }), + }); await refreshMeta(); } catch (err) { console.error(`${label} failed`, err); @@ -359,9 +320,8 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn } if (!peerId) return; - const peer = syncPeerRef.current; const conn = connections.get(peerId); - if (!peer || !conn) return; + if (!conn) return; if (!authCanSyncAll) { const clean = viewRootId.toLowerCase(); @@ -377,9 +337,10 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn setSyncError(null); try { const filter: Filter = authCanSyncAll ? { all: {} } : childrenFilter(viewRootId); - await syncFiltersWithTransport(peer, peerId, conn.transport, [filter], { - autoSync: true, - label: 'auto sync', + addInboundPeer(peerId, conn); + await syncInboundOnce(filter, { + peerIds: [peerId], + syncTimeoutMs: (targetPeerId) => syncTimeoutMsForPeer(targetPeerId, { autoSync: true }), }); await refreshMeta(); @@ -571,8 +532,9 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn const mesh = presenceMeshRef.current; if (!mesh || !mesh.isPeerReady(peerId)) return; } - if (liveAllEnabledRef.current) startLiveAll(peerId); - for (const parentId of liveChildrenParentsRef.current) startLiveChildren(peerId, parentId); + const conn = connections.get(peerId); + if (!conn) return; + addInboundPeer(peerId, conn); }; const queueAutoSyncForPeer = (peerId: string) => { @@ -615,10 +577,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn return registerPeerConnection(peerId, transport); }, onPeerDisconnected: (peerId) => { - connections.delete(peerId); - stopLiveAllForPeer(peerId); - stopLiveChildrenForPeer(peerId); - removeMeshPeer(peerId); + removePeerConnection(peerId); }, onBroadcastMessage: (data) => { if (!data || typeof data !== 'object') return; @@ -775,8 +734,6 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn }; return () => { - stopAllLiveAll(); - stopAllLiveChildren(); if (presenceMeshRef.current === mesh) presenceMeshRef.current = null; mesh?.stop(); stopRemoteSocket(); diff --git a/examples/playground/src/playground/syncHelpers.ts b/examples/playground/src/playground/syncHelpers.ts index a5abf830..599556e0 100644 --- a/examples/playground/src/playground/syncHelpers.ts +++ b/examples/playground/src/playground/syncHelpers.ts @@ -14,22 +14,6 @@ import { const REMOTE_SYNC_CODEWORDS_PER_MESSAGE = 512; -export function withTimeout(promise: Promise, ms: number, message: string): Promise { - return new Promise((resolve, reject) => { - const timer = setTimeout(() => reject(new Error(message)), ms); - promise.then( - (value) => { - clearTimeout(timer); - resolve(value); - }, - (err) => { - clearTimeout(timer); - reject(err); - }, - ); - }); -} - export function normalizeSyncServerUrl(raw: string, docId: string): URL { return normalizeDirectSyncWebSocketUrl(raw, docId); } diff --git a/packages/treecrdt-sync/src/inbound-sync.ts b/packages/treecrdt-sync/src/inbound-sync.ts new file mode 100644 index 00000000..c286a8e7 --- /dev/null +++ b/packages/treecrdt-sync/src/inbound-sync.ts @@ -0,0 +1,345 @@ +import { bytesToHex } from '@treecrdt/interface/ids'; +import type { + Filter, + SyncMessage, + SyncOnceOptions, + SyncPeer, + SyncSubscribeOptions, + SyncSubscription, +} from '@treecrdt/sync-protocol'; +import type { DuplexTransport } from '@treecrdt/sync-protocol/transport'; + +export type InboundSyncErrorPhase = 'sync' | 'ready' | 'live'; + +export type InboundSyncErrorContext = { + peerId: string; + filter: Filter; + error: unknown; + phase: InboundSyncErrorPhase; + localPeer: SyncPeer; +}; + +export type InboundSyncStatus = { + peerCount: number; + subscriptionFilterCount: number; + activeSubscriptionCount: number; + busy: boolean; +}; + +export type InboundSyncOnceOptions = { + peerIds?: readonly string[]; + syncTimeoutMs?: number | ((peerId: string, filter: Filter) => number | undefined); +}; + +export type InboundSyncOptions = { + localPeer: SyncPeer; + syncOptions?: (peerId: string, filter: Filter) => SyncOnceOptions | undefined; + syncTimeoutMs?: number | ((peerId: string, filter: Filter) => number | undefined); + subscribeOptions?: (peerId: string, filter: Filter) => SyncSubscribeOptions | undefined; + onError?: (ctx: InboundSyncErrorContext) => void; + onStatus?: (status: InboundSyncStatus) => void; +}; + +export type InboundSync = { + readonly status: InboundSyncStatus; + readonly peerCount: number; + addPeer: (peerId: string, transport: DuplexTransport>) => void; + removePeer: (peerId: string) => void; + clearPeers: () => void; + syncOnce: (filters: Filter | readonly Filter[], opts?: InboundSyncOnceOptions) => Promise; + subscribe: (filters: Filter | readonly Filter[]) => void; + close: () => void; +}; + +function filterKey(filter: Filter): string { + return 'all' in filter ? 'all' : `children:${bytesToHex(filter.children.parent)}`; +} + +function normalizeFilters(filters: Filter | readonly Filter[]): readonly Filter[] { + return Array.isArray(filters) ? filters : [filters as Filter]; +} + +function timeoutMsFor( + peerId: string, + filter: Filter, + local: InboundSyncOnceOptions['syncTimeoutMs'], + fallback: InboundSyncOptions['syncTimeoutMs'], +) { + const option = local ?? fallback; + return typeof option === 'function' ? option(peerId, filter) : option; +} + +function withTimeout(promise: Promise, ms: number | undefined, message: string): Promise { + if (ms === undefined) return promise; + if (!Number.isFinite(ms) || ms <= 0) { + return Promise.reject(new Error(`invalid syncTimeoutMs: ${ms}`)); + } + return new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error(message)), ms); + promise.then( + (value) => { + clearTimeout(timer); + resolve(value); + }, + (error) => { + clearTimeout(timer); + reject(error); + }, + ); + }); +} + +export function createInboundSync(options: InboundSyncOptions): InboundSync { + const peers = new Map>>(); + const subscriptionFilters = new Map(); + let busyCount = 0; + let closed = false; + + const activeSubscriptionCount = () => { + let count = 0; + for (const subscription of subscriptionFilters.values()) count += subscription.activeCount; + return count; + }; + + const statusSnapshot = (): InboundSyncStatus => ({ + peerCount: peers.size, + subscriptionFilterCount: subscriptionFilters.size, + activeSubscriptionCount: activeSubscriptionCount(), + busy: busyCount > 0, + }); + + const emitStatus = () => { + options.onStatus?.(statusSnapshot()); + }; + + const beginWork = () => { + busyCount += 1; + emitStatus(); + }; + + const endWork = () => { + busyCount = Math.max(0, busyCount - 1); + emitStatus(); + }; + + const selectedPeers = (peerIds?: readonly string[]) => { + if (!peerIds) return Array.from(peers.entries()); + const seen = new Set(); + return peerIds.flatMap((peerId) => { + if (seen.has(peerId)) return []; + seen.add(peerId); + const transport = peers.get(peerId); + return transport ? ([[peerId, transport]] as const) : []; + }); + }; + + class SubscriptionFilter { + private readonly subscriptions = new Map(); + private readonly starting = new Set(); + private isClosed = false; + + constructor(readonly filter: Filter) {} + + get activeCount() { + return this.subscriptions.size; + } + + startAll() { + if (this.isClosed || closed) return; + for (const [peerId, transport] of peers) this.startPeer(peerId, transport); + } + + stopAll() { + for (const peerId of Array.from(this.subscriptions.keys())) this.stopPeer(peerId); + this.starting.clear(); + emitStatus(); + } + + close() { + if (this.isClosed) return; + this.isClosed = true; + this.stopAll(); + } + + addPeer(peerId: string, transport: DuplexTransport>) { + if (this.isClosed || closed) return; + this.startPeer(peerId, transport); + } + + removePeer(peerId: string) { + this.stopPeer(peerId); + emitStatus(); + } + + private startPeer(peerId: string, transport: DuplexTransport>) { + if (this.subscriptions.has(peerId) || this.starting.has(peerId)) return; + this.starting.add(peerId); + beginWork(); + + let ready = false; + const sub = options.localPeer.subscribe(transport, this.filter, { + immediate: true, + intervalMs: 0, + ...options.subscribeOptions?.(peerId, this.filter), + }); + this.subscriptions.set(peerId, sub); + emitStatus(); + + const isCurrent = () => !this.isClosed && !closed && this.subscriptions.get(peerId) === sub; + + void sub.done.catch((error) => { + if (!ready || !isCurrent()) return; + this.stopPeer(peerId); + options.onError?.({ + localPeer: options.localPeer, + peerId, + filter: this.filter, + error, + phase: 'live', + }); + emitStatus(); + }); + + void (async () => { + try { + await sub.ready; + if (isCurrent()) ready = true; + } catch (error) { + if (isCurrent()) { + this.stopPeer(peerId); + options.onError?.({ + localPeer: options.localPeer, + peerId, + filter: this.filter, + error, + phase: 'ready', + }); + } + } finally { + this.starting.delete(peerId); + endWork(); + } + })(); + } + + private stopPeer(peerId: string) { + this.starting.delete(peerId); + const sub = this.subscriptions.get(peerId); + if (!sub) return; + try { + sub.stop(); + } finally { + this.subscriptions.delete(peerId); + } + } + } + + const syncOnce = async ( + filters: Filter | readonly Filter[], + opts: InboundSyncOnceOptions = {}, + ) => { + if (closed) return; + const filterList = normalizeFilters(filters); + if (filterList.length === 0) return; + const targets = selectedPeers(opts.peerIds); + if (targets.length === 0) throw new Error('No peers available for scoped sync.'); + + let successes = 0; + let lastError: unknown = null; + beginWork(); + try { + for (const filter of filterList) { + for (const [peerId, transport] of targets) { + try { + await withTimeout( + options.localPeer.syncOnce(transport, filter, options.syncOptions?.(peerId, filter)), + timeoutMsFor(peerId, filter, opts.syncTimeoutMs, options.syncTimeoutMs), + `sync with ${peerId.slice(0, 8)} timed out`, + ); + successes += 1; + } catch (error) { + lastError = error; + options.onError?.({ + localPeer: options.localPeer, + peerId, + filter, + error, + phase: 'sync', + }); + } + } + } + } finally { + endWork(); + } + + if (successes === 0) { + if (lastError) throw lastError; + throw new Error('No peers responded to scoped sync.'); + } + }; + + const inbound: InboundSync = { + get status() { + return statusSnapshot(); + }, + get peerCount() { + return peers.size; + }, + addPeer: (peerId, transport) => { + if (closed) return; + const previous = peers.get(peerId); + peers.set(peerId, transport); + if (previous && previous !== transport) { + for (const subscription of subscriptionFilters.values()) subscription.removePeer(peerId); + } + for (const subscription of subscriptionFilters.values()) + subscription.addPeer(peerId, transport); + emitStatus(); + }, + removePeer: (peerId) => { + peers.delete(peerId); + for (const subscription of subscriptionFilters.values()) subscription.removePeer(peerId); + emitStatus(); + }, + clearPeers: () => { + peers.clear(); + for (const subscription of subscriptionFilters.values()) subscription.stopAll(); + emitStatus(); + }, + syncOnce, + subscribe: (filters) => { + if (closed) return; + const next = new Map(); + for (const filter of normalizeFilters(filters)) { + const key = filterKey(filter); + if (!next.has(key)) next.set(key, filter); + } + + for (const [key, subscription] of Array.from(subscriptionFilters.entries())) { + if (next.has(key)) continue; + subscription.close(); + subscriptionFilters.delete(key); + } + + for (const [key, filter] of next) { + if (subscriptionFilters.has(key)) continue; + const subscription = new SubscriptionFilter(filter); + subscriptionFilters.set(key, subscription); + subscription.startAll(); + } + emitStatus(); + }, + close: () => { + if (closed) return; + closed = true; + for (const subscription of Array.from(subscriptionFilters.values())) subscription.close(); + subscriptionFilters.clear(); + peers.clear(); + emitStatus(); + }, + }; + + emitStatus(); + return inbound; +} diff --git a/packages/treecrdt-sync/src/index.ts b/packages/treecrdt-sync/src/index.ts index 949f79d7..97015d35 100644 --- a/packages/treecrdt-sync/src/index.ts +++ b/packages/treecrdt-sync/src/index.ts @@ -1,6 +1,7 @@ export { connectTreecrdtWebSocketSync } from './connect.js'; export { createOutboundSync } from './controller.js'; export { createTreecrdtWebSocketSyncFromTransport } from './create-sync-from-transport.js'; +export { createInboundSync } from './inbound-sync.js'; export type { ConnectTreecrdtWebSocketSyncOptions, CreateTreecrdtWebSocketSyncFromTransportOptions, @@ -12,3 +13,11 @@ export type { TreecrdtWebSocketSync, TreecrdtWebSocketSyncClient, } from './types.js'; +export type { + InboundSync, + InboundSyncErrorContext, + InboundSyncErrorPhase, + InboundSyncOnceOptions, + InboundSyncOptions, + InboundSyncStatus, +} from './inbound-sync.js'; diff --git a/packages/treecrdt-sync/tests/inbound-sync.test.ts b/packages/treecrdt-sync/tests/inbound-sync.test.ts new file mode 100644 index 00000000..a416f24f --- /dev/null +++ b/packages/treecrdt-sync/tests/inbound-sync.test.ts @@ -0,0 +1,170 @@ +import { expect, test, vi } from 'vitest'; +import type { Operation } from '@treecrdt/interface'; +import type { Filter, SyncPeer, SyncSubscription } from '@treecrdt/sync-protocol'; + +import { createInboundSync } from '../src/inbound-sync.js'; + +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +function createFakeSubscription(): SyncSubscription & { + resolveReady: () => void; + rejectReady: (error: unknown) => void; + rejectDone: (error: unknown) => void; + stopped: ReturnType; +} { + const ready = deferred(); + const done = deferred(); + const stopped = vi.fn(); + return { + ready: ready.promise, + done: done.promise, + stop: stopped, + resolveReady: () => ready.resolve(), + rejectReady: (error) => ready.reject(error), + rejectDone: (error) => done.reject(error), + stopped, + }; +} + +function createFakePeer() { + const subscriptions: ReturnType[] = []; + const peer = { + subscribe: vi.fn(() => { + const sub = createFakeSubscription(); + subscriptions.push(sub); + return sub; + }), + syncOnce: vi.fn(async () => {}), + } as unknown as SyncPeer; + return { peer, subscriptions }; +} + +const allFilter: Filter = { all: {} }; +const childFilter: Filter = { children: { parent: new Uint8Array(16) } }; + +test('inbound sync subscribes existing and future peers', () => { + const { peer } = createFakePeer(); + const inbound = createInboundSync({ localPeer: peer }); + inbound.addPeer('peer-a', {} as any); + + inbound.subscribe(allFilter); + + expect(peer.subscribe).toHaveBeenCalledTimes(1); + expect(inbound.status.activeSubscriptionCount).toBe(1); + + inbound.addPeer('peer-b', {} as any); + + expect(peer.subscribe).toHaveBeenCalledTimes(2); + expect(inbound.status.activeSubscriptionCount).toBe(2); +}); + +test('inbound sync declaratively diffs subscription filters', () => { + const { peer, subscriptions } = createFakePeer(); + const inbound = createInboundSync({ localPeer: peer }); + inbound.addPeer('peer-a', {} as any); + + inbound.subscribe(allFilter); + inbound.subscribe(childFilter); + + expect(subscriptions[0]!.stopped).toHaveBeenCalledTimes(1); + expect(peer.subscribe).toHaveBeenCalledTimes(2); + expect(inbound.status.subscriptionFilterCount).toBe(1); + expect(inbound.status.activeSubscriptionCount).toBe(1); + + inbound.subscribe([]); + + expect(subscriptions[1]!.stopped).toHaveBeenCalledTimes(1); + expect(inbound.status.subscriptionFilterCount).toBe(0); + expect(inbound.status.activeSubscriptionCount).toBe(0); +}); + +test('inbound sync stops subscriptions when peers are removed', () => { + const { peer, subscriptions } = createFakePeer(); + const inbound = createInboundSync({ localPeer: peer }); + inbound.subscribe(allFilter); + inbound.addPeer('peer-a', {} as any); + + expect(subscriptions).toHaveLength(1); + + inbound.removePeer('peer-a'); + + expect(subscriptions[0]!.stopped).toHaveBeenCalledTimes(1); + expect(inbound.status.activeSubscriptionCount).toBe(0); +}); + +test('inbound sync suppresses stale subscription failures after peer removal', async () => { + const { peer, subscriptions } = createFakePeer(); + const errors: unknown[] = []; + const inbound = createInboundSync({ + localPeer: peer, + onError: ({ error }) => errors.push(error), + }); + inbound.subscribe(allFilter); + inbound.addPeer('peer-a', {} as any); + + inbound.removePeer('peer-a'); + subscriptions[0]!.rejectReady(new Error('stale ready failure')); + await subscriptions[0]!.ready.catch(() => undefined); + + expect(errors).toEqual([]); +}); + +test('inbound sync reports ready and live subscription failures', async () => { + const { peer, subscriptions } = createFakePeer(); + const errors: string[] = []; + const inbound = createInboundSync({ + localPeer: peer, + onError: ({ phase, error }) => + errors.push(`${phase}:${error instanceof Error ? error.message : String(error)}`), + }); + inbound.subscribe(allFilter); + inbound.addPeer('peer-a', {} as any); + subscriptions[0]!.rejectReady(new Error('initial catch-up failed')); + await subscriptions[0]!.ready.catch(() => undefined); + + expect(errors).toEqual(['ready:initial catch-up failed']); + expect(inbound.status.activeSubscriptionCount).toBe(0); + + inbound.addPeer('peer-b', {} as any); + subscriptions[1]!.resolveReady(); + await subscriptions[1]!.ready; + subscriptions[1]!.rejectDone(new Error('live failed')); + await subscriptions[1]!.done.catch(() => undefined); + + expect(errors).toEqual(['ready:initial catch-up failed', 'live:live failed']); + expect(inbound.status.activeSubscriptionCount).toBe(0); +}); + +test('inbound sync syncOnce reconciles all available peers', async () => { + const { peer } = createFakePeer(); + const inbound = createInboundSync({ localPeer: peer }); + inbound.addPeer('peer-a', {} as any); + inbound.addPeer('peer-b', {} as any); + + await inbound.syncOnce(allFilter); + + expect(peer.syncOnce).toHaveBeenCalledTimes(2); + expect(peer.syncOnce).toHaveBeenNthCalledWith(1, expect.anything(), allFilter, undefined); + expect(peer.syncOnce).toHaveBeenNthCalledWith(2, expect.anything(), allFilter, undefined); +}); + +test('inbound sync syncOnce supports selected peer ids', async () => { + const { peer } = createFakePeer(); + const inbound = createInboundSync({ localPeer: peer }); + inbound.addPeer('peer-a', {} as any); + inbound.addPeer('peer-b', {} as any); + + await inbound.syncOnce([allFilter, childFilter], { peerIds: ['peer-b'] }); + + expect(peer.syncOnce).toHaveBeenCalledTimes(2); + expect(peer.syncOnce).toHaveBeenNthCalledWith(1, expect.anything(), allFilter, undefined); + expect(peer.syncOnce).toHaveBeenNthCalledWith(2, expect.anything(), childFilter, undefined); +});