Skip to content

Commit fcd0bad

Browse files
committed
feat: parallelize container monitor dependency requests
Rewrite monitorDependencies in src/lib/ecosystems/monitor.ts to fan out per-ScanResult /monitor-dependencies PUTs in parallel via pMap, bounded by the SNYK_REQUEST_CONCURRENCY limit (default 5). Per-ScanResult work is extracted into monitorOneScanResult for testability and clarity. Container images that produce many ScanResults (e.g. one per directory of JARs in fat-JAR-heavy images) previously incurred one full RTT per scan result, since the prior implementation used a nested for-loop with await. With bounded parallelism this collapses to ~ceil(N / concurrency) sequential batches, materially reducing wall-clock for large images. Error semantics are preserved: - 401 still throws AuthFailedError (terminates the run). - Other 4xx still throws MonitorError (terminates via pMap fail-fast). - 5xx and other non-4xx errors are accumulated per-ScanResult into the errors array, matching the prior continue-on-error behavior. Result order is preserved by pMap based on input order, so output remains deterministic regardless of completion order. Tests cover concurrency cap (default 5), env override via the internal SNYK_INTERNAL_REQUEST_CONCURRENCY contract that the wrapping Go CLI forwards, ordering preservation, 4xx fail-fast, and 5xx accumulation.
1 parent abe1f81 commit fcd0bad

2 files changed

Lines changed: 226 additions & 39 deletions

File tree

src/lib/ecosystems/monitor.ts

