diff --git a/src/libs/Network/SequentialQueue.ts b/src/libs/Network/SequentialQueue.ts index 4ea6e9f2e761..9c4a1b366bf7 100644 --- a/src/libs/Network/SequentialQueue.ts +++ b/src/libs/Network/SequentialQueue.ts @@ -44,13 +44,28 @@ type RequestError = Error & { status?: string; }; -let resolveIsReadyPromise: ((args?: unknown[]) => void) | undefined; -let isReadyPromise = new Promise((resolve) => { - resolveIsReadyPromise = resolve; -}); +let resolveIsReadyPromise: (() => void) | undefined; +let isReadyPromise: Promise = Promise.resolve(); +let isReadyPromisePending = false; -// Resolve the isReadyPromise immediately so that the queue starts working as soon as the page loads -resolveIsReadyPromise?.(); +/** + * Marks isReadyPromise as pending so any READ that consults waitForIdle() parks behind us. + * Idempotent: if already pending, no-op (avoids orphaning subscribers from prior pushes). + * Called from push()'s sync prelude before the first await, so READs on the next sync line + * see the pending promise. + */ +function setIsReadyPromisePending() { + if (isReadyPromisePending) { + return; + } + isReadyPromise = new Promise((resolve) => { + resolveIsReadyPromise = () => { + isReadyPromisePending = false; + resolve(); + }; + }); + isReadyPromisePending = true; +} let isSequentialQueueRunning = false; let currentRequestPromise: Promise | null = null; @@ -322,6 +337,10 @@ function flush(shouldResetPromise = true) { if (persistedRequestsLength === 0 && !currentOngoingRequest && !hasOnyxUpdates) { Log.info('[SequentialQueue] Unable to flush. No requests or queued Onyx updates to process.'); + // push() may have marked isReadyPromise pending in its sync prelude (e.g. a conflict + // resolver deleted the only request without pushing a replacement). Resolve here so READs + // parked on waitForIdle() don't hang until unrelated queue activity releases them. + resolveIsReadyPromise?.(); return; } @@ -338,6 +357,10 @@ function flush(shouldResetPromise = true) { persistedRequestsLength, hasOngoingRequest: !!currentOngoingRequest, }); + // push() may have marked isReadyPromise pending in its sync prelude. Followers never + // process the queue, so resolve here — otherwise READs parked on waitForIdle() would + // hang forever on this tab after any write. + resolveIsReadyPromise?.(); return; } @@ -350,10 +373,9 @@ function flush(shouldResetPromise = true) { isSequentialQueueRunning = true; if (shouldResetPromise) { - // Reset the isReadyPromise so that the queue will be flushed as soon as the request is finished - isReadyPromise = new Promise((resolve) => { - resolveIsReadyPromise = resolve; - }); + // Mark isReadyPromise as pending so READs (waitForIdle) park behind us. + // Idempotent — safe if push() already marked it pending in its sync prelude. + setIsReadyPromisePending(); } // Ensure persistedRequests are read from storage before proceeding with the queue @@ -520,7 +542,7 @@ async function handleConflictActions(conflictAction: Confl } } -function push(newRequest: OnyxRequest): Promise { +async function push(newRequest: OnyxRequest): Promise { const currentRequests = getAllPersistedRequests(); Log.info('[SequentialQueue] push() called', false, { command: newRequest.command, @@ -530,19 +552,10 @@ function push(newRequest: OnyxRequest): Promise; if (newRequest.checkAndFixConflictingRequest) { - const requests = currentRequests; - Log.info('[SequentialQueue] Checking for conflicts', false, { - command: newRequest.command, - existingRequestsCount: requests.length, - }); - - const {conflictAction} = newRequest.checkAndFixConflictingRequest(requests as Array>); + const {conflictAction} = newRequest.checkAndFixConflictingRequest(currentRequests as Array>); Log.info('[SequentialQueue] Conflict action determined', false, { command: newRequest.command, conflictType: conflictAction.type, @@ -553,41 +566,56 @@ function push(newRequest: OnyxRequest): Promise { - Log.info('[SequentialQueue] isReadyPromise resolved, flushing queue', false, { - command: newRequest.command, - }); - flush(true); - }); - return persistencePromise; + isReadyPromise.then(() => flush(false)); + return; } Log.info('[SequentialQueue] Queue is not running. Flushing the queue.', false, { command: newRequest.command, }); - flush(true); - return persistencePromise; + flush(false); } function getCurrentRequest(): Promise { @@ -612,10 +640,9 @@ function resetQueue(): void { isSequentialQueueRunning = false; currentRequestPromise = null; isQueuePaused = false; - isReadyPromise = new Promise((resolve) => { - resolveIsReadyPromise = resolve; - }); - resolveIsReadyPromise?.(); + isReadyPromise = Promise.resolve(); + isReadyPromisePending = false; + resolveIsReadyPromise = undefined; } export { diff --git a/src/libs/actions/PersistedRequests.ts b/src/libs/actions/PersistedRequests.ts index a689e238a2d7..847946eeb51e 100644 --- a/src/libs/actions/PersistedRequests.ts +++ b/src/libs/actions/PersistedRequests.ts @@ -260,10 +260,12 @@ function save(requestToPersist: Request): Promise { - Log.info('[PersistedRequests] ERROR: Failed to persist request to disk', false, { + .catch((error) => { + // Disk-write failure risks losing this request on a crash — alert as a storage emergency. + Log.alert('[PersistedRequests] ERROR: Failed to persist request to disk', { command: requestToPersist.command, queueLength: getLength(), + error, }); }); } @@ -334,9 +336,18 @@ function deleteRequestsByIndices(indices: number[]): Promise { persistedRequests = persistedRequests.filter((_, index) => !indicesSet.has(index)); // Update the persisted requests in storage or state as necessary - return trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, persistedRequests)).then(() => { - Log.info(`Multiple (${indices.length}) requests removed from the queue. Queue length is ${persistedRequests.length}`); - }); + return trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, persistedRequests)) + .then(() => { + Log.info(`Multiple (${indices.length}) requests removed from the queue. Queue length is ${persistedRequests.length}`); + }) + .catch((error) => { + // Swallow so the conflict promise resolves (in-memory queue is the source of truth); alert as a storage emergency. + Log.alert('[PersistedRequests] ERROR: Failed to persist request deletion to disk', { + indicesCount: indices.length, + queueLength: getLength(), + error, + }); + }); } function update(oldRequestIndex: number, newRequest: Request): Promise { @@ -349,7 +360,14 @@ function update(oldRequestIndex: number, newRequest: Reque if (requestIndex != null) { knownRequestIDs.add(requestIndex); } - return trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests)); + return trackOnyxWrite(Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests)).catch((error) => { + // Swallow so the conflict promise resolves (in-memory queue is the source of truth); alert as a storage emergency. + Log.alert('[PersistedRequests] ERROR: Failed to persist updated request to disk', { + command: newRequest.command, + queueLength: getLength(), + error, + }); + }); } function shouldPersistOngoingRequest(request: AnyRequest | null): boolean { diff --git a/tests/unit/SequentialQueueTest.ts b/tests/unit/SequentialQueueTest.ts index be9dc358b85b..67adffd451ad 100644 --- a/tests/unit/SequentialQueueTest.ts +++ b/tests/unit/SequentialQueueTest.ts @@ -1,5 +1,6 @@ import Onyx from 'react-native-onyx'; import type {OnyxKey, OnyxUpdate} from 'react-native-onyx'; +import * as NetworkState from '@libs/NetworkState'; import {clear as clearPersistedRequests, getAll, getLength, getOngoingRequest, updateOngoingRequest} from '@userActions/PersistedRequests'; import CONST from '@src/CONST'; import ONYXKEYS from '@src/ONYXKEYS'; @@ -7,6 +8,7 @@ import * as SequentialQueue from '../../src/libs/Network/SequentialQueue'; import * as RequestModule from '../../src/libs/Request'; import type Request from '../../src/types/onyx/Request'; import type {AnyRequest, ConflictActionData} from '../../src/types/onyx/Request'; +import type {MockFetch} from '../utils/TestHelper'; import * as TestHelper from '../utils/TestHelper'; import waitForBatchedUpdates from '../utils/waitForBatchedUpdates'; @@ -15,13 +17,15 @@ const request: Request<'userMetadata'> = { successData: [{key: 'userMetadata', onyxMethod: 'set', value: {accountID: 1234}}], failureData: [{key: 'userMetadata', onyxMethod: 'set', value: {}}], }; +let mockFetch: MockFetch; beforeAll(() => { Onyx.init({ keys: ONYXKEYS, }); }); beforeEach(() => { - global.fetch = TestHelper.getGlobalFetchMock(); + mockFetch = TestHelper.createGlobalFetchMock(); + global.fetch = mockFetch; return Onyx.clear() .then(() => SequentialQueue.clearQueueFlushedData()) .then(waitForBatchedUpdates); @@ -38,29 +42,103 @@ describe('SequentialQueue', () => { expect(getLength()).toBe(2); }); - it('should push two requests with conflict resolution and replace', () => { - SequentialQueue.push(request); - const requestWithConflictResolution: Request = { - command: 'ReconnectApp', - data: {accountID: 56789}, - checkAndFixConflictingRequest: (persistedRequests) => { - // should be one instance of ReconnectApp, get the index to replace it later - const index = persistedRequests.findIndex((r) => r.command === 'ReconnectApp'); - if (index === -1) { - return {conflictAction: {type: 'push'}}; - } + it('should resolve waitForIdle without flushing when the network goes offline during persist', async () => { + // push()'s sync prelude marks isReadyPromise pending while online, then awaits the disk + // write. If the network flips offline during that await, flush() would early-return on its + // offline guard without resolving isReadyPromise — leaving waitForIdle() (READs) hung until + // an unrelated reconnect. push() must instead resolve isReadyPromise and skip flushing. + const offlineSpy = jest.spyOn(NetworkState, 'getIsOffline').mockReturnValue(false); + let timeoutId: ReturnType | undefined; + try { + // Kick off the push while "online": the synchronous prelude runs up to `await persistencePromise`. + const pushPromise = SequentialQueue.push(request); - return { - conflictAction: {type: 'replace', index}, + // Flip offline while the awaited disk write is still pending. + offlineSpy.mockReturnValue(true); + + await pushPromise; + + // The request is still persisted — not flushed, not dropped. + expect(getLength()).toBe(1); + + // waitForIdle() must resolve rather than hang. + const idleOrTimeout = await Promise.race([ + SequentialQueue.waitForIdle().then(() => 'resolved' as const), + new Promise<'timeout'>((resolve) => { + timeoutId = setTimeout(() => resolve('timeout'), 1000); + }), + ]); + expect(idleOrTimeout).toBe('resolved'); + } finally { + if (timeoutId) { + clearTimeout(timeoutId); + } + offlineSpy.mockRestore(); + } + }); + + it('should not block the queue when a disk write fails during persist', async () => { + // If a conflict-resolution disk write rejects (storage full / corruption), push() must not throw + // or strand isReadyPromise — the request should still flush and waitForIdle() should resolve. + const originalSet = Onyx.set.bind(Onyx); + + mockFetch.pause(); + try { + await SequentialQueue.push({command: 'OpenReport'}); // occupies ongoingRequest + await waitForBatchedUpdates(); + await SequentialQueue.push(request); // ReconnectApp stacks in the queue + + // Fail the conflict-resolution persist (a raw Onyx.set on the persisted-requests key). + const setMock = jest + .spyOn(Onyx, 'set') + .mockImplementation((key, value) => (key === ONYXKEYS.PERSISTED_REQUESTS ? Promise.reject(new Error('simulated disk-write failure')) : originalSet(key, value))); + try { + const replacing: Request = { + command: 'ReconnectApp', + data: {accountID: 56789}, + checkAndFixConflictingRequest: (persistedRequests) => { + const index = persistedRequests.findIndex((r) => r.command === 'ReconnectApp'); + return {conflictAction: index === -1 ? {type: 'push'} : {type: 'replace', index}}; + }, }; - }, - }; - SequentialQueue.push(requestWithConflictResolution); - expect(getLength()).toBe(1); - // We know there is only one request and it's ongoing. - // We can get it and verify that the ongoing request is the second one. - const ongoingRequest = getOngoingRequest(); - expect(ongoingRequest?.data?.accountID).toBe(56789); + // The failed disk write must not reject the caller. + await expect(SequentialQueue.push(replacing)).resolves.toBeUndefined(); + } finally { + setMock.mockRestore(); + } + } finally { + await mockFetch.resume(); + } + + // The queue still drains and READs unblock — a hang here would fail the test by timing out. + await SequentialQueue.waitForIdle(); + expect(getLength()).toBe(0); + }); + + it('should push two requests with conflict resolution and replace', async () => { + // Pause the queue so `process()` does not consume the first request before + // the conflict resolver runs. Under persist-before-fire `push()` is async, + // so we await both pushes and then assert on the on-disk queue directly. + SequentialQueue.pause(); + try { + await SequentialQueue.push(request); + const requestWithConflictResolution: Request = { + command: 'ReconnectApp', + data: {accountID: 56789}, + checkAndFixConflictingRequest: (persistedRequests) => { + const index = persistedRequests.findIndex((r) => r.command === 'ReconnectApp'); + if (index === -1) { + return {conflictAction: {type: 'push'}}; + } + return {conflictAction: {type: 'replace', index}}; + }, + }; + await SequentialQueue.push(requestWithConflictResolution); + expect(getLength()).toBe(1); + expect(getAll().at(0)?.data?.accountID).toBe(56789); + } finally { + SequentialQueue.unpause(); + } }); it('should push two requests with conflict resolution and push', () => { @@ -90,108 +168,112 @@ describe('SequentialQueue', () => { }); it('should add a new request even if a similar one is ongoing', async () => { - // .push at the end flush the queue - SequentialQueue.push(request); - - // wait for Onyx.connect execute the callback and start processing the queue - await Promise.resolve(); - - const requestWithConflictResolution: Request = { - command: 'ReconnectApp', - data: {accountID: 56789}, - checkAndFixConflictingRequest: (persistedRequests) => { - // should be one instance of ReconnectApp, get the index to replace it later - const index = persistedRequests.findIndex((r) => r.command === 'ReconnectApp'); - if (index === -1) { - return {conflictAction: {type: 'push'}}; - } - - return { - conflictAction: {type: 'replace', index}, - }; - }, - }; - - SequentialQueue.push(requestWithConflictResolution); - - const ongoingRequest = getOngoingRequest(); - expect(ongoingRequest?.data?.accountID).toBe(56789); - }); - - it('should replace request request in queue while a similar one is ongoing', async () => { - // .push at the end flush the queue - SequentialQueue.push(request); - - // wait for Onyx.connect execute the callback and start processing the queue - await Promise.resolve(); - - const conflictResolver = (persistedRequests: Array>): ConflictActionData => { - // should be one instance of ReconnectApp, get the index to replace it later - const index = persistedRequests.findIndex((r) => r.command === 'ReconnectApp'); - if (index === -1) { - return {conflictAction: {type: 'push'}}; - } - - return { - conflictAction: {type: 'replace', index}, + // Pause fetch so the first request lands as `ongoingRequest` but never completes. + // The conflict checker on push 2 inspects the persisted queue (which excludes the + // ongoing request), so it cannot find a 'ReconnectApp' to replace and falls back + // to 'push'. The new request is therefore added to the queue. + mockFetch.pause(); + try { + await SequentialQueue.push(request); + await waitForBatchedUpdates(); + expect(getOngoingRequest()?.command).toBe('ReconnectApp'); + + const requestWithConflictResolution: Request = { + command: 'ReconnectApp', + data: {accountID: 56789}, + checkAndFixConflictingRequest: (persistedRequests) => { + const index = persistedRequests.findIndex((r) => r.command === 'ReconnectApp'); + if (index === -1) { + return {conflictAction: {type: 'push'}}; + } + return {conflictAction: {type: 'replace', index}}; + }, }; - }; - const requestWithConflictResolution: Request = { - command: 'ReconnectApp', - data: {accountID: 56789}, - checkAndFixConflictingRequest: conflictResolver, - }; + await SequentialQueue.push(requestWithConflictResolution); - const requestWithConflictResolution2: Request = { - command: 'ReconnectApp', - data: {accountID: 56789}, - checkAndFixConflictingRequest: conflictResolver, - }; - - SequentialQueue.push(requestWithConflictResolution); - SequentialQueue.push(requestWithConflictResolution2); - - expect(getLength()).toBe(2); + // The new request is in the persisted queue with the expected accountID. + expect(getAll().some((r) => r.data?.accountID === 56789)).toBe(true); + } finally { + await mockFetch.resume(); + } }); - it('should replace request request in queue while a similar one is ongoing and keep the same index', () => { - SequentialQueue.push({command: 'OpenReport'}); - SequentialQueue.push(request); + it('should replace request in queue while a similar one is ongoing', async () => { + mockFetch.pause(); + try { + await SequentialQueue.push(request); + await waitForBatchedUpdates(); + expect(getOngoingRequest()?.command).toBe('ReconnectApp'); - const requestWithConflictResolution: Request = { - command: 'ReconnectApp', - data: {accountID: 56789}, - checkAndFixConflictingRequest: (persistedRequests) => { - // should be one instance of ReconnectApp, get the index to replace it later + const conflictResolver = (persistedRequests: Array>): ConflictActionData => { const index = persistedRequests.findIndex((r) => r.command === 'ReconnectApp'); if (index === -1) { return {conflictAction: {type: 'push'}}; } + return {conflictAction: {type: 'replace', index}}; + }; - return { - conflictAction: {type: 'replace', index}, - }; - }, - }; + const requestWithConflictResolution: Request = { + command: 'ReconnectApp', + data: {accountID: 56789}, + checkAndFixConflictingRequest: conflictResolver, + }; - SequentialQueue.push(requestWithConflictResolution); - SequentialQueue.push({command: 'AddComment'}); - SequentialQueue.push({command: 'OpenReport'}); + const requestWithConflictResolution2: Request = { + command: 'ReconnectApp', + data: {accountID: 56789}, + checkAndFixConflictingRequest: conflictResolver, + }; - expect(getLength()).toBe(4); - const persistedRequests = getAll(); - const ongoingRequest = getOngoingRequest(); + // First conflict push: queue is empty (ongoing not in queue) → push action → queue=[r2]. + // Second conflict push: queue=[r2] → replace at 0 → queue=[r3]. + // Total in-flight items: ongoing + queue = 2. + await SequentialQueue.push(requestWithConflictResolution); + await SequentialQueue.push(requestWithConflictResolution2); - // The first OpenReport call is ongoing - expect(ongoingRequest?.command).toBe('OpenReport'); + expect(getLength()).toBe(2); + } finally { + await mockFetch.resume(); + } + }); - // We know ReconnectApp is at index 0 in the queue now, so we can get it to verify - // that was replaced by the new request. - expect(persistedRequests.at(0)?.data?.accountID).toBe(56789); + it('should replace request in queue while a similar one is ongoing and keep the same index', async () => { + mockFetch.pause(); + try { + // First push moves into `ongoingRequest`; subsequent pushes stack in the queue. + await SequentialQueue.push({command: 'OpenReport'}); + await waitForBatchedUpdates(); + expect(getOngoingRequest()?.command).toBe('OpenReport'); + + await SequentialQueue.push(request); + + const requestWithConflictResolution: Request = { + command: 'ReconnectApp', + data: {accountID: 56789}, + checkAndFixConflictingRequest: (persistedRequests) => { + const index = persistedRequests.findIndex((r) => r.command === 'ReconnectApp'); + if (index === -1) { + return {conflictAction: {type: 'push'}}; + } + return {conflictAction: {type: 'replace', index}}; + }, + }; + + await SequentialQueue.push(requestWithConflictResolution); + await SequentialQueue.push({command: 'AddComment'}); + await SequentialQueue.push({command: 'OpenReport'}); + + expect(getLength()).toBe(4); + const persistedRequests = getAll(); + expect(getOngoingRequest()?.command).toBe('OpenReport'); + expect(persistedRequests.at(0)?.data?.accountID).toBe(56789); + } finally { + await mockFetch.resume(); + } }); - // need to test a rance condition between processing the next request and then pushing a new request with conflict resolver + // need to test a race condition between processing the next request and then pushing a new request with conflict resolver it('should resolve the conflict and replace the correct request in the queue while a new request is picked up after unpausing', async () => { SequentialQueue.pause(); for (let i = 0; i < 5; i++) { diff --git a/tests/utils/TestHelper.ts b/tests/utils/TestHelper.ts index b710ea226cde..edea0633e606 100644 --- a/tests/utils/TestHelper.ts +++ b/tests/utils/TestHelper.ts @@ -217,7 +217,7 @@ function signOutTestUser() { * - fail() - start returning a failure response * - success() - go back to returning a success response */ -function getGlobalFetchMock(mockResponse?: Partial): typeof fetch { +function createGlobalFetchMock(mockResponse?: Partial): MockFetch { let queue: QueueItem[] = []; // eslint-disable-next-line @typescript-eslint/no-explicit-any let responses = new Map OnyxResponse>(); @@ -280,7 +280,11 @@ function getGlobalFetchMock(mockResponse?: Partial): typeof fetch { mockFetch.mockAPICommand = (command: TCommand, responseHandler: (params: ApiRequestCommandParameters[TCommand]) => OnyxResponse): void => { responses.set(command, responseHandler); }; - return mockFetch as typeof fetch; + return mockFetch; +} + +function getGlobalFetchMock(mockResponse?: Partial): typeof fetch { + return createGlobalFetchMock(mockResponse); } function setupGlobalFetchMock(): MockFetch { @@ -394,6 +398,7 @@ export { buildTestReportComment, getFetchMockCalls, getGlobalFetchMock, + createGlobalFetchMock, setPersonalDetails, signInWithTestUser, signOutTestUser,