Skip to content

Commit 8874c75

Browse files
cleanup api
1 parent 7b15fc3 commit 8874c75

10 files changed

Lines changed: 169 additions & 263 deletions

File tree

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ import {
3535
} from './sync/stream/AbstractStreamingSyncImplementation.js';
3636
import { WatchedQuery } from './watched/WatchedQuery.js';
3737
import { WatchedQueryImpl } from './watched/WatchedQueryImpl.js';
38-
import { ComparisonQueryProcessor } from './watched/processors/comparison/ComparisonQueryProcessor.js';
39-
import { InlineWatchComparator } from './watched/processors/comparison/WatchComparator.js';
38+
import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js';
4039

4140
export interface DisconnectAndClearOptions {
4241
/** When set to false, data in local-only tables is preserved. */
@@ -877,12 +876,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
877876
reportFetching?: boolean;
878877
}): WatchedQuery<T> {
879878
return new WatchedQueryImpl({
880-
processor: new ComparisonQueryProcessor({
879+
processor: new OnChangeQueryProcessor({
881880
db: this,
882-
comparator: new InlineWatchComparator({
883-
hash: (row) => JSON.stringify(row),
884-
identify: (row) => (row as any).id ?? JSON.stringify(row) // TODO
885-
}),
881+
compareBy: (item) => JSON.stringify(item), // TODO make configurable
886882
watchedQuery: {
887883
query: options.sql,
888884
parameters: options.parameters,

packages/common/src/client/watched/WatchedQuery.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
import { DataStream } from '../../utils/DataStream.js';
2-
import { WatchedQueryResult } from './WatchedQueryResult.js';
32

43
export interface WatchedQueryState<T> {
5-
loading: boolean;
6-
fetching: boolean;
4+
isLoading: boolean;
5+
isFetching: boolean;
76
error: Error | null;
87
lastUpdated: Date | null;
9-
data: WatchedQueryResult<T>;
8+
data: T[];
109
}
1110

1211
/**
1312
* Performs underlaying watching and yields a stream of results.
13+
* @internal
1414
*/
1515
export interface WatchedQueryProcessor<T> {
1616
readonly state: WatchedQueryState<T>;
@@ -20,6 +20,9 @@ export interface WatchedQueryProcessor<T> {
2020
updateQuery(query: WatchedQueryOptions<T>): void;
2121
}
2222

23+
/**
24+
* @internal
25+
*/
2326
export interface WatchedQueryOptions<T> {
2427
query: string;
2528
parameters?: any[];

packages/common/src/client/watched/WatchedQueryImpl.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,14 @@ export interface WatchedQueryImplOptions<T> {
88

99
export class WatchedQueryImpl<T> implements WatchedQuery<T> {
1010
protected lazyStreamPromise: Promise<DataStream<WatchedQueryState<T>>>;
11+
protected _stream: DataStream<WatchedQueryState<T>> | null;
1112

1213
constructor(protected options: WatchedQueryImplOptions<T>) {
13-
this.lazyStreamPromise = this.options.processor.generateStream();
14+
this._stream = null;
15+
this.lazyStreamPromise = this.options.processor.generateStream().then((s) => {
16+
this._stream = s;
17+
return s;
18+
});
1419
}
1520

1621
get state() {
@@ -67,9 +72,13 @@ export class WatchedQueryImpl<T> implements WatchedQuery<T> {
6772
}
6873

6974
close(): void {
75+
if (this._stream) {
76+
this._stream.close().catch(() => {});
77+
return;
78+
}
7079
this.lazyStreamPromise
71-
.then((s) => {
72-
s.close();
80+
.then(async (s) => {
81+
await s.close();
7382
})
7483
.catch(() => {
7584
// In rare cases where the DB might be closed before the stream is created

packages/common/src/client/watched/WatchedQueryResult.ts

Lines changed: 0 additions & 30 deletions
This file was deleted.

packages/common/src/client/watched/processors/AbstractQueryProcessor.ts

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,11 @@ export abstract class AbstractQueryProcessor<T>
3737
implements WatchedQueryProcessor<T>
3838
{
3939
readonly state: WatchedQueryState<T> = {
40-
loading: true,
41-
fetching: true,
40+
isLoading: true,
41+
isFetching: true,
4242
error: null,
4343
lastUpdated: null,
44-
data: {
45-
all: [],
46-
delta: () => ({ added: [], removed: [], unchanged: [], updated: [] })
47-
}
44+
data: []
4845
};
4946

5047
protected _stream: DataStream<WatchedQueryState<T>> | null;
@@ -78,7 +75,7 @@ export abstract class AbstractQueryProcessor<T>
7875
protected abstract linkStream(options: LinkQueryStreamOptions<T>): Promise<void>;
7976

8077
protected updateState(update: Partial<WatchedQueryState<T>>) {
81-
Object.assign(this.state, update);
78+
Object.assign(this.state, { lastUpdated: new Date() } satisfies Partial<WatchedQueryState<T>>, update);
8279

8380
if (this._stream?.closed) {
8481
// Don't enqueue data in a closed stream.

packages/common/src/client/watched/processors/OnChangeQueryProcessor.ts

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,52 @@
11
import { WatchedQueryState } from '../WatchedQuery.js';
2-
import { WatchedQueryResult } from '../WatchedQueryResult.js';
3-
import { AbstractQueryProcessor, LinkQueryStreamOptions } from './AbstractQueryProcessor.js';
2+
import {
3+
AbstractQueryProcessor,
4+
AbstractQueryProcessorOptions,
5+
LinkQueryStreamOptions
6+
} from './AbstractQueryProcessor.js';
7+
8+
export interface OnChangeQueryProcessorOptions<T> extends AbstractQueryProcessorOptions<T> {
9+
compareBy?: (element: T) => string;
10+
}
411

512
/**
613
* Uses the PowerSync onChange event to trigger watched queries.
714
* Results are emitted on every change of the relevant tables.
815
*/
916
export class OnChangeQueryProcessor<T> extends AbstractQueryProcessor<T> {
10-
/**
11-
* Always returns the result set on every onChange event. Deltas are not supported by this processor.
17+
constructor(protected options: OnChangeQueryProcessorOptions<T>) {
18+
super(options);
19+
}
20+
21+
/*
22+
* @returns If the sets are equal
1223
*/
13-
protected processResultSet(result: T[]): WatchedQueryResult<T> | null {
14-
return {
15-
all: result,
16-
delta: () => {
17-
throw new Error('Delta not implemented for OnChangeQueryProcessor');
24+
protected checkEquality(current: T[], previous: T[]): boolean {
25+
if (current.length == 0 && previous.length == 0) {
26+
return true;
27+
}
28+
29+
if (current.length !== previous.length) {
30+
return false;
31+
}
32+
33+
const { compareBy } = this.options;
34+
// Assume items are not equal if we can't compare them
35+
if (!compareBy) {
36+
return false;
37+
}
38+
39+
// At this point the lengths are equal
40+
for (let i = 0; i < current.length; i++) {
41+
const currentItem = compareBy(current[i]);
42+
const previousItem = compareBy(previous[i]);
43+
44+
if (currentItem !== previousItem) {
45+
return false;
1846
}
19-
};
47+
}
48+
49+
return true;
2050
}
2151

2252
protected async linkStream(options: LinkQueryStreamOptions<T>): Promise<void> {
@@ -31,7 +61,7 @@ export class OnChangeQueryProcessor<T> extends AbstractQueryProcessor<T> {
3161
// This fires for each change of the relevant tables
3262
try {
3363
if (this.reportFetching) {
34-
this.updateState({ fetching: true });
64+
this.updateState({ isFetching: true });
3565
}
3666

3767
const partialStateUpdate: Partial<WatchedQueryState<T>> = {};
@@ -42,18 +72,16 @@ export class OnChangeQueryProcessor<T> extends AbstractQueryProcessor<T> {
4272
: await db.getAll<T>(watchedQuery.query, watchedQuery.parameters);
4373

4474
if (this.reportFetching) {
45-
partialStateUpdate.fetching = false;
75+
partialStateUpdate.isFetching = false;
4676
}
4777

48-
if (this.state.loading) {
49-
partialStateUpdate.loading = false;
78+
if (this.state.isLoading) {
79+
partialStateUpdate.isLoading = false;
5080
}
5181

5282
// Check if the result has changed
53-
const watchedQueryResult = this.processResultSet(result);
54-
if (watchedQueryResult) {
55-
partialStateUpdate.data = watchedQueryResult;
56-
partialStateUpdate.lastUpdated = new Date();
83+
if (!this.checkEquality(result, this.state.data)) {
84+
partialStateUpdate.data = result;
5785
}
5886

5987
if (Object.keys(partialStateUpdate).length > 0) {

packages/common/src/client/watched/processors/comparison/ComparisonQueryProcessor.ts

Lines changed: 0 additions & 34 deletions
This file was deleted.

0 commit comments

Comments
 (0)