Lines changed: 70 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import { InspectResult } from '@snyk/cli-interface/legacy/plugin';
22
import chalk from 'chalk';
3+
import * as pMap from 'p-map';
34
import config from '../config';
45
import { isCI } from '../is-ci';
56
import { makeRequest } from '../request/promise';
7+
import { getRequestConcurrency } from '../snyk-test/common';
68
import {
79
Contributor,
810
MonitorOptions,
@@ -148,54 +150,83 @@ async function monitorDependencies(
148150
): Promise<[EcosystemMonitorResult[], EcosystemMonitorError[]]> {
149151
const results: EcosystemMonitorResult[] = [];
150152
const errors: EcosystemMonitorError[] = [];
153+
const concurrency = getRequestConcurrency();
154+
151155
for (const [path, scanResults] of Object.entries(scans)) {
152156
await spinner(`Monitoring dependencies in ${path}`);
153-
for (const scanResult of scanResults) {
154-
const monitorDependenciesRequest =
155-
await generateMonitorDependenciesRequest(scanResult, options);
156-
157-
const configOrg = config.org ? decodeURIComponent(config.org) : undefined;
158-
159-
const payload = {
160-
method: 'PUT',
161-
url: `${config.API}/monitor-dependencies`,
162-
json: true,
163-
headers: {
164-
'x-is-ci': isCI(),
165-
authorization: getAuthHeader(),
166-
},
167-
body: monitorDependenciesRequest,
168-
qs: {
169-
org: options.org || configOrg,
170-
},
171-
};
172-
try {
173-
const response =
174-
await makeRequest<MonitorDependenciesResponse>(payload);
175-
results.push({
176-
...response,
177-
path,
178-
scanResult,
179-
});
180-
} catch (error) {
181-
if (error.code === 401) {
182-
throw AuthFailedError();
183-
}
184-
if (error.code >= 400 && error.code < 500) {
185-
throw new MonitorError(error.code, error.message);
186-
}
187-
errors.push({
188-
error: 'Could not monitor dependencies in ' + path,
189-
path,
190-
scanResult,
191-
});
157+
const perScanResults = await pMap(
158+
scanResults,
159+
(scanResult) => monitorOneScanResult(scanResult, options, path),
160+
{ concurrency },
161+
);
162+
for (const r of perScanResults) {
163+
if (r.result) {
164+
results.push(r.result);
165+
}
166+
if (r.error) {
167+
errors.push(r.error);
192168
}
193169
}
194170
spinner.clearAll();
195171
}
196172
return [results, errors];
197173
}
198174

175+
async function monitorOneScanResult(
176+
scanResult: ScanResult,
177+
options: Options & MonitorOptions,
178+
path: string,
179+
): Promise<{
180+
result?: EcosystemMonitorResult;
181+
error?: EcosystemMonitorError;
182+
}> {
183+
const monitorDependenciesRequest = await generateMonitorDependenciesRequest(
184+
scanResult,
185+
options,
186+
);
187+
188+
const configOrg = config.org ? decodeURIComponent(config.org) : undefined;
189+
190+
const payload = {
191+
method: 'PUT',
192+
url: `${config.API}/monitor-dependencies`,
193+
json: true,
194+
headers: {
195+
'x-is-ci': isCI(),
196+
authorization: getAuthHeader(),
197+
},
198+
body: monitorDependenciesRequest,
199+
qs: {
200+
org: options.org || configOrg,
201+
},
202+
};
203+
204+
try {
205+
const response = await makeRequest<MonitorDependenciesResponse>(payload);
206+
return {
207+
result: {
208+
...response,
209+
path,
210+
scanResult,
211+
},
212+
};
213+
} catch (error) {
214+
if (error.code === 401) {
215+
throw AuthFailedError();
216+
}
217+
if (error.code >= 400 && error.code < 500) {
218+
throw new MonitorError(error.code, error.message);
219+
}
220+
return {
221+
error: {
222+
error: 'Could not monitor dependencies in ' + path,
223+
path,
224+
scanResult,
225+
},
226+
};
227+
}
228+
}
229+
199230
export async function getFormattedMonitorOutput(
200231
results: Array<GoodResult | BadResult>,
201232
monitorResults: EcosystemMonitorResult[],

test/jest/unit/ecosystems-monitor-docker.spec.ts

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,4 +285,160 @@ describe('monitorEcosystem docker/container', () => {
285285
);
286286
expect(parsedOutput.projectName).not.toBe('my-custom-project-name');
287287
});
288+
289+
describe('parallelization of monitor-dependencies requests', () => {
290+
const ORIGINAL_CONCURRENCY = process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY;
291+
292+
afterEach(() => {
293+
if (ORIGINAL_CONCURRENCY === undefined) {
294+
delete process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY;
295+
} else {
296+
process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY = ORIGINAL_CONCURRENCY;
297+
}
298+
});
299+
300+
function makeMavenScanResult(targetFile: string): ScanResult {
301+
const base = readJsonFixture(
302+
'maven-project-0-dependencies-scan-result.json',
303+
) as ScanResult;
304+
return {
305+
...base,
306+
identity: { ...base.identity, targetFile },
307+
};
308+
}
309+
310+
function makeMonitorResponse(identity: string) {
311+
const base = readJsonFixture(
312+
'monitor-dependencies-response-with-project-name.json',
313+
) as ecosystemsTypes.MonitorDependenciesResponse;
314+
return {
315+
...base,
316+
id: `${identity}-id`,
317+
projectName: identity,
318+
};
319+
}
320+
321+
async function runMonitor(scanResults: ScanResult[]) {
322+
jest.spyOn(dockerPlugin, 'scan').mockResolvedValue({ scanResults });
323+
return ecosystems.monitorEcosystem('docker', ['/srv'], {
324+
path: '/srv',
325+
docker: true,
326+
org: 'my-org',
327+
});
328+
}
329+
330+
it('caps in-flight requests at the default concurrency (5)', async () => {
331+
delete process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY;
332+
const scanResults = Array.from({ length: 25 }, (_, i) =>
333+
makeMavenScanResult(`app-${i}`),
334+
);
335+
336+
let inFlight = 0;
337+
let peakInFlight = 0;
338+
jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => {
339+
inFlight++;
340+
peakInFlight = Math.max(peakInFlight, inFlight);
341+
const identity = payload.body.scanResult.identity.targetFile;
342+
return new Promise((resolve) => {
343+
setTimeout(() => {
344+
inFlight--;
345+
resolve(makeMonitorResponse(identity));
346+
}, 10);
347+
});
348+
});
349+
350+
await runMonitor(scanResults);
351+
352+
expect(peakInFlight).toBeLessThanOrEqual(5);
353+
expect(peakInFlight).toBeGreaterThan(1);
354+
});
355+
356+
it('respects SNYK_INTERNAL_REQUEST_CONCURRENCY override', async () => {
357+
process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY = '3';
358+
const scanResults = Array.from({ length: 15 }, (_, i) =>
359+
makeMavenScanResult(`app-${i}`),
360+
);
361+
362+
let inFlight = 0;
363+
let peakInFlight = 0;
364+
jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => {
365+
inFlight++;
366+
peakInFlight = Math.max(peakInFlight, inFlight);
367+
const identity = payload.body.scanResult.identity.targetFile;
368+
return new Promise((resolve) => {
369+
setTimeout(() => {
370+
inFlight--;
371+
resolve(makeMonitorResponse(identity));
372+
}, 10);
373+
});
374+
});
375+
376+
await runMonitor(scanResults);
377+
378+
expect(peakInFlight).toBeLessThanOrEqual(3);
379+
});
380+
381+
it('preserves result order matching input order', async () => {
382+
delete process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY;
383+
const scanResults = ['os', 'app-1', 'app-2', 'app-3', 'app-4'].map(
384+
makeMavenScanResult,
385+
);
386+
387+
// Stagger response times in reverse so completion order != input order.
388+
jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => {
389+
const identity = payload.body.scanResult.identity.targetFile;
390+
const delay =
391+
{ os: 30, 'app-1': 20, 'app-2': 5, 'app-3': 25, 'app-4': 10 }[
392+
identity
393+
] ?? 0;
394+
return new Promise((resolve) =>
395+
setTimeout(() => resolve(makeMonitorResponse(identity)), delay),
396+
);
397+
});
398+
399+
const [results] = await runMonitor(scanResults);
400+
401+
expect(results.map((r) => r.projectName)).toEqual([
402+
'os',
403+
'app-1',
404+
'app-2',
405+
'app-3',
406+
'app-4',
407+
]);
408+
});
409+
410+
it('throws MonitorError when any request returns 4xx (fail-fast)', async () => {
411+
delete process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY;
412+
const scanResults = ['app-1', 'app-2', 'app-3'].map(makeMavenScanResult);
413+
414+
jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => {
415+
const identity = payload.body.scanResult.identity.targetFile;
416+
if (identity === 'app-2') {
417+
return Promise.reject({ code: 403, message: 'forbidden' });
418+
}
419+
return Promise.resolve(makeMonitorResponse(identity));
420+
});
421+
422+
await expect(runMonitor(scanResults)).rejects.toThrow('forbidden');
423+
});
424+
425+
it('accumulates 5xx errors per scan-result without aborting', async () => {
426+
delete process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY;
427+
const scanResults = ['app-1', 'app-2', 'app-3'].map(makeMavenScanResult);
428+
429+
jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => {
430+
const identity = payload.body.scanResult.identity.targetFile;
431+
if (identity === 'app-2') {
432+
return Promise.reject({ code: 503, message: 'unavailable' });
433+
}
434+
return Promise.resolve(makeMonitorResponse(identity));
435+
});
436+
437+
const [results, errors] = await runMonitor(scanResults);
438+
439+
expect(results.map((r) => r.projectName)).toEqual(['app-1', 'app-3']);
440+
expect(errors).toHaveLength(1);
441+
expect(errors[0].error).toContain('Could not monitor dependencies');
442+
});
443+
});
288444
});

0 commit comments

Comments
 (0)