Skip to content

Commit 182bdea

Browse files
committed
Change flushing.
1 parent e4fb53e commit 182bdea

3 files changed

Lines changed: 32 additions & 4 deletions

File tree

modules/module-slatedb-storage/src/storage/SlateDBBucketStorageFactory.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ export class SlateDBBucketStorageFactory extends storage.BucketStorageFactory {
103103
};
104104

105105
await putReplicationStreamRecord(store, record);
106+
await store.flush();
106107
return new SlateDBPersistedReplicationStream(store, record);
107108
}
108109

modules/module-slatedb-storage/src/storage/SlateDBKVStore.ts

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
FlushType,
1111
KeyRange,
1212
ObjectStore,
13+
Settings,
1314
WriteBatch
1415
} from '@slatedb/uniffi';
1516
import fs from 'node:fs/promises';
@@ -59,6 +60,8 @@ type SlateDBReadable = Pick<Db | DbReader, 'get' | 'scan_prefix'>;
5960

6061
const DEFAULT_DB_PATH = 'powersync';
6162
const LOCAL_FILE_OBJECT_STORE_URL = 'file:///';
63+
const NON_DURABLE_WRITE_OPTIONS = { await_durable: false };
64+
const DURABLE_WRITE_OPTIONS = { await_durable: true };
6265
const textEncoder = new TextEncoder();
6366
const textDecoder = new TextDecoder();
6467

@@ -75,14 +78,19 @@ export class SlateDBKVStore implements AsyncDisposable {
7578
readonly dbPath: string,
7679
private readonly objectStore: ObjectStore,
7780
private readonly db: Db,
78-
private readonly admin: Admin
81+
private readonly admin: Admin,
82+
private durabilityBarrierId = 0
7983
) {}
8084

8185
static async open(options: SlateDBKVStoreOptions): Promise<SlateDBKVStore> {
8286
const { objectStoreUrl, dbPath } = await resolveObjectStoreOptions(options);
8387
const objectStore = ObjectStore.resolve(objectStoreUrl);
8488

89+
const settings = Settings.load();
90+
settings.set('flush_interval', '"50ms"');
91+
8592
const dbBuilder = new DbBuilder(dbPath, objectStore);
93+
dbBuilder.with_settings(settings);
8694
const adminBuilder = new AdminBuilder(dbPath, objectStore);
8795
try {
8896
const db = await dbBuilder.build();
@@ -99,11 +107,11 @@ export class SlateDBKVStore implements AsyncDisposable {
99107
}
100108

101109
async put(key: SlateDBKey, value: unknown): Promise<void> {
102-
await this.db.put(toKeyBytes(key), encodeValue(value));
110+
await this.write([{ type: 'put', key, value }]);
103111
}
104112

105113
async delete(key: SlateDBKey): Promise<void> {
106-
await this.db.delete(toKeyBytes(key));
114+
await this.write([{ type: 'delete', key }]);
107115
}
108116

109117
async deletePrefix(prefix: SlateDBKey, options: { limit?: number } = {}): Promise<number> {
@@ -129,7 +137,20 @@ export class SlateDBKVStore implements AsyncDisposable {
129137
batch.delete(toKeyBytes(operation.key));
130138
}
131139
}
132-
await this.db.write(batch);
140+
await this.db.write_with_options(batch, NON_DURABLE_WRITE_OPTIONS);
141+
} finally {
142+
batch.dispose();
143+
}
144+
}
145+
146+
async flush(): Promise<void> {
147+
const batch = new WriteBatch();
148+
try {
149+
batch.put(
150+
toKeyBytes(storageKey('meta', 'durability-barrier')),
151+
encodeValue({ id: ++this.durabilityBarrierId, flushed_at: new Date().toISOString() })
152+
);
153+
await this.db.write_with_options(batch, DURABLE_WRITE_OPTIONS);
133154
} finally {
134155
batch.dispose();
135156
}
@@ -142,6 +163,7 @@ export class SlateDBKVStore implements AsyncDisposable {
142163
async createCheckpoint(
143164
options: { name?: string; lifetimeMs?: number | bigint } = {}
144165
): Promise<CheckpointCreateResult> {
166+
await this.flush();
145167
await this.db.flush_with_options({ flush_type: FlushType.MemTable });
146168
return this.admin.create_detached_checkpoint({
147169
name: options.name,

modules/module-slatedb-storage/src/storage/SlateDBSyncBucketStorage.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,7 @@ class SlateDBBucketBatch
386386

387387
async flush(): Promise<storage.FlushedResult | null> {
388388
if (this.pending.length == 0) {
389+
await this.storage.store.flush();
389390
return null;
390391
}
391392
if (!this.options.storeCurrentData) {
@@ -396,6 +397,7 @@ class SlateDBBucketBatch
396397
}
397398
this.pending = [];
398399
await this.flushPendingWrites();
400+
await this.storage.store.flush();
399401
return this.currentFlushResult();
400402
}
401403

@@ -417,6 +419,7 @@ class SlateDBBucketBatch
417419
keepalive_op: maxOpId(keepaliveOp, persistedOp).toString(),
418420
last_keepalive_ts: new Date().toISOString()
419421
});
422+
await this.storage.store.flush();
420423
return { checkpointBlocked: true, checkpointCreated: false };
421424
}
422425

@@ -438,6 +441,7 @@ class SlateDBBucketBatch
438441
update.last_checkpoint_ts = new Date().toISOString();
439442
}
440443
await this.storage.updateRecord(update);
444+
await this.storage.store.flush();
441445
return { checkpointBlocked: false, checkpointCreated };
442446
}
443447

@@ -584,6 +588,7 @@ class SlateDBBucketBatch
584588
});
585589
}
586590
await this.storage.store.write(writes);
591+
await this.storage.store.flush();
587592
return this.last_flushed_op == null ? null : { flushed_op: this.last_flushed_op };
588593
}
589594

0 commit comments

Comments
 (0)