Skip to content

Commit 2bc175d

Browse files
authored
Merge pull request #91734 from callstack-internal/fix/sq-crash-window
SequentialQueue: await Onyx persist before allowing flush
2 parents 625298b + b0628f3 commit 2bc175d

4 files changed

Lines changed: 288 additions & 156 deletions

File tree

src/libs/Network/SequentialQueue.ts

Lines changed: 68 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,28 @@ type RequestError = Error & {
4444
status?: string;
4545
};
4646

47-
let resolveIsReadyPromise: ((args?: unknown[]) => void) | undefined;
48-
let isReadyPromise = new Promise((resolve) => {
49-
resolveIsReadyPromise = resolve;
50-
});
47+
let resolveIsReadyPromise: (() => void) | undefined;
48+
let isReadyPromise: Promise<void> = Promise.resolve();
49+
let isReadyPromisePending = false;
5150

52-
// Resolve the isReadyPromise immediately so that the queue starts working as soon as the page loads
53-
resolveIsReadyPromise?.();
51+
/**
52+
* Marks isReadyPromise as pending so any READ that consults waitForIdle() parks behind us.
53+
* Idempotent: if already pending, no-op (avoids orphaning subscribers from prior pushes).
54+
* Called from push()'s sync prelude before the first await, so READs on the next sync line
55+
* see the pending promise.
56+
*/
57+
function setIsReadyPromisePending() {
58+
if (isReadyPromisePending) {
59+
return;
60+
}
61+
isReadyPromise = new Promise<void>((resolve) => {
62+
resolveIsReadyPromise = () => {
63+
isReadyPromisePending = false;
64+
resolve();
65+
};
66+
});
67+
isReadyPromisePending = true;
68+
}
5469

5570
let isSequentialQueueRunning = false;
5671
let currentRequestPromise: Promise<void> | null = null;
@@ -322,6 +337,10 @@ function flush(shouldResetPromise = true) {
322337

323338
if (persistedRequestsLength === 0 && !currentOngoingRequest && !hasOnyxUpdates) {
324339
Log.info('[SequentialQueue] Unable to flush. No requests or queued Onyx updates to process.');
340+
// push() may have marked isReadyPromise pending in its sync prelude (e.g. a conflict
341+
// resolver deleted the only request without pushing a replacement). Resolve here so READs
342+
// parked on waitForIdle() don't hang until unrelated queue activity releases them.
343+
resolveIsReadyPromise?.();
325344
return;
326345
}
327346

@@ -338,6 +357,10 @@ function flush(shouldResetPromise = true) {
338357
persistedRequestsLength,
339358
hasOngoingRequest: !!currentOngoingRequest,
340359
});
360+
// push() may have marked isReadyPromise pending in its sync prelude. Followers never
361+
// process the queue, so resolve here — otherwise READs parked on waitForIdle() would
362+
// hang forever on this tab after any write.
363+
resolveIsReadyPromise?.();
341364
return;
342365
}
343366

@@ -350,10 +373,9 @@ function flush(shouldResetPromise = true) {
350373
isSequentialQueueRunning = true;
351374

352375
if (shouldResetPromise) {
353-
// Reset the isReadyPromise so that the queue will be flushed as soon as the request is finished
354-
isReadyPromise = new Promise((resolve) => {
355-
resolveIsReadyPromise = resolve;
356-
});
376+
// Mark isReadyPromise as pending so READs (waitForIdle) park behind us.
377+
// Idempotent — safe if push() already marked it pending in its sync prelude.
378+
setIsReadyPromisePending();
357379
}
358380

359381
// Ensure persistedRequests are read from storage before proceeding with the queue
@@ -520,7 +542,7 @@ async function handleConflictActions<TKey extends OnyxKey>(conflictAction: Confl
520542
}
521543
}
522544

