Skip to content

Commit b98da73

Browse files
authored
refactor: parallelize per-scope contract syncs (backport #22525) (#22561)
## Summary Backport of #22525 to v4-next. Cherry-pick applied cleanly with no conflicts. No additional changes needed. ## Original PR Parallelizes per-scope contract syncs in the PXE ContractSyncService, capping concurrency at 5 via a Semaphore. This addresses a performance regression from moving to sequential per-scope-and-contract syncing. Also adds a concurrent WASM execution test. ClaudeBox log: https://claudebox.work/s/ea5bb0472c2216a5?run=1
2 parents 100a143 + 7a2829b commit b98da73

2 files changed

Lines changed: 147 additions & 51 deletions

File tree

yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution.test.ts

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,96 @@ describe('Utility Execution test suite', () => {
227227
expect(offchainEffects).toEqual([]);
228228
}, 30_000);
229229

230+
it('WASM simulator supports N-way concurrent utility execution', async () => {
231+
// Fires N concurrent runUtility calls through the shared WASMSimulator instance. Each call runs its own ACIR
232+
// execution with its own oracle, but they all funnel through the same `executeUserCircuit` entry point and
233+
// therefore the same Wasm module. If the underlying noir-acvm_js binding cannot handle overlapping executions,
234+
// concurrent calls can corrupt each other's witness maps: we'd expect crashes, rejections, or wrong sums.
235+
//
236+
// This guards the parallelization in ContractSyncService (MAX_CONCURRENT_SCOPE_SYNCS = 5), which is the first
237+
// code path to run multiple utility ACIR calls simultaneously. N is set to 10 so we test twice the production
238+
// cap to catch issues that would only surface at higher concurrency.
239+
const N = 10;
240+
const artifact = {
241+
...StatefulTestContractArtifact.functions.find(f => f.name === 'summed_values')!,
242+
contractName: StatefulTestContractArtifact.name,
243+
};
244+
245+
const notes: Note[] = [...Array(5).fill(buildNote(1n)), ...Array(2).fill(buildNote(2n))];
246+
const expectedSum = new Fr(9);
247+
248+
const instanceFields = {
249+
version: 1 as const,
250+
salt: Fr.random(),
251+
deployer: await AztecAddress.random(),
252+
currentContractClassId: new Fr(42),
253+
originalContractClassId: new Fr(42),
254+
initializationHash: Fr.random(),
255+
publicKeys: await PublicKeys.random(),
256+
};
257+
const contractAddress = await computeContractAddressFromInstance(instanceFields);
258+
259+
aztecNode.getPublicStorageAt.mockResolvedValue(Fr.ZERO);
260+
aztecNode.findLeavesIndexes.mockResolvedValue([
261+
{ data: 1n, l2BlockNumber: BlockNumber(1), l2BlockHash: BlockHash.random() },
262+
]);
263+
contractStore.getFunctionArtifact.mockResolvedValue(artifact);
264+
contractStore.getContractInstance.mockResolvedValue({
265+
...instanceFields,
266+
address: contractAddress,
267+
} as ContractInstanceWithAddress);
268+
contractStore.getFunctionArtifactWithDebugMetadata.mockImplementation(async (address, selector) => {
269+
const artifact = await contractStore.getFunctionArtifact(address, selector);
270+
if (!artifact) {
271+
throw new Error(`Function not found: ${selector.toString()} in contract ${address}`);
272+
}
273+
return { ...artifact, debug: undefined };
274+
});
275+
noteStore.getNotes.mockResolvedValue(
276+
notes.map(
277+
note =>
278+
new NoteDao(
279+
note,
280+
contractAddress,
281+
owner,
282+
Fr.random(),
283+
Fr.random(),
284+
Fr.random(),
285+
Fr.random(),
286+
Fr.random(),
287+
TxHash.random(),
288+
BlockNumber(42),
289+
BlockHash.random().toString(),
290+
0,
291+
0,
292+
),
293+
),
294+
);
295+
capsuleStore.getCapsule.mockImplementation((_, __) => Promise.resolve(null));
296+
297+
const execRequest = FunctionCall.from({
298+
name: artifact.name,
299+
to: contractAddress,
300+
selector: FunctionSelector.empty(),
301+
type: FunctionType.UTILITY,
302+
hideMsgSender: false,
303+
isStatic: false,
304+
args: encodeArguments(artifact, [owner]),
305+
returnTypes: artifact.returnTypes,
306+
});
307+
308+
const results = await Promise.all(
309+
Array.from({ length: N }, (_, i) =>
310+
acirSimulator.runUtility(execRequest, [], anchorBlockHeader, [], `reentrance-job-${i}`),
311+
),
312+
);
313+
314+
expect(results).toHaveLength(N);
315+
for (const { result } of results) {
316+
expect(result).toEqual([expectedSum]);
317+
}
318+
}, 60_000);
319+
230320
describe('UtilityExecutionOracle', () => {
231321
let contractAddress: AztecAddress;
232322
let utilityExecutionOracle: UtilityExecutionOracle;

yarn-project/pxe/src/contract_sync/contract_sync_service.ts

Lines changed: 57 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { Logger } from '@aztec/foundation/log';
2+
import { Semaphore } from '@aztec/foundation/queue';
23
import type { FunctionCall, FunctionSelector } from '@aztec/stdlib/abi';
34
import type { AztecAddress } from '@aztec/stdlib/aztec-address';
45
import type { AztecNode } from '@aztec/stdlib/interfaces/client';
@@ -9,6 +10,9 @@ import type { ContractStore } from '../storage/contract_store/contract_store.js'
910
import type { NoteStore } from '../storage/note_store/note_store.js';
1011
import { syncState, verifyCurrentClassId } from './helpers.js';
1112

13+
/** Maximum number of scope syncs running concurrently across the PXE. */
14+
const MAX_CONCURRENT_SCOPE_SYNCS = 5;
15+
1216
/**
1317
* Service for syncing the private state of contracts and verifying that the PXE holds the current class artifact.
1418
* It uses a cache to avoid redundant sync operations - the cache is wiped when the anchor block changes.
@@ -26,6 +30,11 @@ export class ContractSyncService implements StagedStore {
2630
// Per-job excluded contract addresses - these contracts should not be synced.
2731
private excludedFromSync: Map<string, Set<string>> = new Map();
2832

33+
// Bounds the number of scope syncs running concurrently. Scopes beyond this limit queue here. Sized to trade off
34+
// parallelism on non-ACIR work (node RPC, note store reads) against memory pressure from concurrent circuit
35+
// execution.
36+
#syncSlot = new Semaphore(MAX_CONCURRENT_SCOPE_SYNCS);
37+
2938
constructor(
3039
private aztecNode: AztecNode,
3140
private contractStore: ContractStore,
@@ -59,15 +68,22 @@ export class ContractSyncService implements StagedStore {
5968
return;
6069
}
6170

62-
this.#startSyncIfNeeded(contractAddress, scopes, scopesToSync =>
63-
this.#syncContract(
64-
contractAddress,
65-
functionToInvokeAfterSync,
66-
utilityExecutor,
67-
anchorBlockHeader,
68-
jobId,
69-
scopesToSync,
70-
),
71+
this.#startSyncIfNeeded(
72+
contractAddress,
73+
scopes,
74+
() => verifyCurrentClassId(contractAddress, this.aztecNode, this.contractStore, anchorBlockHeader),
75+
scope =>
76+
syncState(
77+
contractAddress,
78+
this.contractStore,
79+
functionToInvokeAfterSync,
80+
utilityExecutor,
81+
this.noteStore,
82+
this.aztecNode,
83+
anchorBlockHeader,
84+
jobId,
85+
scope,
86+
),
7187
);
7288

7389
await this.#awaitSync(contractAddress, scopes);
@@ -81,39 +97,6 @@ export class ContractSyncService implements StagedStore {
8197
scopes.forEach(scope => this.syncedContracts.delete(toKey(contractAddress, scope)));
8298
}
8399

84-
async #syncContract(
85-
contractAddress: AztecAddress,
86-
functionToInvokeAfterSync: FunctionSelector | null,
87-
utilityExecutor: (call: FunctionCall, scopes: AztecAddress[]) => Promise<any>,
88-
anchorBlockHeader: BlockHeader,
89-
jobId: string,
90-
scopes: AztecAddress[],
91-
): Promise<void> {
92-
this.log.debug(`Syncing contract ${contractAddress}`);
93-
94-
await Promise.all([
95-
// Call sync_state sequentially for each scope address — each invocation synchronizes one account's private
96-
// state using scoped capsule arrays.
97-
(async () => {
98-
for (const scope of scopes) {
99-
await syncState(
100-
contractAddress,
101-
this.contractStore,
102-
functionToInvokeAfterSync,
103-
utilityExecutor,
104-
this.noteStore,
105-
this.aztecNode,
106-
anchorBlockHeader,
107-
jobId,
108-
scope,
109-
);
110-
}
111-
})(),
112-
verifyCurrentClassId(contractAddress, this.aztecNode, this.contractStore, anchorBlockHeader),
113-
]);
114-
this.log.debug(`Contract ${contractAddress} synced`);
115-
}
116-
117100
/** Clears sync cache. Called by BlockSynchronizer when anchor block changes. */
118101
wipe(): void {
119102
this.log.debug(`Wiping contract sync cache (${this.syncedContracts.size} entries)`);
@@ -138,22 +121,45 @@ export class ContractSyncService implements StagedStore {
138121
return !!this.excludedFromSync.get(jobId)?.has(contractAddress.toString());
139122
}
140123

141-
/** If there are unsynced scopes, starts sync and stores the promise in cache with error cleanup. */
124+
/**
125+
* If there are unsynced scopes, starts one sync per scope (bounded by #syncSlot) and stores each promise in the
126+
* cache with per-scope error cleanup. The verifyFn runs once for the whole fan-out and is awaited by every new
127+
* scope's promise, matching the pre-parallelization invariant that a cache-miss batch re-verifies the class id.
128+
*/
142129
#startSyncIfNeeded(
143130
contractAddress: AztecAddress,
144131
scopes: AztecAddress[],
145-
syncFn: (scopesToSync: AztecAddress[]) => Promise<void>,
132+
verifyFn: () => Promise<void>,
133+
syncScopeFn: (scope: AztecAddress) => Promise<void>,
146134
): void {
147135
const scopesToSync = scopes.filter(scope => !this.syncedContracts.has(toKey(contractAddress, scope)));
148-
const keys = scopesToSync.map(scope => toKey(contractAddress, scope));
149-
if (keys.length === 0) {
136+
if (scopesToSync.length === 0) {
150137
return;
151138
}
152-
const promise = syncFn(scopesToSync).catch(err => {
153-
keys.forEach(key => this.syncedContracts.delete(key));
154-
throw err;
155-
});
156-
keys.forEach(key => this.syncedContracts.set(key, promise));
139+
140+
this.log.debug(`Syncing contract ${contractAddress} for ${scopesToSync.length} scope(s)`);
141+
const verifyPromise = verifyFn();
142+
143+
for (const scope of scopesToSync) {
144+
const key = toKey(contractAddress, scope);
145+
const promise = Promise.all([verifyPromise, this.#runBounded(() => syncScopeFn(scope))])
146+
.then(() => {})
147+
.catch(err => {
148+
this.syncedContracts.delete(key);
149+
throw err;
150+
});
151+
this.syncedContracts.set(key, promise);
152+
}
153+
}
154+
155+
/** Runs fn while holding a slot in #syncSlot, bounding total concurrent scope syncs. */
156+
async #runBounded<T>(fn: () => Promise<T>): Promise<T> {
157+
await this.#syncSlot.acquire();
158+
try {
159+
return await fn();
160+
} finally {
161+
this.#syncSlot.release();
162+
}
157163
}
158164

159165
/** Collects all relevant scope promises (including in-flight ones from concurrent calls) and awaits them. */

0 commit comments

Comments
 (0)