Skip to content

Commit 69f7394

Browse files
wip: react suspense
1 parent 7d1124c commit 69f7394

7 files changed

Lines changed: 459 additions & 308 deletions

File tree

packages/common/src/client/watched/WatchedQuery.ts

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { BaseListener, BaseObserverInterface } from '../../utils/BaseObserver.js';
2+
13
export interface WatchedQueryState<Data> {
24
/**
35
* Indicates the initial loading state (hard loading). Loading becomes false once the first set of results from the watched query is available or an error occurs.
@@ -47,18 +49,37 @@ export interface WatchedQueryOptions<DataType> {
4749
reportFetching?: boolean;
4850
}
4951

52+
export enum WatchedQuerySubscriptionEvent {
53+
ON_DATA = 'onData',
54+
ON_ERROR = 'onError',
55+
ON_STATE_CHANGE = 'onStateChange'
56+
}
57+
5058
export interface WatchedQuerySubscription<Data> {
51-
onData?: (data: Data) => void | Promise<void>;
52-
onError?: (error: Error) => void | Promise<void>;
53-
onStateChange?: (state: WatchedQueryState<Data>) => void | Promise<void>;
59+
[WatchedQuerySubscriptionEvent.ON_DATA]?: (data: Data) => void | Promise<void>;
60+
[WatchedQuerySubscriptionEvent.ON_ERROR]?: (error: Error) => void | Promise<void>;
61+
[WatchedQuerySubscriptionEvent.ON_STATE_CHANGE]?: (state: WatchedQueryState<Data>) => void | Promise<void>;
62+
}
63+
64+
export type SubscriptionCounts = Record<WatchedQuerySubscriptionEvent, number> & {
65+
total: number;
66+
};
67+
68+
export interface WatchedQueryListener extends BaseListener {
69+
closed: () => void;
70+
subscriptionsChanged: (counts: SubscriptionCounts) => void;
5471
}
5572

56-
export interface WatchedQuery<Data> {
73+
export interface WatchedQuery<Data> extends BaseObserverInterface<WatchedQueryListener> {
5774
/**
5875
* Current state of the watched query.
5976
*/
6077
readonly state: WatchedQueryState<Data>;
6178

79+
readonly closed: boolean;
80+
81+
readonly subscriptionCounts: SubscriptionCounts;
82+
6283
/**
6384
* Subscribe to watched query events.
6485
* @returns A function to unsubscribe from the events.

packages/common/src/client/watched/processors/AbstractQueryProcessor.ts

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
import { AbstractPowerSyncDatabase } from '../../../client/AbstractPowerSyncDatabase.js';
2-
import { BaseListener, BaseObserver } from '../../../utils/BaseObserver.js';
3-
import { WatchedQuery, WatchedQueryOptions, WatchedQueryState, WatchedQuerySubscription } from '../WatchedQuery.js';
2+
import { BaseObserver } from '../../../utils/BaseObserver.js';
3+
import {
4+
SubscriptionCounts,
5+
WatchedQuery,
6+
WatchedQueryListener,
7+
WatchedQueryOptions,
8+
WatchedQueryState,
9+
WatchedQuerySubscription,
10+
WatchedQuerySubscriptionEvent
11+
} from '../WatchedQuery.js';
412

513
/**
614
* @internal
@@ -18,7 +26,7 @@ export interface LinkQueryOptions<Data> {
1826
query: WatchedQueryOptions<Data>;
1927
}
2028

21-
type WatchedQueryProcessorListener<Data> = WatchedQuerySubscription<Data> & BaseListener;
29+
type WatchedQueryProcessorListener<Data> = WatchedQuerySubscription<Data> & WatchedQueryListener;
2230

2331
/**
2432
* Performs underlaying watching and yields a stream of results.
@@ -32,10 +40,25 @@ export abstract class AbstractQueryProcessor<Data = unknown[]>
3240

3341
protected abortController: AbortController;
3442
protected initialized: Promise<void>;
43+
protected _closed: boolean;
44+
45+
get closed() {
46+
return this._closed;
47+
}
48+
49+
get subscriptionCounts() {
50+
const listenersArray = Array.from(this.listeners);
51+
return Object.values(WatchedQuerySubscriptionEvent).reduce((totals: Partial<SubscriptionCounts>, key) => {
52+
totals[key] = listenersArray.filter((l) => !!l[key]).length;
53+
totals.total = (totals.total ?? 0) + totals[key];
54+
return totals;
55+
}, {}) as SubscriptionCounts;
56+
}
3557

3658
constructor(protected options: AbstractQueryProcessorOptions<Data>) {
3759
super();
3860
this.abortController = new AbortController();
61+
this._closed = false;
3962
this.state = {
4063
isLoading: true,
4164
isFetching: this.reportFetching, // Only set to true if we will report updates in future
@@ -108,12 +131,23 @@ export abstract class AbstractQueryProcessor<Data = unknown[]>
108131
}
109132

110133
subscribe(subscription: WatchedQuerySubscription<Data>): () => void {
111-
return this.registerListener({ ...subscription });
134+
// hook in to subscription events in order to report changes
135+
const baseDispose = this.registerListener({ ...subscription });
136+
137+
const counts = this.subscriptionCounts;
138+
this.iterateListeners((l) => l.subscriptionsChanged?.(counts));
139+
140+
return () => {
141+
baseDispose();
142+
this.iterateListeners((l) => l.subscriptionsChanged?.(counts));
143+
};
112144
}
113145

114146
async close() {
115147
await this.initialized;
116148
this.abortController.abort();
149+
this._closed = true;
150+
this.iterateListeners((l) => l.closed?.());
117151
}
118152

119153
/**

packages/react/src/QueryStore.ts

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
import { AbstractPowerSyncDatabase } from '@powersync/common';
2-
import { Query, WatchedQuery } from './WatchedQuery';
1+
import { AbstractPowerSyncDatabase, WatchedQuery } from '@powersync/common';
2+
import { Query } from './WatchedQuery';
33
import { AdditionalOptions } from './hooks/useQuery';
44

55
export function generateQueryKey(sqlStatement: string, parameters: any[], options: AdditionalOptions): string {
66
return `${sqlStatement} -- ${JSON.stringify(parameters)} -- ${JSON.stringify(options)}`;
77
}
88

99
export class QueryStore {
10-
cache = new Map<string, WatchedQuery>();
10+
cache = new Map<string, WatchedQuery<unknown[]>>();
1111

1212
constructor(private db: AbstractPowerSyncDatabase) {}
1313

@@ -16,17 +16,40 @@ export class QueryStore {
1616
return this.cache.get(key);
1717
}
1818

19-
const q = new WatchedQuery(this.db, query, options);
20-
const disposer = q.registerListener({
21-
disposed: () => {
19+
const customExecutor = typeof query.rawQuery !== 'string' ? query.rawQuery : null;
20+
21+
const watchedQuery = this.db.incrementalWatch({
22+
sql: query.sqlStatement,
23+
parameters: query.queryParameters,
24+
customExecutor: customExecutor
25+
? {
26+
initialData: [],
27+
execute: () => customExecutor.execute()
28+
}
29+
: undefined,
30+
throttleMs: options.throttleMs
31+
});
32+
33+
const disposer = watchedQuery.registerListener({
34+
closed: () => {
2235
this.cache.delete(key);
2336
disposer?.();
2437
}
2538
});
2639

27-
this.cache.set(key, q);
40+
watchedQuery.registerListener({
41+
subscriptionsChanged: (counts) => {
42+
// Dispose this query if there are no subscribers present
43+
if (counts.total == 0) {
44+
watchedQuery.close();
45+
this.cache.delete(key);
46+
}
47+
}
48+
});
49+
50+
this.cache.set(key, watchedQuery);
2851

29-
return q;
52+
return watchedQuery;
3053
}
3154
}
3255

0 commit comments

Comments
 (0)