Skip to content

Commit 70d1ac9

Browse files
committed
SequentialQueue: await Onyx persist before flush
1 parent f3621b7 commit 70d1ac9

3 files changed

Lines changed: 350 additions & 114 deletions

File tree

src/libs/Network/SequentialQueue.ts

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ async function handleConflictActions<TKey extends OnyxKey>(conflictAction: Confl
504504
}
505505
}
506506

507-
function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void> {
507+
async function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void> {
508508
const currentRequests = getAllPersistedRequests();
509509
Log.info('[SequentialQueue] push() called', false, {
510510
command: newRequest.command,
@@ -514,9 +514,10 @@ function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void
514514
isSequentialQueueRunning,
515515
});
516516

517-
// Save the request to the persisted queue. The in-memory update inside save()
518-
// happens synchronously, so flush() below will see the new request immediately.
519-
// The returned promise resolves when disk persistence completes.
517+
// Persist-before-fire: await disk write before any flush(). The in-memory update
518+
// inside save()/handleConflictActions happens synchronously, but the network call
519+
// must NOT race the Onyx.set() promise. If the process is killed after fire but
520+
// before disk commit, the request is lost — no retry on next launch.
520521
let persistencePromise: Promise<void>;
521522

522523
if (newRequest.checkAndFixConflictingRequest) {
@@ -544,13 +545,20 @@ function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void
544545
persistencePromise = savePersistedRequest(newRequest);
545546
}
546547

548+
await persistencePromise;
549+
Log.info('[SequentialQueue] Request persisted, proceeding to flush', false, {
550+
command: newRequest.command,
551+
isOffline: isOfflineNetwork(),
552+
isSequentialQueueRunning,
553+
});
554+
547555
// If we are offline we don't need to trigger the queue to empty as it will happen when we come back online
548556
if (isOfflineNetwork()) {
549557
Log.info('[SequentialQueue] Request persisted but not flushing — we are offline', false, {
550558
command: newRequest.command,
551559
queueLength: getAllPersistedRequests().length,
552560
});
553-
return persistencePromise;
561+
return;
554562
}
555563

556564
// If the queue is running this request will run once it has finished processing the current batch
@@ -564,14 +572,13 @@ function push<TKey extends OnyxKey>(newRequest: OnyxRequest<TKey>): Promise<void
564572
});
565573
flush(true);
566574
});
567-
return persistencePromise;
575+
return;
568576
}
569577

570578
Log.info('[SequentialQueue] Queue is not running. Flushing the queue.', false, {
571579
command: newRequest.command,
572580
});
573581
flush(true);
574-
return persistencePromise;
575582
}
576583

