Skip to content

Commit a42db40

Browse files
committed
Scan bounded ranges.
1 parent 182bdea commit a42db40

2 files changed

Lines changed: 19 additions & 9 deletions

File tree

modules/module-slatedb-storage/src/storage/SlateDBKVStore.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,10 @@ export class SlateDBKVStore implements AsyncDisposable {
156156
}
157157
}
158158

159-
scanPrefix<T = unknown>(prefix: SlateDBKey, options?: { limit?: number }): AsyncIterable<SlateDBEntry<T>> {
159+
scanPrefix<T = unknown>(
160+
prefix: SlateDBKey,
161+
options?: { limit?: number; startAfter?: SlateDBKey; endAt?: SlateDBKey }
162+
): AsyncIterable<SlateDBEntry<T>> {
160163
return scanPrefixFrom<T>(this.db, prefix, options);
161164
}
162165

@@ -269,7 +272,10 @@ export class SlateDBCheckpointReader {
269272
return getFrom<T>(this.reader, key);
270273
}
271274

272-
scanPrefix<T = unknown>(prefix: SlateDBKey, options?: { limit?: number }): AsyncIterable<SlateDBEntry<T>> {
275+
scanPrefix<T = unknown>(
276+
prefix: SlateDBKey,
277+
options?: { limit?: number; startAfter?: SlateDBKey; endAt?: SlateDBKey }
278+
): AsyncIterable<SlateDBEntry<T>> {
273279
return scanPrefixFrom<T>(this.reader, prefix, options);
274280
}
275281
}
@@ -306,9 +312,9 @@ async function getFrom<T>(reader: SlateDBReadable, key: SlateDBKey): Promise<T |
306312
async function* scanPrefixFrom<T>(
307313
reader: SlateDBReadable,
308314
prefix: SlateDBKey,
309-
options: { limit?: number } = {}
315+
options: { limit?: number; startAfter?: SlateDBKey; endAt?: SlateDBKey } = {}
310316
): AsyncIterable<SlateDBEntry<T>> {
311-
const iterator = await reader.scan_prefix(toKeyBytes(prefix), unboundedRange());
317+
const iterator = await reader.scan_prefix(toKeyBytes(prefix), scanRange(options));
312318
try {
313319
let count = 0;
314320
while (options.limit == null || count < options.limit) {
@@ -332,12 +338,12 @@ function toKeyBytes(key: SlateDBKey): Uint8Array {
332338
return typeof key == 'string' ? textEncoder.encode(key) : key;
333339
}
334340

335-
function unboundedRange(): KeyRange {
341+
function scanRange(options: { startAfter?: SlateDBKey; endAt?: SlateDBKey }): KeyRange {
336342
return {
337-
start: undefined,
343+
start: options.startAfter == null ? undefined : toKeyBytes(options.startAfter),
338344
start_inclusive: false,
339-
end: undefined,
340-
end_inclusive: false
345+
end: options.endAt == null ? undefined : toKeyBytes(options.endAt),
346+
end_inclusive: true
341347
};
342348
}
343349

modules/module-slatedb-storage/src/storage/SlateDBSyncBucketStorage.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,9 @@ export class SlateDBSyncBucketStorage
220220
for (const request of dataBuckets) {
221221
let start = request.start;
222222
while (emitted < limit) {
223+
if (start >= checkpoint.checkpoint) {
224+
break;
225+
}
223226
const chunk: utils.SyncBucketData = {
224227
bucket: request.bucket,
225228
after: utils.internalToExternalOpId(start),
@@ -230,7 +233,8 @@ export class SlateDBSyncBucketStorage
230233
let chunkSizeBytes = 0;
231234

232235
for await (const entry of this.store.scanPrefix<BucketOpRecord>(
233-
storagePrefix('bucket-data', this.replicationStreamId, request.bucket)
236+
storagePrefix('bucket-data', this.replicationStreamId, request.bucket),
237+
{ startAfter: encodeOpId(start), endAt: encodeOpId(checkpoint.checkpoint) }
234238
)) {
235239
const op = BigInt(entry.value.op_id_bigint);
236240
if (op <= start || op > checkpoint.checkpoint) {

0 commit comments

Comments
 (0)