Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
1e43ade
SequentialQueue: await persist in push() before flush()
adhorodyski May 27, 2026
d4aa746
SequentialQueue: restore push() flush trigger logs
adhorodyski May 27, 2026
96aceae
SequentialQueue: defer flush onto persistencePromise; update tests
adhorodyski May 27, 2026
ce33aa9
SequentialQueue: async push() with idempotent isReadyPromise marker
adhorodyski May 27, 2026
5cd3e59
Merge branch 'Expensify:main' into fix/sq-crash-window
adhorodyski Jun 1, 2026
31ad82c
SequentialQueue: resolve isReadyPromise when conflict empties the queue
adhorodyski Jun 1, 2026
b4285d8
SequentialQueue: resolve isReadyPromise on follower tabs; fix test typos
adhorodyski Jun 2, 2026
ec7e71e
Merge branch 'Expensify:main' into fix/sq-crash-window
adhorodyski Jun 2, 2026
5155cb2
SequentialQueue: resolve isReadyPromise if push() goes offline during…
adhorodyski Jun 2, 2026
9f2097b
Merge branch 'Expensify:main' into fix/sq-crash-window
adhorodyski Jun 2, 2026
297dd5b
Merge branch 'Expensify:main' into fix/sq-crash-window
adhorodyski Jun 5, 2026
a86df44
Merge branch 'Expensify:main' into fix/sq-crash-window
adhorodyski Jun 5, 2026
d3b7692
Merge branch 'Expensify:main' into fix/sq-crash-window
adhorodyski Jun 8, 2026
6944ea4
SequentialQueue: don't block the queue when a persist write fails
adhorodyski Jun 10, 2026
f68d4c9
Fix ESLint no-unsafe-type-assertion in SequentialQueue test
adhorodyski Jun 10, 2026
6676f02
Merge branch 'Expensify:main' into fix/sq-crash-window
adhorodyski Jun 10, 2026
b0628f3
Merge branch 'Expensify:main' into fix/sq-crash-window
adhorodyski Jun 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 68 additions & 41 deletions src/libs/Network/SequentialQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,28 @@ type RequestError = Error & {
status?: string;
};

let resolveIsReadyPromise: ((args?: unknown[]) => void) | undefined;
let isReadyPromise = new Promise((resolve) => {
resolveIsReadyPromise = resolve;
});
let resolveIsReadyPromise: (() => void) | undefined;
let isReadyPromise: Promise<void> = Promise.resolve();
let isReadyPromisePending = false;

// Resolve the isReadyPromise immediately so that the queue starts working as soon as the page loads
resolveIsReadyPromise?.();
/**
* Marks isReadyPromise as pending so any READ that consults waitForIdle() parks behind us.
* Idempotent: if already pending, no-op (avoids orphaning subscribers from prior pushes).
* Called from push()'s sync prelude before the first await, so READs on the next sync line
* see the pending promise.
*/
function setIsReadyPromisePending() {
if (isReadyPromisePending) {
return;
}
isReadyPromise = new Promise<void>((resolve) => {
resolveIsReadyPromise = () => {
isReadyPromisePending = false;
resolve();
};
});
isReadyPromisePending = true;
}

let isSequentialQueueRunning = false;
let currentRequestPromise: Promise<void> | null = null;
Expand Down Expand Up @@ -322,6 +337,10 @@ function flush(shouldResetPromise = true) {

if (persistedRequestsLength === 0 && !currentOngoingRequest && !hasOnyxUpdates) {
Log.info('[SequentialQueue] Unable to flush. No requests or queued Onyx updates to process.');
// push() may have marked isReadyPromise pending in its sync prelude (e.g. a conflict
// resolver deleted the only request without pushing a replacement). Resolve here so READs
// parked on waitForIdle() don't hang until unrelated queue activity releases them.
resolveIsReadyPromise?.();
return;
}

Expand All @@ -338,6 +357,10 @@ function flush(shouldResetPromise = true) {
persistedRequestsLength,
hasOngoingRequest: !!currentOngoingRequest,
});
// push() may have marked isReadyPromise pending in its sync prelude. Followers never
// process the queue, so resolve here — otherwise READs parked on waitForIdle() would
// hang forever on this tab after any write.
resolveIsReadyPromise?.();
return;
}

