-
Notifications
You must be signed in to change notification settings - Fork 75
Expand file tree
/
Copy pathRustClientInterceptor.ts
More file actions
126 lines (113 loc) · 4.47 KB
/
Copy pathRustClientInterceptor.ts
File metadata and controls
126 lines (113 loc) · 4.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import {
AbstractPowerSyncDatabase,
AbstractRemote,
BucketChecksum,
Checkpoint,
ColumnType,
DBAdapter,
isStreamingSyncCheckpoint,
isStreamingSyncCheckpointDiff,
isStreamingSyncData,
PowerSyncControlCommand,
SqliteBucketStorage,
StreamingSyncLine,
SyncDataBucket
} from '@powersync/web';
import { DynamicSchemaManager } from './DynamicSchemaManager';
/**
 * Tracks per-byte and per-operation progress for the Rust client.
 *
 * While per-operation progress is reported by the SDK as well, we need those counters for each bucket. Since that
 * information is internal to the Rust client and inaccessible to JavaScript, this intercepts the raw
 * `powersync_control` calls to decode sync lines and derive progress information.
 *
 * Derived progress is persisted per bucket in the `local_bucket_data` table. Data lines are additionally handed to
 * the {@link DynamicSchemaManager} via `updateFromOperations`.
 */
export class RustClientInterceptor extends SqliteBucketStorage {
  /** The same adapter that is passed to the base class, kept for our own progress-tracking transactions. */
  private rdb: DBAdapter;

  /**
   * The checkpoint most recently tracked (either received in full or reconstructed from a diff).
   * Checkpoint diffs are applied on top of it; diffs arriving while this is `null` are ignored.
   */
  private lastStartedCheckpoint: Checkpoint | null = null;

  public tables: Record<string, Record<string, ColumnType>> = {};

  /**
   * @param db Database adapter used both by the base storage and for writing progress rows.
   * @param remote Used to obtain the BSON implementation for decoding binary sync lines.
   * @param schemaManager Receives every decoded data batch through `updateFromOperations`.
   */
  constructor(
    db: DBAdapter,
    private remote: AbstractRemote,
    private schemaManager: DynamicSchemaManager
  ) {
    // NOTE(review): `transactionMutex` is reached through an `any` cast, presumably because it is not part of the
    // public typings of AbstractPowerSyncDatabase — confirm when upgrading the SDK.
    super(db, (AbstractPowerSyncDatabase as any).transactionMutex);
    this.rdb = db;
  }

  /**
   * Forwards the command to the base implementation and, for text or BSON sync lines, additionally decodes the
   * payload to update the per-bucket progress rows.
   *
   * The base implementation runs first; progress tracking only happens once it has resolved.
   *
   * @param op The control command being issued.
   * @param payload The raw payload for the command.
   * @returns The response of the base implementation, unchanged.
   */
  async control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise<string> {
    const response = await super.control(op, payload);

    if (op === PowerSyncControlCommand.PROCESS_TEXT_LINE) {
      await this.processTextLine(payload as string);
    } else if (op === PowerSyncControlCommand.PROCESS_BSON_LINE) {
      await this.processBinaryLine(payload as Uint8Array);
    }

    return response;
  }

  /** Parses a JSON-encoded sync line and processes it. */
  private processTextLine(line: string) {
    return this.processParsedLine(JSON.parse(line));
  }

  /** Deserializes a BSON-encoded sync line (using the BSON implementation supplied by the remote) and processes it. */
  private async processBinaryLine(line: Uint8Array) {
    const bson = await this.remote.getBSON();
    await this.processParsedLine(bson.deserialize(line) as StreamingSyncLine);
  }

  /**
   * Updates progress information from a decoded sync line.
   *
   * - Full checkpoint: remembered and tracked as-is.
   * - Checkpoint diff: merged into the last known checkpoint (updated buckets are upserted, removed buckets are
   *   dropped) and the result is tracked. Ignored when no checkpoint has been seen yet.
   * - Data line: per-bucket download counters are incremented and the batch is passed to the schema manager.
   *
   * Any other line type is ignored.
   */
  private async processParsedLine(line: StreamingSyncLine) {
    if (isStreamingSyncCheckpoint(line)) {
      this.lastStartedCheckpoint = line.checkpoint;
      await this.trackCheckpoint(line.checkpoint);
    } else if (isStreamingSyncCheckpointDiff(line) && this.lastStartedCheckpoint) {
      const diff = line.checkpoint_diff;

      // Start from the buckets of the previous checkpoint, keyed by bucket name.
      const newBuckets = new Map<string, BucketChecksum>();
      for (const checksum of this.lastStartedCheckpoint.buckets) {
        newBuckets.set(checksum.bucket, checksum);
      }
      // Updates are applied before removals.
      for (const checksum of diff.updated_buckets) {
        newBuckets.set(checksum.bucket, checksum);
      }
      for (const bucket of diff.removed_buckets) {
        newBuckets.delete(bucket);
      }

      const newCheckpoint: Checkpoint = {
        last_op_id: diff.last_op_id,
        buckets: [...newBuckets.values()],
        write_checkpoint: diff.write_checkpoint
      };
      this.lastStartedCheckpoint = newCheckpoint;
      await this.trackCheckpoint(newCheckpoint);
    } else if (isStreamingSyncData(line)) {
      const batch = { buckets: [SyncDataBucket.fromRow(line.data)] };

      await this.rdb.writeTransaction(async (tx) => {
        for (const bucket of batch.buckets) {
          // Record metrics. The size is the length of the re-serialized JSON (string length, i.e. an
          // approximation of the downloaded payload size rather than an exact byte count).
          const size = JSON.stringify(bucket.data).length;
          await tx.execute(
            `UPDATE local_bucket_data SET
              download_size = IFNULL(download_size, 0) + ?,
              last_op = ?,
              downloading = ?,
              downloaded_operations = IFNULL(downloaded_operations, 0) + ?
            WHERE id = ?`,
            [size, bucket.next_after, bucket.has_more, bucket.data.length, bucket.bucket]
          );
        }
      });

      await this.schemaManager.updateFromOperations(batch);
    }
  }

  /**
   * Upserts one `local_bucket_data` row per bucket in the checkpoint.
   *
   * `total_operations` is always overwritten with the bucket's `count`. The remaining columns keep their existing
   * value when a row for the bucket already exists; otherwise they are initialised to: `last_op = '0'`,
   * `download_size = 0`, `downloading = TRUE` and `downloaded_operations = 0`.
   */
  private async trackCheckpoint(checkpoint: Checkpoint) {
    await this.rdb.writeTransaction(async (tx) => {
      for (const bucket of checkpoint.buckets) {
        // INSERT OR REPLACE discards the old row, so existing values are carried over through sub-selects.
        // downloaded_operations is a counter and must start at 0: the previous default of TRUE evaluates to 1 in
        // SQLite, which left every new bucket with an off-by-one operation count.
        await tx.execute(
          `INSERT OR REPLACE INTO local_bucket_data(id, total_operations, last_op, download_size, downloading, downloaded_operations)
            VALUES (
              ?,
              ?,
              IFNULL((SELECT last_op FROM local_bucket_data WHERE id = ?), '0'),
              IFNULL((SELECT download_size FROM local_bucket_data WHERE id = ?), 0),
              IFNULL((SELECT downloading FROM local_bucket_data WHERE id = ?), TRUE),
              IFNULL((SELECT downloaded_operations FROM local_bucket_data WHERE id = ?), 0)
            )`,
          [bucket.bucket, bucket.count, bucket.bucket, bucket.bucket, bucket.bucket, bucket.bucket]
        );
      }
    });
  }
}