Skip to content

Commit d0fe877

Browse files
committed
it works!
1 parent 840682d commit d0fe877

1 file changed

Lines changed: 81 additions & 7 deletions

File tree

modules/module-slatedb-storage/src/storage/SlateDBSyncBucketStorage.ts

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { storage, utils } from '@powersync/service-core';
33
import { JSONBig } from '@powersync/service-jsonbig';
44
import { HydratedSyncConfig } from '@powersync/service-sync-rules';
55
import { SlateDBBucketStorageFactory } from './SlateDBBucketStorageFactory.js';
6-
import { encodeOpId, SlateDBKVStore, storageKey, storagePrefix } from './SlateDBKVStore.js';
6+
import { encodeOpId, SlateDBKVStore, storageKey, storagePrefix, type SlateDBWriteOperation } from './SlateDBKVStore.js';
77
import {
88
getReplicationStreamRecord,
99
putReplicationStreamRecord,
@@ -20,6 +20,7 @@ interface SourceTableRecord {
2020
objectId: string | number | undefined;
2121
replicaIdColumns: storage.ColumnDescriptor[];
2222
snapshotComplete: boolean;
23+
snapshotStatus?: storage.TableSnapshotStatus;
2324
}
2425

2526
interface CurrentDataRecord {
@@ -182,16 +183,19 @@ export class SlateDBSyncBucketStorage
182183
async *watchCheckpointChanges(
183184
options: storage.WatchWriteCheckpointOptions
184185
): AsyncIterable<storage.StorageCheckpointUpdate> {
185-
let last = 0n;
186+
let last: bigint | null = null;
186187
while (!options.signal.aborted) {
187188
const base = await this.getCheckpoint();
188-
if (base.checkpoint > last) {
189-
const previous = new SlateDBReplicationCheckpoint(last, null);
189+
if (last == null || base.checkpoint > last) {
190+
const previous = last == null ? null : new SlateDBReplicationCheckpoint(last, null);
190191
last = base.checkpoint;
191192
yield {
192193
base,
193194
writeCheckpoint: null,
194-
update: await this.getCheckpointChanges({ lastCheckpoint: previous, nextCheckpoint: base })
195+
update:
196+
previous == null
197+
? storage.CHECKPOINT_INVALIDATE_ALL
198+
: await this.getCheckpointChanges({ lastCheckpoint: previous, nextCheckpoint: base })
195199
};
196200
}
197201
await new Promise((resolve) => setTimeout(resolve, 1000));
@@ -357,6 +361,9 @@ class SlateDBBucketBatch
357361
if (this.pending.length == 0) {
358362
return null;
359363
}
364+
if (!this.options.storeCurrentData) {
365+
return this.flushWithoutCurrentData();
366+
}
360367
for (const record of this.pending) {
361368
await this.persistRecord(record);
362369
}
@@ -464,7 +471,7 @@ class SlateDBBucketBatch
464471
objectId: source.objectId,
465472
connectionTag: source.connectionTag,
466473
replicaIdColumns: source.replicaIdColumns,
467-
snapshotComplete: true
474+
snapshotComplete: false
468475
};
469476
await this.storage.store.put(key, record);
470477
}
@@ -475,6 +482,64 @@ class SlateDBBucketBatch
475482
// Ignored for the POC.
476483
}
477484

485+
private async flushWithoutCurrentData(): Promise<storage.FlushedResult | null> {
486+
const bucketOps: Array<Omit<BucketOpRecord, 'op_id' | 'op_id_bigint'>> = [];
487+
for (const record of this.pending) {
488+
if (record.tag == storage.SaveOperationTag.DELETE) {
489+
continue;
490+
}
491+
const { results } = this.parsed.evaluateRowWithErrors({
492+
record: record.after as any,
493+
sourceTable: record.sourceTable.ref,
494+
bucketDataSources: record.sourceTable.bucketDataSources
495+
});
496+
for (const row of results) {
497+
const data = JSONBig.stringify(row.data);
498+
bucketOps.push({
499+
bucket: row.bucket,
500+
op: 'PUT',
501+
object_type: row.table,
502+
object_id: row.id,
503+
data,
504+
checksum: utils.hashData(row.table, row.id, data)
505+
});
506+
}
507+
}
508+
509+
this.pending = [];
510+
if (bucketOps.length == 0) {
511+
return null;
512+
}
513+
514+
const firstOp = await this.reserveOps(bucketOps.length);
515+
const writes: SlateDBWriteOperation[] = [];
516+
const bucketStates = new Map<string, string>();
517+
for (const [index, operation] of bucketOps.entries()) {
518+
const op = firstOp + BigInt(index);
519+
const record: BucketOpRecord = {
520+
...operation,
521+
op_id: utils.internalToExternalOpId(op),
522+
op_id_bigint: op.toString()
523+
};
524+
writes.push({
525+
type: 'put',
526+
key: bucketDataKey(this.storage.replicationStreamId, operation.bucket, op),
527+
value: record
528+
});
529+
bucketStates.set(operation.bucket, op.toString());
530+
this.last_flushed_op = op;
531+
}
532+
for (const [bucket, last_op] of bucketStates) {
533+
writes.push({
534+
type: 'put',
535+
key: storageKey('bucket-state', this.storage.replicationStreamId, bucket),
536+
value: { bucket, last_op }
537+
});
538+
}
539+
await this.storage.store.write(writes);
540+
return this.last_flushed_op == null ? null : { flushed_op: this.last_flushed_op };
541+
}
542+
478543
private async persistRecord(record: storage.SaveOptions): Promise<void> {
479544
const sourceKey = sourceRecordId(record);
480545
const currentKey = currentDataKey(this.storage.replicationStreamId, record.sourceTable.id.toString(), sourceKey);
@@ -547,6 +612,13 @@ class SlateDBBucketBatch
547612
return next;
548613
}
549614

615+
private async reserveOps(count: number): Promise<bigint> {
616+
const record = await this.storage.getRecord();
617+
const next = BigInt(record.next_op_id ?? '1');
618+
await this.storage.updateRecord({ next_op_id: (next + BigInt(count)).toString() });
619+
return next;
620+
}
621+
550622
private async saveSourceTable(table: storage.SourceTable): Promise<void> {
551623
await this.storage.store.put(sourceTableKey(this.storage.replicationStreamId, table.id.toString()), {
552624
id: table.id.toString(),
@@ -556,7 +628,8 @@ class SlateDBBucketBatch
556628
objectId: table.objectId,
557629
connectionTag: table.ref.connectionTag,
558630
replicaIdColumns: table.replicaIdColumns,
559-
snapshotComplete: table.snapshotComplete
631+
snapshotComplete: table.snapshotComplete,
632+
snapshotStatus: table.snapshotStatus
560633
} satisfies SourceTableRecord);
561634
}
562635

@@ -570,6 +643,7 @@ class SlateDBBucketBatch
570643
snapshotComplete: record.snapshotComplete,
571644
...this.parsed.getMatchingSources(ref)
572645
});
646+
table.snapshotStatus = record.snapshotStatus;
573647
table.syncData = table.bucketDataSources.length > 0;
574648
table.syncParameters = false;
575649
table.syncEvent = this.parsed.tableTriggersEvent(ref);

0 commit comments

Comments
 (0)