Skip to content

Commit a5d6964

Browse files
authored
(feat): Tanstack Query Sync Streams (#866)
1 parent a3feb9e commit a5d6964

13 files changed

Lines changed: 571 additions & 86 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/react': minor
3+
---
4+
5+
Exporting `useAllSyncStreamsHaveSynced()` member.

.changeset/tasty-meals-tap.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/tanstack-react-query': minor
3+
---
4+
5+
Added sync stream support to the `useQuery()` and `useQueries()` hooks.

packages/react/src/hooks/streams.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ export function useSyncStream(options: UseSyncStreamOptions): SyncStreamStatus |
6666
}
6767

6868
/**
69-
* @internal
69+
* Returns `true` once all streams in the array have synced at least once.
7070
*/
7171
export function useAllSyncStreamsHaveSynced(
7272
db: AbstractPowerSyncDatabase,

packages/react/src/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ export * from './hooks/PowerSyncContext.js';
55
export { SuspenseQueryResult } from './hooks/suspense/SuspenseQueryResult.js';
66
export { useSuspenseQuery } from './hooks/suspense/useSuspenseQuery.js';
77
export { useWatchedQuerySuspenseSubscription } from './hooks/suspense/useWatchedQuerySuspenseSubscription.js';
8-
export { useSyncStream, UseSyncStreamOptions } from './hooks/streams.js';
8+
export { useAllSyncStreamsHaveSynced, useSyncStream, UseSyncStreamOptions } from './hooks/streams.js';
99
export { useStatus } from './hooks/useStatus.js';
1010
export { useQuery } from './hooks/watched/useQuery.js';
1111
export { useWatchedQuerySubscription } from './hooks/watched/useWatchedQuerySubscription.js';
12-
export { AdditionalOptions } from './hooks/watched/watch-types.js';
12+
export { AdditionalOptions, QuerySyncStreamOptions } from './hooks/watched/watch-types.js';

packages/tanstack-react-query/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
"@tanstack/react-query": "^5.70.0"
4242
},
4343
"devDependencies": {
44+
"@powersync/web": "workspace:*",
4445
"@testing-library/react": "^15.0.2",
4546
"@types/react": "18.3.1",
4647
"jsdom": "catalog:",

packages/tanstack-react-query/src/hooks/usePowerSyncQueries.ts

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,25 @@
11
import { type CompilableQuery, parseQuery } from '@powersync/common';
2-
import { usePowerSync } from '@powersync/react';
2+
import { QuerySyncStreamOptions, useAllSyncStreamsHaveSynced, usePowerSync } from '@powersync/react';
33
import { useEffect, useState, useCallback, useMemo } from 'react';
44
import * as Tanstack from '@tanstack/react-query';
55

66
export type UsePowerSyncQueriesInput = {
77
query?: string | CompilableQuery<unknown>;
88
parameters?: unknown[];
99
queryKey: Tanstack.QueryKey;
10+
streams?: QuerySyncStreamOptions[];
1011
}[];
1112

1213
export type UsePowerSyncQueriesOutput = {
13-
sqlStatement: string;
14-
queryParameters: unknown[];
15-
tables: string[];
16-
error?: Error;
17-
queryFn: () => Promise<unknown[]>;
18-
}[];
14+
queries: {
15+
sqlStatement: string;
16+
queryParameters: unknown[];
17+
tables: string[];
18+
error?: Error;
19+
queryFn: () => Promise<unknown[]>;
20+
}[];
21+
streamsHaveSynced: boolean;
22+
};
1923

2024
export function usePowerSyncQueries(
2125
queries: UsePowerSyncQueriesInput,
@@ -35,6 +39,14 @@ export function usePowerSyncQueries(
3539
});
3640
}, []);
3741

