diff --git a/src/lib/ecosystems/monitor.ts b/src/lib/ecosystems/monitor.ts index 6a02f6e78b..19936044f3 100644 --- a/src/lib/ecosystems/monitor.ts +++ b/src/lib/ecosystems/monitor.ts @@ -1,8 +1,10 @@ import { InspectResult } from '@snyk/cli-interface/legacy/plugin'; import chalk from 'chalk'; +import * as pMap from 'p-map'; import config from '../config'; import { isCI } from '../is-ci'; import { makeRequest } from '../request/promise'; +import { getRequestConcurrency } from '../snyk-test/common'; import { Contributor, MonitorOptions, @@ -148,47 +150,21 @@ async function monitorDependencies( ): Promise<[EcosystemMonitorResult[], EcosystemMonitorError[]]> { const results: EcosystemMonitorResult[] = []; const errors: EcosystemMonitorError[] = []; + const concurrency = getRequestConcurrency(); + for (const [path, scanResults] of Object.entries(scans)) { await spinner(`Monitoring dependencies in ${path}`); - for (const scanResult of scanResults) { - const monitorDependenciesRequest = - await generateMonitorDependenciesRequest(scanResult, options); - - const configOrg = config.org ? decodeURIComponent(config.org) : undefined; - - const payload = { - method: 'PUT', - url: `${config.API}/monitor-dependencies`, - json: true, - headers: { - 'x-is-ci': isCI(), - authorization: getAuthHeader(), - }, - body: monitorDependenciesRequest, - qs: { - org: options.org || configOrg, - }, - }; - try { - const response = - await makeRequest(payload); - results.push({ - ...response, - path, - scanResult, - }); - } catch (error) { - if (error.code === 401) { - throw AuthFailedError(); - } - if (error.code >= 400 && error.code < 500) { - throw new MonitorError(error.code, error.message); - } - errors.push({ - error: 'Could not monitor dependencies in ' + path, - path, - scanResult, - }); + const perScanResults = await pMap( + scanResults, + (scanResult) => monitorOneScanResult(scanResult, options, path), + { concurrency }, + ); + for (const r of perScanResults) { + if (r.result) { + results.push(r.result); + } + if (r.error) { + errors.push(r.error); } } spinner.clearAll(); @@ -196,6 +172,61 @@ async function monitorDependencies( return [results, errors]; } +async function monitorOneScanResult( + scanResult: ScanResult, + options: Options & MonitorOptions, + path: string, +): Promise<{ + result?: EcosystemMonitorResult; + error?: EcosystemMonitorError; +}> { + const monitorDependenciesRequest = await generateMonitorDependenciesRequest( + scanResult, + options, + ); + + const configOrg = config.org ? decodeURIComponent(config.org) : undefined; + + const payload = { + method: 'PUT', + url: `${config.API}/monitor-dependencies`, + json: true, + headers: { + 'x-is-ci': isCI(), + authorization: getAuthHeader(), + }, + body: monitorDependenciesRequest, + qs: { + org: options.org || configOrg, + }, + }; + + try { + const response = await makeRequest(payload); + return { + result: { + ...response, + path, + scanResult, + }, + }; + } catch (error) { + if (error.code === 401) { + throw AuthFailedError(); + } + if (error.code >= 400 && error.code < 500) { + throw new MonitorError(error.code, error.message); + } + return { + error: { + error: 'Could not monitor dependencies in ' + path, + path, + scanResult, + }, + }; + } +} + export async function getFormattedMonitorOutput( results: Array, monitorResults: EcosystemMonitorResult[], diff --git a/test/jest/unit/ecosystems-monitor-docker.spec.ts b/test/jest/unit/ecosystems-monitor-docker.spec.ts index b2f113b9cc..d8717a66b1 100644 --- a/test/jest/unit/ecosystems-monitor-docker.spec.ts +++ b/test/jest/unit/ecosystems-monitor-docker.spec.ts @@ -285,4 +285,160 @@ describe('monitorEcosystem docker/container', () => { ); expect(parsedOutput.projectName).not.toBe('my-custom-project-name'); }); + + describe('parallelization of monitor-dependencies requests', () => { + const ORIGINAL_CONCURRENCY = process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY; + + afterEach(() => { + if (ORIGINAL_CONCURRENCY === undefined) { + delete process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY; + } else { + process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY = ORIGINAL_CONCURRENCY; + } + }); + + function makeMavenScanResult(targetFile: string): ScanResult { + const base = readJsonFixture( + 'maven-project-0-dependencies-scan-result.json', + ) as ScanResult; + return { + ...base, + identity: { ...base.identity, targetFile }, + }; + } + + function makeMonitorResponse(identity: string) { + const base = readJsonFixture( + 'monitor-dependencies-response-with-project-name.json', + ) as ecosystemsTypes.MonitorDependenciesResponse; + return { + ...base, + id: `${identity}-id`, + projectName: identity, + }; + } + + async function runMonitor(scanResults: ScanResult[]) { + jest.spyOn(dockerPlugin, 'scan').mockResolvedValue({ scanResults }); + return ecosystems.monitorEcosystem('docker', ['/srv'], { + path: '/srv', + docker: true, + org: 'my-org', + }); + } + + it('caps in-flight requests at the default concurrency (5)', async () => { + delete process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY; + const scanResults = Array.from({ length: 25 }, (_, i) => + makeMavenScanResult(`app-${i}`), + ); + + let inFlight = 0; + let peakInFlight = 0; + jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => { + inFlight++; + peakInFlight = Math.max(peakInFlight, inFlight); + const identity = payload.body.scanResult.identity.targetFile; + return new Promise((resolve) => { + setTimeout(() => { + inFlight--; + resolve(makeMonitorResponse(identity)); + }, 10); + }); + }); + + await runMonitor(scanResults); + + expect(peakInFlight).toBeLessThanOrEqual(5); + expect(peakInFlight).toBeGreaterThan(1); + }); + + it('respects SNYK_INTERNAL_REQUEST_CONCURRENCY override', async () => { + process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY = '3'; + const scanResults = Array.from({ length: 15 }, (_, i) => + makeMavenScanResult(`app-${i}`), + ); + + let inFlight = 0; + let peakInFlight = 0; + jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => { + inFlight++; + peakInFlight = Math.max(peakInFlight, inFlight); + const identity = payload.body.scanResult.identity.targetFile; + return new Promise((resolve) => { + setTimeout(() => { + inFlight--; + resolve(makeMonitorResponse(identity)); + }, 10); + }); + }); + + await runMonitor(scanResults); + + expect(peakInFlight).toBeLessThanOrEqual(3); + }); + + it('preserves result order matching input order', async () => { + delete process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY; + const scanResults = ['os', 'app-1', 'app-2', 'app-3', 'app-4'].map( + makeMavenScanResult, + ); + + // Stagger response times in reverse so completion order != input order. + jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => { + const identity = payload.body.scanResult.identity.targetFile; + const delay = + { os: 30, 'app-1': 20, 'app-2': 5, 'app-3': 25, 'app-4': 10 }[ + identity + ] ?? 0; + return new Promise((resolve) => + setTimeout(() => resolve(makeMonitorResponse(identity)), delay), + ); + }); + + const [results] = await runMonitor(scanResults); + + expect(results.map((r) => r.projectName)).toEqual([ + 'os', + 'app-1', + 'app-2', + 'app-3', + 'app-4', + ]); + }); + + it('throws MonitorError when any request returns 4xx (fail-fast)', async () => { + delete process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY; + const scanResults = ['app-1', 'app-2', 'app-3'].map(makeMavenScanResult); + + jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => { + const identity = payload.body.scanResult.identity.targetFile; + if (identity === 'app-2') { + return Promise.reject({ code: 403, message: 'forbidden' }); + } + return Promise.resolve(makeMonitorResponse(identity)); + }); + + await expect(runMonitor(scanResults)).rejects.toThrow('forbidden'); + }); + + it('accumulates 5xx errors per scan-result without aborting', async () => { + delete process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY; + const scanResults = ['app-1', 'app-2', 'app-3'].map(makeMavenScanResult); + + jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => { + const identity = payload.body.scanResult.identity.targetFile; + if (identity === 'app-2') { + return Promise.reject({ code: 503, message: 'unavailable' }); + } + return Promise.resolve(makeMonitorResponse(identity)); + }); + + const [results, errors] = await runMonitor(scanResults); + + expect(results.map((r) => r.projectName)).toEqual(['app-1', 'app-3']); + expect(errors).toHaveLength(1); + expect(errors[0].error).toContain('Could not monitor dependencies'); + }); + }); });