523-
function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void> {
545+
async function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void> {
524546
const currentRequests = getAllPersistedRequests();
525547
Log.info('[SequentialQueue] push() called', false, {
526548
command: newRequest.command,
@@ -530,19 +552,10 @@ function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void
530552
isSequentialQueueRunning,
531553
});
532554

533-
// Save the request to the persisted queue. The in-memory update inside save()
534-
// happens synchronously, so flush() below will see the new request immediately.
535-
// The returned promise resolves when disk persistence completes.
536555
let persistencePromise: Promise<void>;
537556

538557
if (newRequest.checkAndFixConflictingRequest) {
539-
const requests = currentRequests;
540-
Log.info('[SequentialQueue] Checking for conflicts', false, {
541-
command: newRequest.command,
542-
existingRequestsCount: requests.length,
543-
});
544-
545-
const {conflictAction} = newRequest.checkAndFixConflictingRequest(requests as Array<OnyxRequest<TKey>>);
558+
const {conflictAction} = newRequest.checkAndFixConflictingRequest(currentRequests as Array<OnyxRequest<TKey>>);
546559
Log.info('[SequentialQueue] Conflict action determined', false, {
547560
command: newRequest.command,
548561
conflictType: conflictAction.type,
@@ -553,41 +566,56 @@ function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void
553566
delete newRequest.checkAndFixConflictingRequest;
554567
persistencePromise = handleConflictActions(conflictAction, newRequest);
555568
} else {
556-
Log.info('[SequentialQueue] No conflict action. Adding request to Persisted Requests', false, {
557-
command: newRequest.command,
558-
});
559-
// Add request to Persisted Requests so that it can be retried if it fails
560569
persistencePromise = savePersistedRequest(newRequest);
561570
}
562571

563-
// If we are offline we don't need to trigger the queue to empty as it will happen when we come back online
564572
if (isOfflineNetwork()) {
565573
Log.info('[SequentialQueue] Request persisted but not flushing — we are offline', false, {
566574
command: newRequest.command,
567575
queueLength: getAllPersistedRequests().length,
568576
});
569-
return persistencePromise;
577+
await persistencePromise;
578+
return;
579+
}
580+
581+
// Mark the ready-promise pending sync (before the first await) so any READ that fires on
582+
// the next synchronous line via waitForIdle() correctly parks behind this write.
583+
setIsReadyPromisePending();
584+
585+
// Block until the Onyx disk commit lands so flush() → XHR cannot race the disk write —
586+
// a process kill in that window would lose the request on next launch.
587+
try {
588+
await persistencePromise;
589+
} catch {
590+
// Backstop: persistence alerts+swallows on failure, so this shouldn't reject. If it ever does,
591+
// flush anyway (the request is already in the in-memory queue) rather than stranding isReadyPromise.
592+
Log.info('[SequentialQueue] Persist rejected — flushing anyway', false, {command: newRequest.command});
593+
}
594+
595+
// The network may have flipped offline while we awaited the disk write. flush() would
596+
// early-return on its offline check without resolving isReadyPromise, leaving READs parked
597+
// on waitForIdle() until an unrelated reconnect drains the queue. Resolve here so READs
598+
// proceed — consistent with flush() resolving isReadyPromise when offline.
599+
if (isOfflineNetwork()) {
600+
Log.info('[SequentialQueue] Went offline during persist — resolving isReadyPromise without flushing', false, {
601+
command: newRequest.command,
602+
});
603+
resolveIsReadyPromise?.();
604+
return;
570605
}
571606

572-
// If the queue is running this request will run once it has finished processing the current batch
573607
if (isSequentialQueueRunning) {
574608
Log.info('[SequentialQueue] Queue is running. Will flush when the current request is finished.', false, {
575609
command: newRequest.command,
576610
});
577-
isReadyPromise.then(() => {
578-
Log.info('[SequentialQueue] isReadyPromise resolved, flushing queue', false, {
579-
command: newRequest.command,
580-
});
581-
flush(true);
582-
});
583-
return persistencePromise;
611+
isReadyPromise.then(() => flush(false));
612+
return;
584613
}
585614

586615
Log.info('[SequentialQueue] Queue is not running. Flushing the queue.', false, {
587616
command: newRequest.command,
588617
});
589-
flush(true);
590-
return persistencePromise;
618+
flush(false);
591619
}
592620

593621
function getCurrentRequest(): Promise<void> {
@@ -612,10 +640,9 @@ function resetQueue(): void {
612640
isSequentialQueueRunning = false;
613641
currentRequestPromise = null;
614642
isQueuePaused = false;
615-
isReadyPromise = new Promise((resolve) => {
616-
resolveIsReadyPromise = resolve;
617-
});
618-
resolveIsReadyPromise?.();
643+
isReadyPromise = Promise.resolve();
644+
isReadyPromisePending = false;
645+
resolveIsReadyPromise = undefined;
619646
}
620647

621648
export {

src/libs/actions/PersistedRequests.ts

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -260,10 +260,12 @@ function save<TKey extends OnyxKey>(requestToPersist: Request<TKey>): Promise<vo
260260
queueLength: getLength(),
261261
});
262262
})
263-
.catch(() => {
264-
Log.info('[PersistedRequests] ERROR: Failed to persist request to disk', false, {
263+
.catch((error) => {
264+
// Disk-write failure risks losing this request on a crash — alert as a storage emergency.
265+
Log.alert('[PersistedRequests] ERROR: Failed to persist request to disk', {
265266
command: requestToPersist.command,
266267
queueLength: getLength(),
268+
error,
267269
});
268270
});
269271
}
@@ -334,9 +336,18 @@ function deleteRequestsByIndices(indices: number[]): Promise<void> {
334336
persistedRequests = persistedRequests.filter((_, index) => !indicesSet.has(index));
335337

336338
// Update the persisted requests in storage or state as necessary
337-
return trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, persistedRequests)).then(() => {
338-
Log.info(`Multiple (${indices.length}) requests removed from the queue. Queue length is ${persistedRequests.length}`);
339-
});
339+
return trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, persistedRequests))
340+
.then(() => {
341+
Log.info(`Multiple (${indices.length}) requests removed from the queue. Queue length is ${persistedRequests.length}`);
342+
})
343+
.catch((error) => {
344+
// Swallow so the conflict promise resolves (in-memory queue is the source of truth); alert as a storage emergency.
345+
Log.alert('[PersistedRequests] ERROR: Failed to persist request deletion to disk', {
346+
indicesCount: indices.length,
347+
queueLength: getLength(),
348+
error,
349+
});
350+
});
340351
}
341352

342353
function update<TKey extends OnyxKey>(oldRequestIndex: number, newRequest: Request<TKey>): Promise<void> {
@@ -349,7 +360,14 @@ function update<TKey extends OnyxKey>(oldRequestIndex: number, newRequest: Reque
349360
if (requestIndex != null) {
350361
knownRequestIDs.add(requestIndex);
351362
}
352-
return trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests));
363+
return trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests)).catch((error) => {
364+
// Swallow so the conflict promise resolves (in-memory queue is the source of truth); alert as a storage emergency.
365+
Log.alert('[PersistedRequests] ERROR: Failed to persist updated request to disk', {
366+
command: newRequest.command,
367+
queueLength: getLength(),
368+
error,
369+
});
370+
});
353371
}
354372

355373
function shouldPersistOngoingRequest(request: AnyRequest | null): boolean {

0 commit comments

Comments
 (0)