-
Notifications
You must be signed in to change notification settings - Fork 3.9k
SequentialQueue: await Onyx persist before allowing flush #91734
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1e43ade
d4aa746
96aceae
ce33aa9
5cd3e59
31ad82c
b4285d8
ec7e71e
5155cb2
9f2097b
297dd5b
a86df44
d3b7692
6944ea4
f68d4c9
6676f02
b0628f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,13 +44,28 @@ type RequestError = Error & { | |
| status?: string; | ||
| }; | ||
|
|
||
| let resolveIsReadyPromise: ((args?: unknown[]) => void) | undefined; | ||
| let isReadyPromise = new Promise((resolve) => { | ||
| resolveIsReadyPromise = resolve; | ||
| }); | ||
| let resolveIsReadyPromise: (() => void) | undefined; | ||
| let isReadyPromise: Promise<void> = Promise.resolve(); | ||
| let isReadyPromisePending = false; | ||
|
|
||
| // Resolve the isReadyPromise immediately so that the queue starts working as soon as the page loads | ||
| resolveIsReadyPromise?.(); | ||
| /** | ||
| * Marks isReadyPromise as pending so any READ that consults waitForIdle() parks behind us. | ||
| * Idempotent: if already pending, no-op (avoids orphaning subscribers from prior pushes). | ||
| * Called from push()'s sync prelude before the first await, so READs on the next sync line | ||
| * see the pending promise. | ||
| */ | ||
| function setIsReadyPromisePending() { | ||
| if (isReadyPromisePending) { | ||
| return; | ||
| } | ||
| isReadyPromise = new Promise<void>((resolve) => { | ||
| resolveIsReadyPromise = () => { | ||
| isReadyPromisePending = false; | ||
| resolve(); | ||
| }; | ||
| }); | ||
| isReadyPromisePending = true; | ||
| } | ||
|
|
||
| let isSequentialQueueRunning = false; | ||
| let currentRequestPromise: Promise<void> | null = null; | ||
|
|
@@ -322,6 +337,10 @@ function flush(shouldResetPromise = true) { | |
|
|
||
| if (persistedRequestsLength === 0 && !currentOngoingRequest && !hasOnyxUpdates) { | ||
| Log.info('[SequentialQueue] Unable to flush. No requests or queued Onyx updates to process.'); | ||
| // push() may have marked isReadyPromise pending in its sync prelude (e.g. a conflict | ||
| // resolver deleted the only request without pushing a replacement). Resolve here so READs | ||
| // parked on waitForIdle() don't hang until unrelated queue activity releases them. | ||
| resolveIsReadyPromise?.(); | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -338,6 +357,10 @@ function flush(shouldResetPromise = true) { | |
| persistedRequestsLength, | ||
| hasOngoingRequest: !!currentOngoingRequest, | ||
| }); | ||
| // push() may have marked isReadyPromise pending in its sync prelude. Followers never | ||
| // process the queue, so resolve here — otherwise READs parked on waitForIdle() would | ||
| // hang forever on this tab after any write. | ||
| resolveIsReadyPromise?.(); | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -350,10 +373,9 @@ function flush(shouldResetPromise = true) { | |
| isSequentialQueueRunning = true; | ||
|
|
||
| if (shouldResetPromise) { | ||
| // Reset the isReadyPromise so that the queue will be flushed as soon as the request is finished | ||
| isReadyPromise = new Promise((resolve) => { | ||
| resolveIsReadyPromise = resolve; | ||
| }); | ||
| // Mark isReadyPromise as pending so READs (waitForIdle) park behind us. | ||
| // Idempotent — safe if push() already marked it pending in its sync prelude. | ||
| setIsReadyPromisePending(); | ||
| } | ||
|
|
||
| // Ensure persistedRequests are read from storage before proceeding with the queue | ||
|
|
@@ -520,7 +542,7 @@ async function handleConflictActions<TKey extends OnyxKey>(conflictAction: Confl | |
| } | ||
| } | ||
|
|
||
| function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void> { | ||
| async function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void> { | ||
| const currentRequests = getAllPersistedRequests(); | ||
| Log.info('[SequentialQueue] push() called', false, { | ||
| command: newRequest.command, | ||
|
|
@@ -530,19 +552,10 @@ function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void | |
| isSequentialQueueRunning, | ||
| }); | ||
|
|
||
| // Save the request to the persisted queue. The in-memory update inside save() | ||
| // happens synchronously, so flush() below will see the new request immediately. | ||
| // The returned promise resolves when disk persistence completes. | ||
| let persistencePromise: Promise<void>; | ||
|
|
||
| if (newRequest.checkAndFixConflictingRequest) { | ||
| const requests = currentRequests; | ||
| Log.info('[SequentialQueue] Checking for conflicts', false, { | ||
| command: newRequest.command, | ||
| existingRequestsCount: requests.length, | ||
| }); | ||
|
|
||
| const {conflictAction} = newRequest.checkAndFixConflictingRequest(requests as Array<OnyxRequest<TKey>>); | ||
| const {conflictAction} = newRequest.checkAndFixConflictingRequest(currentRequests as Array<OnyxRequest<TKey>>); | ||
| Log.info('[SequentialQueue] Conflict action determined', false, { | ||
| command: newRequest.command, | ||
| conflictType: conflictAction.type, | ||
|
|
@@ -553,41 +566,56 @@ function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void | |
| delete newRequest.checkAndFixConflictingRequest; | ||
| persistencePromise = handleConflictActions(conflictAction, newRequest); | ||
| } else { | ||
| Log.info('[SequentialQueue] No conflict action. Adding request to Persisted Requests', false, { | ||
| command: newRequest.command, | ||
| }); | ||
| // Add request to Persisted Requests so that it can be retried if it fails | ||
| persistencePromise = savePersistedRequest(newRequest); | ||
| } | ||
|
|
||
| // If we are offline we don't need to trigger the queue to empty as it will happen when we come back online | ||
| if (isOfflineNetwork()) { | ||
| Log.info('[SequentialQueue] Request persisted but not flushing — we are offline', false, { | ||
| command: newRequest.command, | ||
| queueLength: getAllPersistedRequests().length, | ||
| }); | ||
| return persistencePromise; | ||
| await persistencePromise; | ||
| return; | ||
| } | ||
|
|
||
| // Mark the ready-promise pending sync (before the first await) so any READ that fires on | ||
| // the next synchronous line via waitForIdle() correctly parks behind this write. | ||
| setIsReadyPromisePending(); | ||
|
|
||
| // Block until the Onyx disk commit lands so flush() → XHR cannot race the disk write — | ||
| // a process kill in that window would lose the request on next launch. | ||
| try { | ||
| await persistencePromise; | ||
| } catch { | ||
| // Backstop: persistence alerts+swallows on failure, so this shouldn't reject. If it ever does, | ||
| // flush anyway (the request is already in the in-memory queue) rather than stranding isReadyPromise. | ||
| Log.info('[SequentialQueue] Persist rejected — flushing anyway', false, {command: newRequest.command}); | ||
| } | ||
|
|
||
| // The network may have flipped offline while we awaited the disk write. flush() would | ||
| // early-return on its offline check without resolving isReadyPromise, leaving READs parked | ||
| // on waitForIdle() until an unrelated reconnect drains the queue. Resolve here so READs | ||
| // proceed — consistent with flush() resolving isReadyPromise when offline. | ||
| if (isOfflineNetwork()) { | ||
| Log.info('[SequentialQueue] Went offline during persist — resolving isReadyPromise without flushing', false, { | ||
| command: newRequest.command, | ||
| }); | ||
| resolveIsReadyPromise?.(); | ||
| return; | ||
| } | ||
|
|
||
|
adhorodyski marked this conversation as resolved.
adhorodyski marked this conversation as resolved.
|
||
| // If the queue is running this request will run once it has finished processing the current batch | ||
| if (isSequentialQueueRunning) { | ||
| Log.info('[SequentialQueue] Queue is running. Will flush when the current request is finished.', false, { | ||
| command: newRequest.command, | ||
| }); | ||
| isReadyPromise.then(() => { | ||
| Log.info('[SequentialQueue] isReadyPromise resolved, flushing queue', false, { | ||
| command: newRequest.command, | ||
| }); | ||
| flush(true); | ||
| }); | ||
| return persistencePromise; | ||
| isReadyPromise.then(() => flush(false)); | ||
|
adhorodyski marked this conversation as resolved.
|
||
| return; | ||
|
adhorodyski marked this conversation as resolved.
adhorodyski marked this conversation as resolved.
|
||
| } | ||
|
|
||
| Log.info('[SequentialQueue] Queue is not running. Flushing the queue.', false, { | ||
| command: newRequest.command, | ||
| }); | ||
| flush(true); | ||
| return persistencePromise; | ||
| flush(false); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When two writes are enqueued without awaiting each other, the first Useful? React with 👍 / 👎.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed — leaving this as the documented residual window. As covered in the line-590 thread, this PR narrows the dominant idle-queue case rather than eliminating every window; the back-to-back residual only meaningfully exists for the flush-initiating request (which we now await), since a follower's XHR fires only after the leader's full network round-trip, by which point its same-key Onyx write has already resolved.
adhorodyski marked this conversation as resolved.
|
||
| } | ||
|
|
||
| function getCurrentRequest(): Promise<void> { | ||
|
|
@@ -612,10 +640,9 @@ function resetQueue(): void { | |
| isSequentialQueueRunning = false; | ||
| currentRequestPromise = null; | ||
| isQueuePaused = false; | ||
| isReadyPromise = new Promise((resolve) => { | ||
| resolveIsReadyPromise = resolve; | ||
| }); | ||
| resolveIsReadyPromise?.(); | ||
| isReadyPromise = Promise.resolve(); | ||
| isReadyPromisePending = false; | ||
| resolveIsReadyPromise = undefined; | ||
| } | ||
|
|
||
| export { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.