577584
function getCurrentRequest(): Promise<void> {
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
import Onyx from 'react-native-onyx';
2+
import type {OnyxKey} from 'react-native-onyx';
3+
import OnyxUtils from 'react-native-onyx/dist/OnyxUtils';
4+
import * as PersistedRequests from '@userActions/PersistedRequests';
5+
import ONYXKEYS from '@src/ONYXKEYS';
6+
import * as SequentialQueue from '../../src/libs/Network/SequentialQueue';
7+
import type Request from '../../src/types/onyx/Request';
8+
import * as TestHelper from '../utils/TestHelper';
9+
import waitForBatchedUpdates from '../utils/waitForBatchedUpdates';
10+
11+
jest.mock('@libs/ActiveClientManager', () => ({
12+
isClientTheLeader: jest.fn(() => true),
13+
init: jest.fn(),
14+
isReady: jest.fn(() => Promise.resolve()),
15+
}));
16+
17+
type DeferredSet = {key: OnyxKey; value: unknown; resolve: () => void; flush: () => Promise<void>};
18+
19+
/**
20+
* Intercept `Onyx.set` for the given key and defer resolution so tests can observe
21+
* the moment-of-fire vs. moment-of-disk-commit ordering. Returns the list of
22+
* captured sets (mutated as more arrive) and a restore function.
23+
*/
24+
function deferOnyxSetFor(key: OnyxKey) {
25+
const captured: DeferredSet[] = [];
26+
const originalSet = Onyx.set.bind(Onyx);
27+
const spy = jest.spyOn(Onyx, 'set').mockImplementation((targetKey, value) => {
28+
if (targetKey !== key) {
29+
return originalSet(targetKey, value);
30+
}
31+
return new Promise<void>((resolvePromise) => {
32+
captured.push({
33+
key: targetKey,
34+
value,
35+
resolve: resolvePromise,
36+
flush: () => originalSet(targetKey, value as never).then(resolvePromise),
37+
});
38+
});
39+
});
40+
return {captured, restore: () => spy.mockRestore()};
41+
}
42+
43+
const request: Request<'userMetadata'> = {
44+
command: 'ReconnectApp',
45+
successData: [{key: 'userMetadata', onyxMethod: 'set', value: {accountID: 1234}}],
46+
failureData: [{key: 'userMetadata', onyxMethod: 'set', value: {}}],
47+
};
48+
49+
beforeAll(() => {
50+
Onyx.init({keys: ONYXKEYS});
51+
});
52+
53+
let fetchMock: jest.Mock;
54+
beforeEach(async () => {
55+
fetchMock = TestHelper.getGlobalFetchMock() as unknown as jest.Mock;
56+
global.fetch = fetchMock as unknown as typeof fetch;
57+
await Onyx.clear();
58+
SequentialQueue.resetQueue();
59+
await SequentialQueue.clearQueueFlushedData();
60+
await waitForBatchedUpdates();
61+
});
62+
63+
describe('SequentialQueue.push - persist-before-fire', () => {
64+
it('online: does not call fetch until Onyx.set for PERSISTED_REQUESTS resolves', async () => {
65+
const {captured, restore} = deferOnyxSetFor(ONYXKEYS.PERSISTED_REQUESTS);
66+
67+
try {
68+
const pushPromise = SequentialQueue.push({...request});
69+
70+
// Pump microtasks. With persistence deferred, push() must not resolve and
71+
// fetch must not be called.
72+
await waitForBatchedUpdates();
73+
await waitForBatchedUpdates();
74+
expect(fetchMock).not.toHaveBeenCalled();
75+
expect(captured.length).toBeGreaterThanOrEqual(1);
76+
77+
// Resolve the deferred persistence — only now is the request authoritative on disk.
78+
await captured.at(0)?.flush();
79+
await pushPromise;
80+
await waitForBatchedUpdates();
81+
await waitForBatchedUpdates();
82+
83+
expect(fetchMock).toHaveBeenCalled();
84+
} finally {
85+
restore();
86+
}
87+
});
88+
89+
it('at the await boundary, on-disk PERSISTED_REQUESTS already contains the new request', async () => {
90+
await SequentialQueue.push({...request});
91+
92+
// Read straight from Onyx to confirm the persisted shape, not just in-memory state.
93+
const onDisk = await OnyxUtils.get(ONYXKEYS.PERSISTED_REQUESTS);
94+
expect(onDisk).toBeDefined();
95+
expect(onDisk?.some((r) => r.command === request.command)).toBe(true);
96+
});
97+
98+
it('offline: push() resolves after persistence and does not call fetch', async () => {
99+
await Onyx.set(ONYXKEYS.NETWORK, {shouldForceOffline: true});
100+
await waitForBatchedUpdates();
101+
102+
const {captured, restore} = deferOnyxSetFor(ONYXKEYS.PERSISTED_REQUESTS);
103+
try {
104+
let resolved = false;
105+
const pushPromise = SequentialQueue.push({...request}).then(() => {
106+
resolved = true;
107+
});
108+
109+
await waitForBatchedUpdates();
110+
expect(resolved).toBe(false);
111+
expect(captured.length).toBeGreaterThanOrEqual(1);
112+
113+
await captured.at(0)?.flush();
114+
await pushPromise;
115+
await waitForBatchedUpdates();
116+
117+
expect(resolved).toBe(true);
118+
expect(fetchMock).not.toHaveBeenCalled();
119+
expect(PersistedRequests.getAll().some((r) => r.command === request.command)).toBe(true);
120+
} finally {
121+
restore();
122+
await Onyx.set(ONYXKEYS.NETWORK, {shouldForceOffline: false});
123+
}
124+
});
125+
126+
it('conflict resolution (replace): final queue state is on disk before any fire', async () => {
127+
// Pause so the seeded request sits in the queue instead of being consumed by process().
128+
SequentialQueue.pause();
129+
130+
await SequentialQueue.push({...request});
131+
await waitForBatchedUpdates();
132+
fetchMock.mockClear();
133+
134+
const {captured, restore} = deferOnyxSetFor(ONYXKEYS.PERSISTED_REQUESTS);
135+
try {
136+
const replacement: Request<never> = {
137+
command: 'ReconnectApp',
138+
data: {accountID: 99999},
139+
checkAndFixConflictingRequest: (persisted) => {
140+
const index = persisted.findIndex((r) => r.command === 'ReconnectApp');
141+
if (index === -1) {
142+
return {conflictAction: {type: 'push'}};
143+
}
144+
return {conflictAction: {type: 'replace', index}};
145+
},
146+
};
147+
148+
const pushPromise = SequentialQueue.push(replacement);
149+
await waitForBatchedUpdates();
150+
151+
// No fetch fires while disk is pending (queue is also paused, but this guard
152+
// catches a regression where push() would call flush() before the await).
153+
expect(fetchMock).not.toHaveBeenCalled();
154+
expect(captured.length).toBeGreaterThanOrEqual(1);
155+
156+
for (const capturedSet of captured) {
157+
// Drain whatever conflict-resolution writes were issued (push/replace/delete chains
158+
// may issue more than one) so persistence is fully on disk before flush proceeds.
159+
await capturedSet.flush();
160+
}
161+
await pushPromise;
162+
await waitForBatchedUpdates();
163+
164+
const onDisk = await OnyxUtils.get(ONYXKEYS.PERSISTED_REQUESTS);
165+
expect(onDisk?.some((r) => r.data?.accountID === 99999)).toBe(true);
166+
} finally {
167+
restore();
168+
SequentialQueue.unpause();
169+
}
170+
});
171+
172+
it('queue-already-running path: push() awaits persistence even when a flush is already in flight', async () => {
173+
// First push starts the queue. We do not pause/intercept it — let it run.
174+
await SequentialQueue.push({...request});
175+
await waitForBatchedUpdates();
176+
fetchMock.mockClear();
177+
178+
const {captured, restore} = deferOnyxSetFor(ONYXKEYS.PERSISTED_REQUESTS);
179+
try {
180+
let resolved = false;
181+
const pushPromise = SequentialQueue.push({...request, data: {accountID: 4242}}).then(() => {
182+
resolved = true;
183+
});
184+
await waitForBatchedUpdates();
185+
186+
// push() must not have resolved yet — persistence is pending.
187+
expect(resolved).toBe(false);
188+
expect(captured.length).toBeGreaterThanOrEqual(1);
189+
190+
for (const capturedSet of captured) {
191+
await capturedSet.flush();
192+
}
193+
await pushPromise;
194+
expect(resolved).toBe(true);
195+
} finally {
196+
restore();
197+
}
198+
});
199+
});
200+
201+
describe('PersistedRequests.save - regression guard', () => {
202+
it('save() resolves only after the underlying Onyx.set completes', async () => {
203+
const {captured, restore} = deferOnyxSetFor(ONYXKEYS.PERSISTED_REQUESTS);
204+
try {
205+
let resolved = false;
206+
const savePromise = PersistedRequests.save({...request, requestID: 7777}).then(() => {
207+
resolved = true;
208+
});
209+
210+
await waitForBatchedUpdates();
211+
expect(resolved).toBe(false);
212+
expect(captured.length).toBeGreaterThanOrEqual(1);
213+
214+
await captured.at(0)?.flush();
215+
await savePromise;
216+
expect(resolved).toBe(true);
217+
} finally {
218+
restore();
219+
}
220+
});
221+
});

0 commit comments

Comments
 (0)