Skip to content

Commit 9084758

Browse files
committed
Added useSyncStreams hook that allows you to manage a variable number of sync streams.
1 parent a465f6f commit 9084758

3 files changed

Lines changed: 152 additions & 1 deletion

File tree

.changeset/flat-jars-do.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/react': minor
3+
---
4+
5+
Added `useSyncStreams` hook that allows you to manage a variable number of sync streams, compared to the existing `useSyncStream` hook intended for the one and only one use case.

packages/react/src/hooks/streams.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,56 @@ export function useSyncStream(options: UseSyncStreamOptions): SyncStreamStatus |
6565
return status.forStream(stream) ?? null;
6666
}
6767

68+
/**
69+
* Creates multiple PowerSync stream subscriptions. Subscriptions are kept alive as long as the
70+
* React component calling this function. When it unmounts, or when the streams array contents
71+
* change, all previous subscriptions are unsubscribed before new ones are created.
72+
*/
73+
export function useSyncStreams(streamOptions: UseSyncStreamOptions[]): SyncStreamStatus[] {
74+
const db = usePowerSync();
75+
const status = useStatus();
76+
77+
const stringifiedOptions = useMemo(() => JSON.stringify(streamOptions), [streamOptions]);
78+
const syncStreams = useMemo(
79+
() =>
80+
streamOptions.map((options) => {
81+
return {
82+
stream: db.syncStream(options.name, options.parameters ?? undefined),
83+
options
84+
};
85+
}),
86+
[stringifiedOptions]
87+
);
88+
89+
useEffect(() => {
90+
let active = true;
91+
const resolvedSubs: SyncStreamSubscription[] = [];
92+
93+
for (const entry of syncStreams) {
94+
entry.stream.subscribe(entry.options).then((sub) => {
95+
if (active) {
96+
resolvedSubs.push(sub);
97+
} else {
98+
// The cleanup function already ran, unsubscribe immediately.
99+
sub.unsubscribe();
100+
}
101+
});
102+
}
103+
104+
return () => {
105+
active = false;
106+
for (const sub of resolvedSubs) {
107+
sub.unsubscribe();
108+
}
109+
};
110+
}, [stringifiedOptions]);
111+
112+
return useMemo(
113+
() => syncStreams.map((entry) => status.forStream(entry.stream) ?? null),
114+
[status, stringifiedOptions]
115+
);
116+
}
117+
68118
/**
69119
* Returns `true` once all streams in the array have synced at least once.
70120
*/

packages/react/tests/streams.test.tsx

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import React, { act, useSyncExternalStore } from 'react';
44
import { AbstractPowerSyncDatabase, ConnectionManager, SyncStatus } from '@powersync/common';
55
import { openPowerSync } from './utils';
66
import { PowerSyncContext } from '../src/hooks/PowerSyncContext';
7-
import { useSyncStream, UseSyncStreamOptions } from '../src/hooks/streams';
7+
import { useSyncStream, useSyncStreams, UseSyncStreamOptions } from '../src/hooks/streams';
88
import { useQuery } from '../src/hooks/watched/useQuery';
99
import { QuerySyncStreamOptions } from '../src/hooks/watched/watch-types';
1010

@@ -142,6 +142,102 @@ describe('stream hooks', () => {
142142
await waitFor(() => expect(currentStreams()).toHaveLength(0), { timeout: 1000, interval: 100 });
143143
});
144144

145+
it('useSyncStreams subscribes and unsubscribes', async () => {
146+
expect(currentStreams()).toStrictEqual([]);
147+
148+
const { result, unmount } = renderHook(() => useSyncStreams([{ name: 'a' }, { name: 'b' }]), {
149+
wrapper: testWrapper
150+
});
151+
expect(result.current).toStrictEqual([null, null]);
152+
await waitFor(() => expect(currentStreams()).toHaveLength(2), { timeout: 1000, interval: 100 });
153+
await waitFor(() => expect(result.current.every((s) => s !== null)).toBe(true), {
154+
timeout: 1000,
155+
interval: 100
156+
});
157+
158+
unmount();
159+
expect(currentStreams()).toStrictEqual([]);
160+
});
161+
162+
it('useSyncStreams with cached instance', async () => {
163+
const existingSubscription = await db.syncStream('a').subscribe();
164+
await existingSubscription.unsubscribe();
165+
166+
const { result } = renderHook(() => useSyncStreams([{ name: 'a' }]), {
167+
wrapper: testWrapper
168+
});
169+
expect(result.current[0]).not.toBeNull();
170+
});
171+
172+
it('useSyncStreams handles array growing from 1 to 2 entries', async () => {
173+
let streamOptions: UseSyncStreamOptions[] = [{ name: 'a' }];
174+
let streamUpdateListeners: (() => void)[] = [];
175+
176+
const { result } = renderHook(
177+
() => {
178+
const options = useSyncExternalStore(
179+
(cb) => {
180+
streamUpdateListeners.push(cb);
181+
return () => {
182+
const index = streamUpdateListeners.indexOf(cb);
183+
if (index != -1) {
184+
streamUpdateListeners.splice(index, 1);
185+
}
186+
};
187+
},
188+
() => streamOptions
189+
);
190+
return useSyncStreams(options);
191+
},
192+
{ wrapper: testWrapper }
193+
);
194+
195+
await waitFor(() => expect(currentStreams()).toHaveLength(1), { timeout: 1000, interval: 100 });
196+
197+
// Grow to 2 entries
198+
streamOptions = [{ name: 'a' }, { name: 'b' }];
199+
act(() => streamUpdateListeners.forEach((cb) => cb()));
200+
201+
await waitFor(() => expect(currentStreams()).toHaveLength(2), { timeout: 1000, interval: 100 });
202+
expect(result.current).toHaveLength(2);
203+
});
204+
205+
it('useSyncStreams handles array shrinking from 2 to 1 entries', async () => {
206+
let streamOptions: UseSyncStreamOptions[] = [{ name: 'a' }, { name: 'b' }];
207+
let streamUpdateListeners: (() => void)[] = [];
208+
209+
const { result, unmount } = renderHook(
210+
() => {
211+
const options = useSyncExternalStore(
212+
(cb) => {
213+
streamUpdateListeners.push(cb);
214+
return () => {
215+
const index = streamUpdateListeners.indexOf(cb);
216+
if (index != -1) {
217+
streamUpdateListeners.splice(index, 1);
218+
}
219+
};
220+
},
221+
() => streamOptions
222+
);
223+
return useSyncStreams(options);
224+
},
225+
{ wrapper: testWrapper }
226+
);
227+
228+
await waitFor(() => expect(currentStreams()).toHaveLength(2), { timeout: 1000, interval: 100 });
229+
230+
// Shrink to 1 entry
231+
streamOptions = [{ name: 'a' }];
232+
act(() => streamUpdateListeners.forEach((cb) => cb()));
233+
234+
await waitFor(() => expect(currentStreams()).toHaveLength(1), { timeout: 1000, interval: 100 });
235+
expect(result.current).toHaveLength(1);
236+
237+
unmount();
238+
expect(currentStreams()).toStrictEqual([]);
239+
});
240+
145241
it('handles stream parameter changes', async () => {
146242
// Start without streams
147243
let streams: QuerySyncStreamOptions[] = [];

0 commit comments

Comments
 (0)