Expand All @@ -350,10 +373,9 @@ function flush(shouldResetPromise = true) {
isSequentialQueueRunning = true;

if (shouldResetPromise) {
// Reset the isReadyPromise so that the queue will be flushed as soon as the request is finished
isReadyPromise = new Promise((resolve) => {
resolveIsReadyPromise = resolve;
});
// Mark isReadyPromise as pending so READs (waitForIdle) park behind us.
// Idempotent — safe if push() already marked it pending in its sync prelude.
setIsReadyPromisePending();
}

// Ensure persistedRequests are read from storage before proceeding with the queue
Expand Down Expand Up @@ -520,7 +542,7 @@ async function handleConflictActions<TKey extends OnyxKey>(conflictAction: Confl
}
}

function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void> {
async function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void> {
const currentRequests = getAllPersistedRequests();
Log.info('[SequentialQueue] push() called', false, {
command: newRequest.command,
Expand All @@ -530,19 +552,10 @@ function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void
isSequentialQueueRunning,
});

// Save the request to the persisted queue. The in-memory update inside save()
// happens synchronously, so flush() below will see the new request immediately.
// The returned promise resolves when disk persistence completes.
let persistencePromise: Promise<void>;

if (newRequest.checkAndFixConflictingRequest) {
const requests = currentRequests;
Log.info('[SequentialQueue] Checking for conflicts', false, {
command: newRequest.command,
existingRequestsCount: requests.length,
});

const {conflictAction} = newRequest.checkAndFixConflictingRequest(requests as Array<OnyxRequest<TKey>>);
const {conflictAction} = newRequest.checkAndFixConflictingRequest(currentRequests as Array<OnyxRequest<TKey>>);
Log.info('[SequentialQueue] Conflict action determined', false, {
command: newRequest.command,
conflictType: conflictAction.type,
Expand All @@ -553,41 +566,56 @@ function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void
delete newRequest.checkAndFixConflictingRequest;
persistencePromise = handleConflictActions(conflictAction, newRequest);
} else {
Log.info('[SequentialQueue] No conflict action. Adding request to Persisted Requests', false, {
command: newRequest.command,
});
// Add request to Persisted Requests so that it can be retried if it fails
persistencePromise = savePersistedRequest(newRequest);
}

// If we are offline we don't need to trigger the queue to empty as it will happen when we come back online
if (isOfflineNetwork()) {
Log.info('[SequentialQueue] Request persisted but not flushing — we are offline', false, {
command: newRequest.command,
queueLength: getAllPersistedRequests().length,
});
return persistencePromise;
await persistencePromise;
return;
}

// Mark the ready-promise pending sync (before the first await) so any READ that fires on
// the next synchronous line via waitForIdle() correctly parks behind this write.
setIsReadyPromisePending();
Comment thread
adhorodyski marked this conversation as resolved.

// Block until the Onyx disk commit lands so flush() → XHR cannot race the disk write —
// a process kill in that window would lose the request on next launch.
try {
await persistencePromise;
} catch {
// Backstop: persistence alerts+swallows on failure, so this shouldn't reject. If it ever does,
// flush anyway (the request is already in the in-memory queue) rather than stranding isReadyPromise.
Log.info('[SequentialQueue] Persist rejected — flushing anyway', false, {command: newRequest.command});
}

// The network may have flipped offline while we awaited the disk write. flush() would
// early-return on its offline check without resolving isReadyPromise, leaving READs parked
// on waitForIdle() until an unrelated reconnect drains the queue. Resolve here so READs
// proceed — consistent with flush() resolving isReadyPromise when offline.
if (isOfflineNetwork()) {
Log.info('[SequentialQueue] Went offline during persist — resolving isReadyPromise without flushing', false, {
command: newRequest.command,
});
resolveIsReadyPromise?.();
return;
}

Comment thread
adhorodyski marked this conversation as resolved.
Comment thread
adhorodyski marked this conversation as resolved.
// If the queue is running this request will run once it has finished processing the current batch
if (isSequentialQueueRunning) {
Log.info('[SequentialQueue] Queue is running. Will flush when the current request is finished.', false, {
command: newRequest.command,
});
isReadyPromise.then(() => {
Log.info('[SequentialQueue] isReadyPromise resolved, flushing queue', false, {
command: newRequest.command,
});
flush(true);
});
return persistencePromise;
isReadyPromise.then(() => flush(false));
Comment thread
adhorodyski marked this conversation as resolved.
return;
Comment thread
adhorodyski marked this conversation as resolved.
Comment thread
adhorodyski marked this conversation as resolved.
}

Log.info('[SequentialQueue] Queue is not running. Flushing the queue.', false, {
command: newRequest.command,
});
flush(true);
return persistencePromise;
flush(false);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Don't flush requests whose own persistence is still pending

When two writes are enqueued without awaiting each other, the first push() only waits for its own persistencePromise and then this flush(false) can drain the entire in-memory queue, including later requests whose savePersistedRequest() disk writes are still pending. If the first request finishes quickly and the app is killed after a later request is sent but before that later Onyx write commits, that later request is again lost on relaunch, so the crash-loss window remains for common back-to-back writes. Gate flushing on all pending persistence writes, or only process requests whose own persistence has completed.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — leaving this as the documented residual window. As covered in the line-590 thread, this PR narrows the dominant idle-queue case rather than eliminating every window; the back-to-back residual only meaningfully exists for the flush-initiating request (which we now await), since a follower's XHR fires only after the leader's full network round-trip, by which point its same-key Onyx write has already resolved.

Comment thread
adhorodyski marked this conversation as resolved.
}

function getCurrentRequest(): Promise<void> {
Expand All @@ -612,10 +640,9 @@ function resetQueue(): void {
isSequentialQueueRunning = false;
currentRequestPromise = null;
isQueuePaused = false;
isReadyPromise = new Promise((resolve) => {
resolveIsReadyPromise = resolve;
});
resolveIsReadyPromise?.();
isReadyPromise = Promise.resolve();
isReadyPromisePending = false;
resolveIsReadyPromise = undefined;
}

export {
Expand Down
30 changes: 24 additions & 6 deletions src/libs/actions/PersistedRequests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,12 @@ function save<TKey extends OnyxKey>(requestToPersist: Request<TKey>): Promise<vo
queueLength: getLength(),
});
})
.catch(() => {
Log.info('[PersistedRequests] ERROR: Failed to persist request to disk', false, {
.catch((error) => {
// Disk-write failure risks losing this request on a crash — alert as a storage emergency.
Log.alert('[PersistedRequests] ERROR: Failed to persist request to disk', {
command: requestToPersist.command,
queueLength: getLength(),
error,
});
});
}
Expand Down Expand Up @@ -334,9 +336,18 @@ function deleteRequestsByIndices(indices: number[]): Promise<void> {
persistedRequests = persistedRequests.filter((_, index) => !indicesSet.has(index));

// Update the persisted requests in storage or state as necessary
return trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, persistedRequests)).then(() => {
Log.info(`Multiple (${indices.length}) requests removed from the queue. Queue length is ${persistedRequests.length}`);
});
return trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, persistedRequests))
.then(() => {
Log.info(`Multiple (${indices.length}) requests removed from the queue. Queue length is ${persistedRequests.length}`);
})
.catch((error) => {
// Swallow so the conflict promise resolves (in-memory queue is the source of truth); alert as a storage emergency.
Log.alert('[PersistedRequests] ERROR: Failed to persist request deletion to disk', {
indicesCount: indices.length,
queueLength: getLength(),
error,
});
});
}

function update<TKey extends OnyxKey>(oldRequestIndex: number, newRequest: Request<TKey>): Promise<void> {
Expand All @@ -349,7 +360,14 @@ function update<TKey extends OnyxKey>(oldRequestIndex: number, newRequest: Reque
if (requestIndex != null) {
knownRequestIDs.add(requestIndex);
}
return trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests));
return trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests)).catch((error) => {
// Swallow so the conflict promise resolves (in-memory queue is the source of truth); alert as a storage emergency.
Log.alert('[PersistedRequests] ERROR: Failed to persist updated request to disk', {
command: newRequest.command,
queueLength: getLength(),
error,
});
});
}

function shouldPersistOngoingRequest(request: AnyRequest | null): boolean {
Expand Down
Loading
Loading