Skip to content

Commit de20c96

Browse files
committed
Batch removeItem/removeItems through WriteBuffer as tombstones
1 parent 1f300d3 commit de20c96

3 files changed

Lines changed: 109 additions & 31 deletions

File tree

lib/storage/NativeFlushWorker.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,17 @@ function createNativeFlushWorker(bufferStore: BufferStore) {
6767
return;
6868
}
6969

70-
// Separate SET and MERGE entries for the provider
70+
// Separate SET, MERGE, and REMOVE entries for the provider
7171
const setPairs: StorageKeyValuePair[] = [];
7272
const mergePairs: StorageKeyValuePair[] = [];
73+
const removeKeys: string[] = [];
7374

7475
for (const [, entry] of entries) {
7576
const typedEntry = entry as BufferEntry;
7677
if (typedEntry.entryType === 'set') {
7778
setPairs.push([typedEntry.key, typedEntry.value]);
79+
} else if (typedEntry.entryType === 'remove') {
80+
removeKeys.push(typedEntry.key);
7881
} else {
7982
mergePairs.push([typedEntry.key, typedEntry.value, typedEntry.replaceNullPatches]);
8083
}
@@ -88,6 +91,9 @@ function createNativeFlushWorker(bufferStore: BufferStore) {
8891
if (mergePairs.length > 0) {
8992
nativeSQLiteProvider.multiMerge(mergePairs);
9093
}
94+
if (removeKeys.length > 0) {
95+
nativeSQLiteProvider.removeItems(removeKeys);
96+
}
9197
} catch (error) {
9298
console.error('[Onyx] NativeFlushWorker: flush error:', error);
9399
}

lib/storage/WriteBuffer.ts

Lines changed: 70 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import utils from '../utils';
2727
import type {StorageKeyValuePair} from './providers/types';
2828
import type BufferStore from './BufferStore/types';
2929

30-
type EntryType = 'set' | 'merge';
30+
type EntryType = 'set' | 'merge' | 'remove';
3131

3232
type BufferEntry = {
3333
key: OnyxKey;
@@ -36,10 +36,11 @@ type BufferEntry = {
3636
replaceNullPatches?: FastMergeReplaceNullPatch[];
3737
};
3838

39-
/** Flush handlers for the two entry types. */
39+
/** Flush handlers for the three entry types. */
4040
type FlushHandlers = {
4141
multiSet: (pairs: StorageKeyValuePair[]) => Promise<void>;
4242
multiMerge: (pairs: StorageKeyValuePair[]) => Promise<void>;
43+
multiRemove: (keys: OnyxKey[]) => Promise<void>;
4344
};
4445

4546
/**
@@ -73,7 +74,9 @@ const FLUSH_TIMEOUT_MS = 200;
7374
/** Default web flush scheduler using requestIdleCallback with setTimeout fallback. */
7475
function defaultScheduleFlush(doFlush: () => void): number | null {
7576
if (typeof requestIdleCallback === 'function') {
76-
return requestIdleCallback(doFlush, {timeout: FLUSH_TIMEOUT_MS}) as unknown as number;
77+
return requestIdleCallback(doFlush, {
78+
timeout: FLUSH_TIMEOUT_MS,
79+
}) as unknown as number;
7780
}
7881
return setTimeout(doFlush, FLUSH_TIMEOUT_MS) as unknown as number;
7982
}
@@ -144,20 +147,38 @@ class WriteBuffer {
144147
* - No existing entry: create a MERGE entry with just the patch
145148
* - Existing SET entry: apply patch to the full value in-memory, keep as SET
146149
* - Existing MERGE entry: merge patches together, keep as MERGE
150+
* - Existing REMOVE (tombstone): the underlying value will be null after flush,
151+
* so merging into null is just the patch itself -- replace tombstone with SET
147152
*/
148153
merge(key: OnyxKey, patch: OnyxValue<OnyxKey>, replaceNullPatches?: FastMergeReplaceNullPatch[]): void {
149154
const existing = this.store.get(key);
150155

151156
if (!existing) {
152157
// No pending write -- stage as a MERGE entry (just the patch)
153-
this.store.set(key, {key, value: patch, entryType: 'merge', replaceNullPatches});
158+
this.store.set(key, {
159+
key,
160+
value: patch,
161+
entryType: 'merge',
162+
replaceNullPatches,
163+
});
164+
} else if (existing.entryType === 'remove') {
165+
// Pending REMOVE -- merging into null is just the patch itself, replace with SET
166+
this.store.set(key, {
167+
key,
168+
value: patch,
169+
entryType: 'set',
170+
});
154171
} else if (existing.entryType === 'set') {
155172
// Pending SET -- apply patch to the full value, stay as SET
156173
const {result: merged} = utils.fastMerge(existing.value as Record<string, unknown>, patch as Record<string, unknown>, {
157174
shouldRemoveNestedNulls: true,
158175
objectRemovalMode: 'replace',
159176
});
160-
this.store.set(key, {key, value: merged as OnyxValue<OnyxKey>, entryType: 'set'});
177+
this.store.set(key, {
178+
key,
179+
value: merged as OnyxValue<OnyxKey>,
180+
entryType: 'set',
181+
});
161182
} else {
162183
// Pending MERGE -- merge patches together, stay as MERGE
163184
const {result: mergedPatch} = utils.fastMerge(existing.value as Record<string, unknown>, patch as Record<string, unknown>, {
@@ -176,20 +197,32 @@ class WriteBuffer {
176197
}
177198

178199
/**
179-
* Remove a key from the write buffer. This is used when a key is being
180-
* removed from storage entirely (not just set to null).
200+
* Stage a tombstone for a key (REMOVE entry). A pending SET or MERGE is
201+
* discarded; the tombstone wins and is flushed via multiRemove. Scheduling
202+
* a flush mirrors set/merge so deletes are batched off the worker hot path.
181203
*/
182204
remove(key: OnyxKey): void {
183-
this.store.delete(key);
205+
this.store.set(key, {
206+
key,
207+
value: null as OnyxValue<OnyxKey>,
208+
entryType: 'remove',
209+
});
210+
this.scheduleFlush();
184211
}
185212

186213
/**
187-
* Remove multiple keys from the write buffer.
214+
* Stage tombstones for multiple keys (all REMOVE entries). A single flush
215+
* is scheduled regardless of how many keys are removed.
188216
*/
189217
removeMany(keys: OnyxKey[]): void {
190218
for (const key of keys) {
191-
this.store.delete(key);
219+
this.store.set(key, {
220+
key,
221+
value: null as OnyxValue<OnyxKey>,
222+
entryType: 'remove',
223+
});
192224
}
225+
this.scheduleFlush();
193226
}
194227

195228
/**
@@ -217,12 +250,21 @@ class WriteBuffer {
217250
}
218251

219252
/**
220-
* Check whether the key has any pending entry (SET or MERGE).
253+
* Check whether the key has any pending entry (SET, MERGE, or REMOVE).
221254
*/
222255
hasAny(key: OnyxKey): boolean {
223256
return this.store.has(key);
224257
}
225258

259+
/**
260+
* Inspect the pending entry type for a key without touching it. Used by the
261+
* storage facade to disambiguate reads: SET serves from buffer, REMOVE
262+
* returns null immediately, MERGE forces a flush before reading the provider.
263+
*/
264+
peekEntryType(key: OnyxKey): EntryType | undefined {
265+
return this.store.get(key)?.entryType;
266+
}
267+
226268
/**
227269
* Returns the number of pending entries (both SET and MERGE).
228270
*/
@@ -277,11 +319,16 @@ class WriteBuffer {
277319
const setSnapshot = new Map<OnyxKey, BufferEntry>();
278320
const mergePairs: StorageKeyValuePair[] = [];
279321
const mergeKeys: OnyxKey[] = [];
322+
const removeKeys: OnyxKey[] = [];
323+
const removeSnapshot = new Map<OnyxKey, BufferEntry>();
280324

281325
for (const [key, entry] of this.store.entries()) {
282326
if (entry.entryType === 'set') {
283327
setPairs.push([entry.key, entry.value]);
284328
setSnapshot.set(key, entry);
329+
} else if (entry.entryType === 'remove') {
330+
removeKeys.push(entry.key);
331+
removeSnapshot.set(key, entry);
285332
} else {
286333
mergePairs.push([entry.key, entry.value, entry.replaceNullPatches]);
287334
mergeKeys.push(key);
@@ -294,14 +341,17 @@ class WriteBuffer {
294341
this.store.delete(key);
295342
}
296343

297-
// Flush both types concurrently
344+
// Flush all three types concurrently
298345
const promises: Array<Promise<void>> = [];
299346
if (setPairs.length > 0) {
300347
promises.push(this.handlers.multiSet(setPairs));
301348
}
302349
if (mergePairs.length > 0) {
303350
promises.push(this.handlers.multiMerge(mergePairs));
304351
}
352+
if (removeKeys.length > 0) {
353+
promises.push(this.handlers.multiRemove(removeKeys));
354+
}
305355

306356
await Promise.all(promises);
307357

@@ -312,6 +362,14 @@ class WriteBuffer {
312362
this.store.delete(key);
313363
}
314364
}
365+
366+
// Same reference-identity check for REMOVE tombstones: a set/merge
367+
// arriving during flush replaces the entry and must not be erased.
368+
for (const [key, flushedEntry] of removeSnapshot) {
369+
if (this.store.get(key) === flushedEntry) {
370+
this.store.delete(key);
371+
}
372+
}
315373
} finally {
316374
this.isFlushing = false;
317375
}

lib/storage/index.ts

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ const writeBuffer = new WriteBuffer({
7070
handlers: {
7171
multiSet: (pairs) => provider.multiSet(pairs),
7272
multiMerge: (pairs) => provider.multiMerge(pairs),
73+
multiRemove: (keys) => provider.removeItems(keys),
7374
},
7475
store: createBufferStore(),
7576
});
@@ -104,18 +105,23 @@ const storage: Storage = {
104105
/**
105106
* Get the value of a given key or return `null` if it's not available.
106107
*
107-
* - If the key has a pending SET entry, it is returned directly from memory.
108-
* - If the key has a pending MERGE entry (a patch delta, not a full value),
109-
* the write buffer is flushed first so the provider has the correct merged
110-
* value on disk, then the read proceeds normally.
111-
* - Otherwise, the read goes straight to the provider.
108+
* - SET entry: served directly from memory.
109+
* - REMOVE entry (tombstone): returns null immediately. The on-disk value
110+
* will be gone once the buffer flushes; callers see the post-remove view.
111+
* - MERGE entry: the patch is incomplete, so the buffer is flushed first,
112+
* then the merged value is read back from the provider.
113+
* - No pending entry: read straight from the provider.
112114
*/
113115
getItem: <TKey extends OnyxKey>(key: TKey) =>
114116
tryOrDegradePerformance<OnyxValue<TKey>>(() => {
115-
if (writeBuffer.has(key)) {
117+
const entryType = writeBuffer.peekEntryType(key);
118+
if (entryType === 'set') {
116119
return Promise.resolve(writeBuffer.get(key) as OnyxValue<TKey>);
117120
}
118-
if (writeBuffer.hasAny(key)) {
121+
if (entryType === 'remove') {
122+
return Promise.resolve(null as unknown as OnyxValue<TKey>);
123+
}
124+
if (entryType === 'merge') {
119125
return writeBuffer.flushNow().then(() => provider.getItem(key));
120126
}
121127
return provider.getItem(key);
@@ -131,23 +137,30 @@ const storage: Storage = {
131137
multiGet: (keys) =>
132138
tryOrDegradePerformance(() => {
133139
const bufferedKeys: string[] = [];
140+
const tombstoneKeys: string[] = [];
134141
const cleanKeys: string[] = [];
135142
let hasPendingMerges = false;
136143

137144
for (const key of keys) {
138-
if (writeBuffer.has(key)) {
145+
const entryType = writeBuffer.peekEntryType(key);
146+
if (entryType === 'set') {
139147
bufferedKeys.push(key);
148+
} else if (entryType === 'remove') {
149+
tombstoneKeys.push(key);
140150
} else {
141151
cleanKeys.push(key);
142-
if (writeBuffer.hasAny(key)) {
152+
if (entryType === 'merge') {
143153
hasPendingMerges = true;
144154
}
145155
}
146156
}
147157

148-
// If all keys have SET entries, skip the provider call entirely
158+
const tombstoneResults = tombstoneKeys.map((key) => [key, null] as [string, unknown]);
159+
160+
// If no key needs the provider, skip the worker round-trip entirely
149161
if (cleanKeys.length === 0) {
150-
return Promise.resolve(bufferedKeys.map((key) => [key, writeBuffer.get(key)]));
162+
const bufferedResults = bufferedKeys.map((key) => [key, writeBuffer.get(key)] as [string, unknown]);
163+
return Promise.resolve([...bufferedResults, ...tombstoneResults]);
151164
}
152165

153166
// If any clean keys have pending MERGE entries, flush first
@@ -156,7 +169,7 @@ const storage: Storage = {
156169
return readyPromise.then(() =>
157170
provider.multiGet(cleanKeys).then((providerResults) => {
158171
const bufferedResults = bufferedKeys.map((key) => [key, writeBuffer.get(key)] as [string, unknown]);
159-
return [...providerResults, ...bufferedResults];
172+
return [...providerResults, ...bufferedResults, ...tombstoneResults];
160173
}),
161174
);
162175
}),
@@ -211,23 +224,24 @@ const storage: Storage = {
211224
}),
212225

213226
/**
214-
* Removes given key and its value.
215-
* Also removes the key from the write buffer if it has a pending write.
227+
* Removes given key and its value. Staged as a REMOVE tombstone in the
228+
* write buffer; the actual provider delete happens in the next coalesced
229+
* flush via multiRemove, so deletes no longer block reads in the worker.
216230
*/
217231
removeItem: (key) =>
218232
tryOrDegradePerformance(() => {
219233
writeBuffer.remove(key);
220-
return provider.removeItem(key);
234+
return Promise.resolve();
221235
}),
222236

223237
/**
224-
* Remove given keys and their values.
225-
* Also removes the keys from the write buffer if they have pending writes.
238+
* Remove given keys and their values. Staged as REMOVE tombstones in the
239+
* write buffer; flushed together as a single multiRemove on next flush.
226240
*/
227241
removeItems: (keys) =>
228242
tryOrDegradePerformance(() => {
229243
writeBuffer.removeMany(keys);
230-
return provider.removeItems(keys);
244+
return Promise.resolve();
231245
}),
232246

233247
/**

0 commit comments

Comments
 (0)