Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import {
type SetStateAction,
} from 'react';
import type { Operation } from '@treecrdt/interface';
import type { SyncPeer, SyncSubscription } from '@treecrdt/sync-protocol';
import { createInboundSync, type InboundSync, type SyncScope } from '@treecrdt/sync';
import type { Filter, SyncMessage, SyncPeer } from '@treecrdt/sync-protocol';
import type { DuplexTransport } from '@treecrdt/sync-protocol/transport';

import { hexToBytes16 } from '../../sync-v0';
Expand All @@ -18,22 +19,25 @@ export type PlaygroundSyncConnection = {
detach: () => void;
};

function liveFilterLabel(filter: Filter): string {
return 'all' in filter ? 'all' : 'children';
}

export function usePlaygroundLiveSubscriptions(opts: {
syncPeerRef: MutableRefObject<SyncPeer<Operation> | null>;
syncConnRef: MutableRefObject<Map<string, PlaygroundSyncConnection>>;
setSyncError: Dispatch<SetStateAction<string | null>>;
authCanSyncAll: boolean;
}) {
const { syncPeerRef, syncConnRef, setSyncError, authCanSyncAll } = opts;
const { syncPeerRef, setSyncError, authCanSyncAll } = opts;
const [liveBusy, setLiveBusy] = useState(false);
const [liveChildrenParents, setLiveChildrenParents] = useState<Set<string>>(() => new Set());
const [liveAllEnabled, setLiveAllEnabled] = useState(false);
const liveChildrenParentsRef = useRef<Set<string>>(new Set());
const liveChildSubsRef = useRef<Map<string, Map<string, SyncSubscription>>>(new Map());
const liveAllEnabledRef = useRef(false);
const liveAllSubsRef = useRef<Map<string, SyncSubscription>>(new Map());
const liveAllStartingRef = useRef<Set<string>>(new Set());
const liveChildrenStartingRef = useRef<Set<string>>(new Set());
const inboundSyncRef = useRef<InboundSync<Operation> | null>(null);
const inboundSyncPeerRef = useRef<SyncPeer<Operation> | null>(null);
const liveAllScopeRef = useRef<SyncScope | null>(null);
const liveChildScopesRef = useRef<Map<string, SyncScope>>(new Map());
const liveBusyCountRef = useRef(0);

const beginLiveWork = () => {
Expand All @@ -46,128 +50,80 @@ export function usePlaygroundLiveSubscriptions(opts: {
setLiveBusy(liveBusyCountRef.current > 0);
};

const stopLiveAllForPeer = (peerId: string) => {
const existing = liveAllSubsRef.current.get(peerId);
if (!existing) return;
existing.stop();
liveAllSubsRef.current.delete(peerId);
const ensureInboundSync = () => {
const peer = syncPeerRef.current;
if (!peer) return null;
if (inboundSyncRef.current && inboundSyncPeerRef.current === peer) {
return inboundSyncRef.current;
}

inboundSyncRef.current?.close();
liveAllScopeRef.current = null;
liveChildScopesRef.current.clear();

const inbound = createInboundSync<Operation>({
localPeer: peer,
subscribeOptions: (peerId) => syncOnceOptionsForPeer(peerId, 1024),
onWorkStart: beginLiveWork,
onWorkEnd: endLiveWork,
onError: ({ peerId, filter, error, phase }) => {
console.error(`Live sync(${liveFilterLabel(filter)}) ${phase} failed`, peerId, error);
setSyncError(formatSyncError(error));
},
});
inboundSyncRef.current = inbound;
inboundSyncPeerRef.current = peer;
return inbound;
};

const stopAllLiveAll = () => {
for (const sub of liveAllSubsRef.current.values()) sub.stop();
liveAllSubsRef.current.clear();
const ensureLiveAllScope = () => {
const inbound = ensureInboundSync();
if (!inbound) return;
const scope = liveAllScopeRef.current ?? inbound.scope({ all: {} });
liveAllScopeRef.current = scope;
scope.startLive();
};

const startLiveAll = (peerId: string) => {
const conn = syncConnRef.current.get(peerId);
const peer = syncPeerRef.current;
if (!conn || !peer) return;

if (liveAllSubsRef.current.has(peerId)) return;
if (liveAllStartingRef.current.has(peerId)) return;
liveAllStartingRef.current.add(peerId);
beginLiveWork();

void (async () => {
let started = false;
const sub = peer.subscribe(
conn.transport,
{ all: {} },
{
immediate: true,
intervalMs: 0,
...syncOnceOptionsForPeer(peerId, 1024),
},
);
liveAllSubsRef.current.set(peerId, sub);
void sub.done.catch((err) => {
if (!started) return;
console.error('Live sync(all) failed', err);
stopLiveAllForPeer(peerId);
setSyncError(formatSyncError(err));
});

try {
await sub.ready;
started = true;
} catch (err) {
console.error('Live sync(all) initial catch-up failed', err);
stopLiveAllForPeer(peerId);
setSyncError(formatSyncError(err));
}
})().finally(() => {
liveAllStartingRef.current.delete(peerId);
endLiveWork();
});
const closeLiveAllScope = () => {
liveAllScopeRef.current?.close();
liveAllScopeRef.current = null;
};

const ensureLiveChildScope = (parentId: string) => {
const inbound = ensureInboundSync();
if (!inbound) return;
const existing = liveChildScopesRef.current.get(parentId);
if (existing) {
existing.startLive();
return;
}

const scope = inbound.scope({ children: { parent: hexToBytes16(parentId) } });
liveChildScopesRef.current.set(parentId, scope);
scope.startLive();
};

const stopLiveChildrenForPeer = (peerId: string) => {
const byParent = liveChildSubsRef.current.get(peerId);
if (!byParent) return;
for (const sub of byParent.values()) sub.stop();
liveChildSubsRef.current.delete(peerId);
const closeLiveChildScope = (parentId: string) => {
const scope = liveChildScopesRef.current.get(parentId);
if (!scope) return;
scope.close();
liveChildScopesRef.current.delete(parentId);
};

const stopLiveChildren = (peerId: string, parentId: string) => {
const byParent = liveChildSubsRef.current.get(peerId);
if (!byParent) return;
const sub = byParent.get(parentId);
if (!sub) return;
sub.stop();
byParent.delete(parentId);
if (byParent.size === 0) liveChildSubsRef.current.delete(peerId);
const startDesiredScopes = () => {
if (liveAllEnabledRef.current) ensureLiveAllScope();
for (const parentId of liveChildrenParentsRef.current) ensureLiveChildScope(parentId);
};

const stopAllLiveChildren = () => {
for (const peerId of Array.from(liveChildSubsRef.current.keys()))
stopLiveChildrenForPeer(peerId);
const addLivePeer = (peerId: string, conn: PlaygroundSyncConnection) => {
const inbound = ensureInboundSync();
if (!inbound) return;
inbound.addPeer(peerId, conn.transport as DuplexTransport<SyncMessage<Operation>>);
startDesiredScopes();
};

const startLiveChildren = (peerId: string, parentId: string) => {
const conn = syncConnRef.current.get(peerId);
const peer = syncPeerRef.current;
if (!conn || !peer) return;

const existing = liveChildSubsRef.current.get(peerId);
if (existing?.has(parentId)) return;
const startKey = `${peerId}\u0000${parentId}`;
if (liveChildrenStartingRef.current.has(startKey)) return;
liveChildrenStartingRef.current.add(startKey);
beginLiveWork();

const byParent = existing ?? new Map<string, SyncSubscription>();
void (async () => {
let started = false;
const sub = peer.subscribe(
conn.transport,
{ children: { parent: hexToBytes16(parentId) } },
{
immediate: true,
intervalMs: 0,
...syncOnceOptionsForPeer(peerId, 1024),
},
);
byParent.set(parentId, sub);
liveChildSubsRef.current.set(peerId, byParent);
void sub.done.catch((err) => {
if (!started) return;
console.error('Live sync failed', err);
stopLiveChildren(peerId, parentId);
setSyncError(formatSyncError(err));
});

try {
await sub.ready;
started = true;
} catch (err) {
console.error('Live sync(children) initial catch-up failed', err);
stopLiveChildren(peerId, parentId);
setSyncError(formatSyncError(err));
}
})().finally(() => {
liveChildrenStartingRef.current.delete(startKey);
endLiveWork();
});
const removeLivePeer = (peerId: string) => {
inboundSyncRef.current?.removePeer(peerId);
};

const toggleLiveChildren = (parentId: string) => {
Expand All @@ -180,42 +136,28 @@ export function usePlaygroundLiveSubscriptions(opts: {
};

const resetLiveWork = () => {
liveAllStartingRef.current.clear();
liveChildrenStartingRef.current.clear();
inboundSyncRef.current?.close();
inboundSyncRef.current = null;
inboundSyncPeerRef.current = null;
liveAllScopeRef.current = null;
liveChildScopesRef.current.clear();
liveBusyCountRef.current = 0;
setLiveBusy(false);
};

useEffect(() => {
liveChildrenParentsRef.current = liveChildrenParents;

const connections = syncConnRef.current;
for (const peerId of connections.keys()) {
for (const parentId of liveChildrenParents) startLiveChildren(peerId, parentId);
}

for (const peerId of Array.from(liveChildSubsRef.current.keys())) {
if (!connections.has(peerId)) {
stopLiveChildrenForPeer(peerId);
continue;
}
const byParent = liveChildSubsRef.current.get(peerId);
if (!byParent) continue;
for (const parentId of Array.from(byParent.keys())) {
if (!liveChildrenParents.has(parentId)) stopLiveChildren(peerId, parentId);
}
for (const parentId of liveChildrenParents) ensureLiveChildScope(parentId);
for (const parentId of Array.from(liveChildScopesRef.current.keys())) {
if (!liveChildrenParents.has(parentId)) closeLiveChildScope(parentId);
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [liveChildrenParents]);

useEffect(() => {
liveAllEnabledRef.current = liveAllEnabled;
const connections = syncConnRef.current;
if (liveAllEnabled) {
for (const peerId of connections.keys()) startLiveAll(peerId);
} else {
stopAllLiveAll();
}
if (liveAllEnabled) ensureLiveAllScope();
else closeLiveAllScope();
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [liveAllEnabled]);

Expand All @@ -234,12 +176,8 @@ export function usePlaygroundLiveSubscriptions(opts: {
liveAllEnabledRef,
beginLiveWork,
endLiveWork,
startLiveAll,
stopLiveAllForPeer,
stopAllLiveAll,
startLiveChildren,
stopLiveChildrenForPeer,
stopAllLiveChildren,
addLivePeer,
removeLivePeer,
resetLiveWork,
};
}
Loading
Loading