Skip to content

Commit 7d1124c

Browse files
Update generic typing and APIs
1 parent 8874c75 commit 7d1124c

8 files changed

Lines changed: 367 additions & 393 deletions

File tree

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 58 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import { throttleTrailing } from '../utils/async.js';
1919
import { mutexRunExclusive } from '../utils/mutex.js';
2020
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
2121
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
22-
import { runOnSchemaChange } from './runOnSchemaChange.js';
2322
import { BucketStorageAdapter, PSInternalTable } from './sync/bucket/BucketStorageAdapter.js';
2423
import { CrudBatch } from './sync/bucket/CrudBatch.js';
2524
import { CrudEntry, CrudEntryJSON } from './sync/bucket/CrudEntry.js';
@@ -34,8 +33,7 @@ import {
3433
type RequiredAdditionalConnectionOptions
3534
} from './sync/stream/AbstractStreamingSyncImplementation.js';
3635
import { WatchedQuery } from './watched/WatchedQuery.js';
37-
import { WatchedQueryImpl } from './watched/WatchedQueryImpl.js';
38-
import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js';
36+
import { OnChangeQueryProcessor, WatchedQueryComparator } from './watched/processors/OnChangeQueryProcessor.js';
3937

4038
export interface DisconnectAndClearOptions {
4139
/** When set to false, data in local-only tables is preserved. */
@@ -89,6 +87,12 @@ export interface SQLWatchOptions {
8987
* Emits an empty result set immediately
9088
*/
9189
triggerImmediate?: boolean;
90+
91+
/**
92+
* Optional comparator which will be used to compare the results of the query.
93+
* The watched query will only yield results if the comparator returns false.
94+
*/
95+
comparator?: WatchedQueryComparator<QueryResult>;
9296
}
9397

9498
export interface WatchOnChangeEvent {
@@ -868,25 +872,28 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
868872
}
869873

870874
// TODO names
871-
incrementalWatch<T>(options: {
875+
incrementalWatch<DataType>(options: {
872876
sql: string;
873877
parameters?: any[];
874878
throttleMs?: number;
875-
queryExecutor?: () => Promise<T[]>;
879+
customExecutor?: {
880+
initialData: DataType;
881+
execute: () => Promise<DataType>;
882+
};
876883
reportFetching?: boolean;
877-
}): WatchedQuery<T> {
878-
return new WatchedQueryImpl({
879-
processor: new OnChangeQueryProcessor({
880-
db: this,
881-
compareBy: (item) => JSON.stringify(item), // TODO make configurable
882-
watchedQuery: {
883-
query: options.sql,
884-
parameters: options.parameters,
885-
throttleMs: options.throttleMs ?? DEFAULT_WATCH_THROTTLE_MS,
886-
queryExecutor: options.queryExecutor,
887-
reportFetching: options.reportFetching
888-
}
889-
})
884+
}): WatchedQuery<DataType> {
885+
return new OnChangeQueryProcessor({
886+
db: this,
887+
comparator: {
888+
checkEquality: (a, b) => JSON.stringify(a) == JSON.stringify(b)
889+
},
890+
query: {
891+
sql: options.sql,
892+
parameters: options.parameters,
893+
throttleMs: options.throttleMs ?? DEFAULT_WATCH_THROTTLE_MS,
894+
customExecutor: options.customExecutor,
895+
reportFetching: options.reportFetching
896+
}
890897
});
891898
}
892899

@@ -908,38 +915,42 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
908915
throw new Error('onResult is required');
909916
}
910917

911-
const watchQuery = async (abortSignal: AbortSignal) => {
912-
try {
913-
const resolvedTables = await this.resolveTables(sql, parameters, options);
914-
// Fetch initial data
915-
const result = await this.executeReadOnly(sql, parameters);
916-
onResult(result);
917-
918-
this.onChangeWithCallback(
919-
{
920-
onChange: async () => {
921-
try {
922-
const result = await this.executeReadOnly(sql, parameters);
923-
onResult(result);
924-
} catch (error) {
925-
onError?.(error);
926-
}
927-
},
928-
onError
918+
const watch = new OnChangeQueryProcessor({
919+
db: this,
920+
// Comparisons are disabled if no comparator is provided
921+
comparator: options?.comparator,
922+
query: {
923+
sql,
924+
parameters,
925+
throttleMs: options?.throttleMs ?? DEFAULT_WATCH_THROTTLE_MS,
926+
reportFetching: false,
927+
// The default watch implementation returns QueryResult as the Data type
928+
customExecutor: {
929+
execute: async () => {
930+
return this.executeReadOnly(sql, parameters);
929931
},
930-
{
931-
...(options ?? {}),
932-
tables: resolvedTables,
933-
// Override the abort signal since we intercept it
934-
signal: abortSignal
935-
}
936-
);
937-
} catch (error) {
938-
onError?.(error);
932+
initialData: null
933+
}
939934
}
940-
};
935+
});
941936

942-
runOnSchemaChange(watchQuery, this, options);
937+
const dispose = watch.subscribe({
938+
onData: (data) => {
939+
if (!data) {
940+
// This should not happen. We only use null for the initial data.
941+
return;
942+
}
943+
onResult(data);
944+
},
945+
onError: (error) => {
946+
onError(error);
947+
}
948+
});
949+
950+
options?.signal?.addEventListener('abort', () => {
951+
dispose();
952+
watch.close();
953+
});
943954
}
944955

945956
/**
Lines changed: 56 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,43 @@
1-
import { DataStream } from '../../utils/DataStream.js';
2-
3-
export interface WatchedQueryState<T> {
1+
export interface WatchedQueryState<Data> {
2+
/**
3+
* Indicates the initial loading state (hard loading). Loading becomes false once the first set of results from the watched query is available or an error occurs.
4+
*/
45
isLoading: boolean;
6+
/**
7+
* Indicates whether the query is currently fetching data, is true during the initial load and any time when the query is re-evaluating (useful for large queries).
8+
*/
59
isFetching: boolean;
10+
/**
11+
* The last error that occurred while executing the query.
12+
*/
613
error: Error | null;
14+
/**
15+
* The last time the query was updated.
16+
*/
717
lastUpdated: Date | null;
8-
data: T[];
9-
}
10-
11-
/**
12-
* Performs underlaying watching and yields a stream of results.
13-
* @internal
14-
*/
15-
export interface WatchedQueryProcessor<T> {
16-
readonly state: WatchedQueryState<T>;
17-
18-
generateStream(): Promise<DataStream<WatchedQueryState<T>>>;
19-
20-
updateQuery(query: WatchedQueryOptions<T>): void;
18+
/**
19+
* The last data returned by the query.
20+
*/
21+
data: Data;
2122
}
2223

2324
/**
2425
* @internal
2526
*/
26-
export interface WatchedQueryOptions<T> {
27-
query: string;
27+
export interface WatchedQueryOptions<DataType> {
28+
sql: string;
2829
parameters?: any[];
2930
/** The minimum interval between queries. */
3031
throttleMs?: number;
31-
queryExecutor?: () => Promise<T[]>;
32+
/**
33+
* Optional query executor responsible for executing the query.
34+
* This can be used to return query results which are mapped from the database.
35+
* Often this is useful for ORM queries or other query builders.
36+
*/
37+
customExecutor?: {
38+
execute: () => Promise<DataType>;
39+
initialData: DataType;
40+
};
3241
/**
3342
* If true (default) the watched query will update its state to report
3443
* on the fetching state of the query.
@@ -38,9 +47,32 @@ export interface WatchedQueryOptions<T> {
3847
reportFetching?: boolean;
3948
}
4049

41-
export interface WatchedQuery<T> {
42-
readonly state: WatchedQueryState<T>;
43-
stream(): DataStream<WatchedQueryState<T>>;
44-
updateQuery(query: WatchedQueryOptions<T>): void;
45-
close(): void;
50+
export interface WatchedQuerySubscription<Data> {
51+
onData?: (data: Data) => void | Promise<void>;
52+
onError?: (error: Error) => void | Promise<void>;
53+
onStateChange?: (state: WatchedQueryState<Data>) => void | Promise<void>;
54+
}
55+
56+
export interface WatchedQuery<Data> {
57+
/**
58+
* Current state of the watched query.
59+
*/
60+
readonly state: WatchedQueryState<Data>;
61+
62+
/**
63+
* Subscribe to watched query events.
64+
* @returns A function to unsubscribe from the events.
65+
*/
66+
subscribe(subscription: WatchedQuerySubscription<Data>): () => void;
67+
68+
/**
69+
* Updates the underlaying query.
70+
* This will trigger a re-evaluation of the query and update the state.
71+
*/
72+
updateQuery(query: WatchedQueryOptions<Data>): Promise<void>;
73+
74+
/**
75+
* Close the watched query and end all subscriptions.
76+
*/
77+
close(): Promise<void>;
4678
}

packages/common/src/client/watched/WatchedQueryImpl.ts

Lines changed: 0 additions & 89 deletions
This file was deleted.

0 commit comments

Comments
 (0)