42+
// Collect all streams from all query entries into a single flat list for sync tracking.
43+
const allStreams = useMemo(() => {
44+
const streams = queries.flatMap(q => q.streams ?? []);
45+
return streams.length > 0 ? streams : undefined;
46+
}, [queries]);
47+
48+
const streamsHaveSynced = useAllSyncStreamsHaveSynced(powerSync, allStreams);
49+
3850
const updateErrorsArr = useCallback((error: Error | undefined, idx: number) => {
3951
setErrorsArr((prev) => {
4052
if (prev[idx]?.message === error?.message) return prev;
@@ -164,29 +176,33 @@ export function usePowerSyncQueries(
164176
}, [powerSync, queryClient, tablesArr, updateErrorsArr, stringifiedQueryKeys]);
165177

166178
return useMemo(() => {
167-
return parsedQueries.map((pq, idx) => {
168-
const error = errorsArr[idx] || pq.parseError;
169-
170-
const queryFn = async () => {
171-
if (error) throw error;
172-
if (!pq.query) throw new Error('No query provided');
173179

174-
try {
175-
return typeof pq.query === 'string'
176-
? await powerSync.getAll(pq.sqlStatement, pq.queryParameters)
177-
: await pq.query.execute();
178-
} catch (e) {
179-
throw e;
180-
}
181-
};
182-
183-
return {
184-
sqlStatement: pq.sqlStatement,
185-
queryParameters: pq.queryParameters,
186-
tables: tablesArr[idx],
187-
error,
188-
queryFn
189-
};
190-
});
180+
return {
181+
queries: parsedQueries.map((pq, idx) => {
182+
const error = errorsArr[idx] || pq.parseError;
183+
184+
const queryFn = async () => {
185+
if (error) throw error;
186+
if (!pq.query) throw new Error('No query provided');
187+
188+
try {
189+
return typeof pq.query === 'string'
190+
? await powerSync.getAll(pq.sqlStatement, pq.queryParameters)
191+
: await pq.query.execute();
192+
} catch (e) {
193+
throw e;
194+
}
195+
};
196+
197+
return {
198+
sqlStatement: pq.sqlStatement,
199+
queryParameters: pq.queryParameters,
200+
tables: tablesArr[idx],
201+
error,
202+
queryFn
203+
};
204+
}),
205+
streamsHaveSynced
206+
};
191207
}, [parsedQueries, errorsArr, tablesArr, powerSync]);
192208
}

packages/tanstack-react-query/src/hooks/useQueries.ts

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,24 @@
11
import { type CompilableQuery } from '@powersync/common';
2-
import { usePowerSync } from '@powersync/react';
2+
import { QuerySyncStreamOptions, usePowerSync } from '@powersync/react';
33
import * as Tanstack from '@tanstack/react-query';
44
import { useMemo } from 'react';
55
import { usePowerSyncQueries } from './usePowerSyncQueries.js';
66

77
export type PowerSyncQueryOptions<T> = {
88
query?: string | CompilableQuery<T>;
99
parameters?: any[];
10+
/**
11+
* An optional array of sync streams (with names and parameters) backing the query.
12+
*
13+
* When set, `useQuery` will subscribe to those streams (and automatically handle unsubscribing from them, too).
14+
*
15+
* If {@link QuerySyncStreamOptions} is set on a stream, `useQuery` will remain in a loading state until that stream
16+
* is marked as {@link SyncSubscriptionDescription.hasSynced}. This ensures the query is not missing rows that haven't
17+
* been downloaded.
18+
* Note however that after an initial sync, the query will not block itself while new rows are downloading. Instead,
19+
* consistent sync snapshots will be made available as they've been processed by PowerSync.
20+
**/
21+
streams?: QuerySyncStreamOptions[];
1022
};
1123

1224
export type PowerSyncQueryOption<T = unknown[]> = Tanstack.UseQueryOptions<T[]> & PowerSyncQueryOptions<T>;
@@ -126,12 +138,13 @@ export function useQueries(
126138
queriesInput.map((queryOptions) => ({
127139
query: queryOptions.query,
128140
parameters: queryOptions.parameters,
129-
queryKey: queryOptions.queryKey
141+
queryKey: queryOptions.queryKey,
142+
streams: queryOptions.streams
130143
})),
131144
[queriesInput]
132145
);
133146

134-
const states = usePowerSyncQueries(powerSyncQueriesInput, queryClient);
147+
const {queries: states, streamsHaveSynced} = usePowerSyncQueries(powerSyncQueriesInput, queryClient);
135148

136149
const queries = useMemo(() => {
137150
return queriesInput.map((queryOptions, idx) => {
@@ -141,7 +154,8 @@ export function useQueries(
141154
return {
142155
...rest,
143156
queryFn: query ? state.queryFn : rest.queryFn,
144-
queryKey: rest.queryKey
157+
queryKey: rest.queryKey,
158+
enabled: streamsHaveSynced
145159
};
146160
});
147161
}, [queriesInput, states]);

packages/tanstack-react-query/src/hooks/useQuery.ts

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,23 @@
11
import { type CompilableQuery } from '@powersync/common';
2-
import { usePowerSync } from '@powersync/react';
2+
import { QuerySyncStreamOptions, usePowerSync } from '@powersync/react';
33
import * as Tanstack from '@tanstack/react-query';
44
import { usePowerSyncQueries } from './usePowerSyncQueries.js';
55

66
export type PowerSyncQueryOptions<T> = {
77
query?: string | CompilableQuery<T>;
88
parameters?: any[];
9+
/**
10+
* An optional array of sync streams (with names and parameters) backing the query.
11+
*
12+
* When set, `useQuery` will subscribe to those streams (and automatically handle unsubscribing from them, too).
13+
*
14+
* If {@link QuerySyncStreamOptions} is set on a stream, `useQuery` will remain in a loading state until that stream
15+
* is marked as {@link SyncSubscriptionDescription.hasSynced}. This ensures the query is not missing rows that haven't
16+
* been downloaded.
17+
* Note however that after an initial sync, the query will not block itself while new rows are downloading. Instead,
18+
* consistent sync snapshots will be made available as they've been processed by PowerSync.
19+
**/
20+
streams?: QuerySyncStreamOptions[];
921
};
1022

1123
export type UseBaseQueryOptions<TQueryOptions> = TQueryOptions & PowerSyncQueryOptions<any>;
@@ -120,14 +132,15 @@ function useQueryCore<
120132
throw new Error('PowerSync is not available');
121133
}
122134

123-
const { query, parameters, queryKey, ...resolvedOptions } = options;
135+
const { query, parameters, queryKey, streams, ...resolvedOptions } = options;
124136

125-
const [{ queryFn }] = usePowerSyncQueries(
137+
const { queries: [{ queryFn }], streamsHaveSynced } = usePowerSyncQueries(
126138
[
127139
{
128140
query,
129141
parameters,
130-
queryKey
142+
queryKey,
143+
streams
131144
}
132145
],
133146
queryClient
@@ -137,7 +150,8 @@ function useQueryCore<
137150
{
138151
...(resolvedOptions as TQueryOptions),
139152
queryKey,
140-
queryFn: query ? queryFn : resolvedOptions.queryFn
153+
queryFn: query ? queryFn : resolvedOptions.queryFn,
154+
enabled: streamsHaveSynced
141155
} as TQueryOptions,
142156
queryClient
143157
);

0 commit comments

Comments
 (0)