Skip to content

Commit 3a5cc0e

Browse files
[node-core-library] Add allowOversubscription option (#5355)
* Fix concurrency bug for max weighted operation scheduling * create Peekable iterator * changelog * go back to singleton next iterator * reviews * reviews * update changelogs * api reviews * test cleanup before review * move allowOversubscription to command-line.json * remove un-needed changes * reviews * pull bump type from version-policies * change to none * Change `IAsyncParallelismOptions.allowOversubscription` default to false; improve docs --------- Co-authored-by: Pete Gonzalez <4673363+octogonz@users.noreply.github.com>
1 parent 1f3e785 commit 3a5cc0e

12 files changed

Lines changed: 270 additions & 17 deletions

File tree

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"changes": [
3+
{
4+
"packageName": "@microsoft/rush",
5+
"comment": "Add an `allowOversubscription` option to the command definitions in `common/config/rush/command-line.json` to prevent running tasks from exceeding concurrency.",
6+
"type": "none"
7+
}
8+
],
9+
"packageName": "@microsoft/rush"
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"changes": [
3+
{
4+
"packageName": "@rushstack/node-core-library",
5+
"comment": "Add an `allowOversubscription` option to the `Async` API functions which prevents running tasks from exceeding concurrency. Change its default to `false`.",
6+
"type": "minor"
7+
}
8+
],
9+
"packageName": "@rushstack/node-core-library"
10+
}

common/reviews/api/node-core-library.api.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ export type FolderItem = nodeFs.Dirent;
238238

239239
// @public
240240
export interface IAsyncParallelismOptions {
241+
allowOversubscription?: boolean;
241242
concurrency?: number;
242243
weighted?: boolean;
243244
}

libraries/node-core-library/src/Async.ts

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,28 @@ export interface IAsyncParallelismOptions {
1919
concurrency?: number;
2020

2121
/**
22-
* Optionally used with the {@link (Async:class).(forEachAsync:2)} to enable weighted operations where an operation can
23-
* take up more or less than one concurrency unit.
22+
* Optionally used with the {@link (Async:class).(forEachAsync:2)} to enable weighted operations where an
23+
* operation can take up more or less than one concurrency unit.
2424
*/
2525
weighted?: boolean;
26+
27+
/**
28+
* This option affects the handling of task weights, applying a softer policy that favors maximizing parallelism
29+
* instead of avoiding overload.
30+
*
31+
* @remarks
32+
* By default, a new task cannot start executing if doing so would push the total weight above the concurrency limit.
33+
* Set `allowOversubscription` to true to relax this rule, allowing a new task to start as long as the current
34+
* total weight is below the concurrency limit. Either way, a task cannot start if the total weight already equals
35+
* the concurrency limit; therefore, `allowOversubscription` has no effect when all tasks have weight 1.
36+
*
37+
* Example: Suppose the concurrency limit is 8, and seven tasks are running whose weights are 1, so the current
38+
* total weight is 7. If an available task has weight 2, that would push the total weight to 9, exceeding
39+
* the limit. This task can start only if `allowOversubscription` is true.
40+
*
41+
* @defaultValue false
42+
*/
43+
allowOversubscription?: boolean;
2644
}
2745

2846
/**
@@ -201,6 +219,8 @@ export class Async {
201219
let arrayIndex: number = 0;
202220
let iteratorIsComplete: boolean = false;
203221
let promiseHasResolvedOrRejected: boolean = false;
222+
// iterator that is stored when the loop exits early due to not enough concurrency
223+
let nextIterator: IteratorResult<TEntry> | undefined = undefined;
204224

205225
async function queueOperationsAsync(): Promise<void> {
206226
while (
@@ -213,7 +233,7 @@ export class Async {
213233
// there will be effectively no cap on the number of operations waiting.
214234
const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency;
215235
concurrentUnitsInProgress += limitedConcurrency;
216-
const currentIteratorResult: IteratorResult<TEntry> = await iterator.next();
236+
const currentIteratorResult: IteratorResult<TEntry> = nextIterator ?? (await iterator.next());
217237
// eslint-disable-next-line require-atomic-updates
218238
iteratorIsComplete = !!currentIteratorResult.done;
219239

@@ -225,9 +245,21 @@ export class Async {
225245

226246
// Remove the "lock" from the concurrency check and only apply the current weight.
227247
// This should allow other operations to execute.
228-
concurrentUnitsInProgress += weight;
229248
concurrentUnitsInProgress -= limitedConcurrency;
230249

250+
// Wait until there's enough capacity to run this job, this function will be re-entered as tasks call `onOperationCompletionAsync`
251+
const wouldExceedConcurrency: boolean = concurrentUnitsInProgress + weight > concurrency;
252+
const allowOversubscription: boolean = options?.allowOversubscription ?? false;
253+
if (!allowOversubscription && wouldExceedConcurrency) {
254+
// eslint-disable-next-line require-atomic-updates
255+
nextIterator = currentIteratorResult;
256+
break;
257+
}
258+
259+
// eslint-disable-next-line require-atomic-updates
260+
nextIterator = undefined;
261+
concurrentUnitsInProgress += weight;
262+
231263
Promise.resolve(callback(currentIteratorValue.element, arrayIndex++))
232264
.then(async () => {
233265
// Remove the operation completely from the in progress units.
@@ -306,6 +338,7 @@ export class Async {
306338
* number of concurrency units that can be in progress at once. The weight of each operation
307339
* determines how many concurrency units it takes up. For example, if the concurrency is 2
308340
* and the first operation has a weight of 2, then only one more operation can be in progress.
341+
* Operations may exceed the concurrency limit based on the `allowOversubscription` option.
309342
*
310343
* If `callback` throws a synchronous exception, or if it returns a promise that rejects,
311344
* then the loop stops immediately. Any remaining array items will be skipped, and

libraries/node-core-library/src/test/Async.test.ts

Lines changed: 171 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33

44
import { Async, AsyncQueue } from '../Async';
55

6+
interface INumberWithWeight {
7+
n: number;
8+
weight: number;
9+
}
10+
611
describe(Async.name, () => {
712
describe(Async.mapAsync.name, () => {
813
it('handles an empty array correctly', async () => {
@@ -27,13 +32,6 @@ describe(Async.name, () => {
2732
expect(fn).toHaveBeenNthCalledWith(3, 3, 2);
2833
});
2934

30-
it('returns the same result as built-in Promise.all', async () => {
31-
const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8];
32-
const fn: (item: number) => Promise<string> = async (item) => `result ${item}`;
33-
34-
expect(await Async.mapAsync(array, fn)).toEqual(await Promise.all(array.map(fn)));
35-
});
36-
3735
it('if concurrency is set, ensures no more than N operations occur in parallel', async () => {
3836
let running: number = 0;
3937
let maxRunning: number = 0;
@@ -61,6 +59,31 @@ describe(Async.name, () => {
6159
expect(maxRunning).toEqual(3);
6260
});
6361

62+
it('respects concurrency limit with allowOversubscription=false in mapAsync', async () => {
63+
const array: INumberWithWeight[] = [
64+
{ n: 1, weight: 2 },
65+
{ n: 2, weight: 2 }
66+
];
67+
68+
let running = 0;
69+
let maxRunning = 0;
70+
71+
const result = await Async.mapAsync(
72+
array,
73+
async (item) => {
74+
running++;
75+
maxRunning = Math.max(maxRunning, running);
76+
await Async.sleepAsync(0);
77+
running--;
78+
return `result-${item.n}`;
79+
},
80+
{ concurrency: 3, weighted: true, allowOversubscription: false }
81+
);
82+
83+
expect(result).toEqual(['result-1', 'result-2']);
84+
expect(maxRunning).toEqual(1);
85+
});
86+
6487
it('rejects if a sync iterator throws an error', async () => {
6588
const expectedError: Error = new Error('iterator error');
6689
let iteratorIndex: number = 0;
@@ -314,11 +337,6 @@ describe(Async.name, () => {
314337
).rejects.toThrow(expectedError);
315338
});
316339

317-
interface INumberWithWeight {
318-
n: number;
319-
weight: number;
320-
}
321-
322340
it('handles an empty array correctly', async () => {
323341
let running: number = 0;
324342
let maxRunning: number = 0;
@@ -469,7 +487,7 @@ describe(Async.name, () => {
469487
running--;
470488
});
471489

472-
await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true });
490+
await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true, allowOversubscription: true });
473491
expect(fn).toHaveBeenCalledTimes(8);
474492
expect(maxRunning).toEqual(2);
475493
});
@@ -542,6 +560,10 @@ describe(Async.name, () => {
542560
});
543561

544562
describe(Async.runWithRetriesAsync.name, () => {
563+
afterEach(() => {
564+
jest.restoreAllMocks();
565+
});
566+
545567
it('Correctly handles a sync function that succeeds the first time', async () => {
546568
const expectedResult: string = 'RESULT';
547569
const result: string = await Async.runWithRetriesAsync({ action: () => expectedResult, maxRetries: 0 });
@@ -688,6 +710,142 @@ describe(Async.name, () => {
688710
expect(sleepSpy).toHaveBeenCalledTimes(1);
689711
expect(sleepSpy).toHaveBeenLastCalledWith(5);
690712
});
713+
714+
describe('allowOversubscription=false operations', () => {
715+
it.each([
716+
{
717+
concurrency: 4,
718+
weight: 4,
719+
expectedConcurrency: 1,
720+
numberOfTasks: 4
721+
},
722+
{
723+
concurrency: 4,
724+
weight: 1,
725+
expectedConcurrency: 4,
726+
numberOfTasks: 4
727+
},
728+
{
729+
concurrency: 4,
730+
weight: 5,
731+
expectedConcurrency: 1,
732+
numberOfTasks: 2
733+
}
734+
])(
735+
'enforces strict concurrency limits when allowOversubscription=false: concurrency=$concurrency, weight=$weight, expects max $expectedConcurrency concurrent operations',
736+
async ({ concurrency, weight, expectedConcurrency, numberOfTasks }) => {
737+
let running: number = 0;
738+
let maxRunning: number = 0;
739+
740+
const array: INumberWithWeight[] = Array.from({ length: numberOfTasks }, (v, i) => i).map((n) => ({
741+
n,
742+
weight
743+
}));
744+
745+
const fn: (item: INumberWithWeight) => Promise<void> = jest.fn(async () => {
746+
running++;
747+
await Async.sleepAsync(0);
748+
maxRunning = Math.max(maxRunning, running);
749+
running--;
750+
});
751+
752+
await Async.forEachAsync(array, fn, { concurrency, weighted: true, allowOversubscription: false });
753+
expect(fn).toHaveBeenCalledTimes(numberOfTasks);
754+
expect(maxRunning).toEqual(expectedConcurrency);
755+
}
756+
);
757+
758+
it('waits for a small and large operation to finish before scheduling more', async () => {
759+
let running: number = 0;
760+
let maxRunning: number = 0;
761+
762+
const array: INumberWithWeight[] = [
763+
{ n: 1, weight: 1 },
764+
{ n: 2, weight: 10 },
765+
{ n: 3, weight: 1 },
766+
{ n: 4, weight: 10 },
767+
{ n: 5, weight: 1 },
768+
{ n: 6, weight: 10 },
769+
{ n: 7, weight: 1 },
770+
{ n: 8, weight: 10 }
771+
];
772+
773+
const fn: (item: INumberWithWeight) => Promise<void> = jest.fn(async (item) => {
774+
running++;
775+
await Async.sleepAsync(0);
776+
maxRunning = Math.max(maxRunning, running);
777+
running--;
778+
});
779+
780+
await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true, allowOversubscription: false });
781+
expect(fn).toHaveBeenCalledTimes(8);
782+
expect(maxRunning).toEqual(1);
783+
});
784+
785+
it('handles operation with mixed weights', async () => {
786+
const concurrency: number = 3;
787+
let running: number = 0;
788+
let maxRunning: number = 0;
789+
const taskToMaxConcurrency: Record<number, number> = {};
790+
791+
const array: INumberWithWeight[] = [
792+
{ n: 1, weight: 1 },
793+
{ n: 2, weight: 2 },
794+
{ n: 3, weight: concurrency },
795+
{ n: 4, weight: 1 },
796+
{ n: 5, weight: 1 }
797+
];
798+
799+
const fn: (item: INumberWithWeight) => Promise<void> = jest.fn(async (item) => {
800+
running++;
801+
taskToMaxConcurrency[item.n] = running;
802+
await Async.sleepAsync(0);
803+
maxRunning = Math.max(maxRunning, running);
804+
running--;
805+
});
806+
807+
await Async.forEachAsync(array, fn, { concurrency, weighted: true, allowOversubscription: false });
808+
expect(fn).toHaveBeenCalledTimes(5);
809+
expect(maxRunning).toEqual(2);
810+
811+
expect(taskToMaxConcurrency[1]).toEqual(1); // task 1
812+
expect(taskToMaxConcurrency[2]).toEqual(2); // task 1 + 2
813+
expect(taskToMaxConcurrency[3]).toEqual(1); // task 3
814+
expect(taskToMaxConcurrency[4]).toEqual(1); // task 4
815+
expect(taskToMaxConcurrency[5]).toEqual(2); // task 4 + 5
816+
});
817+
818+
it('allows operations with weight 0 to be picked up when system is at max concurrency', async () => {
819+
let running: number = 0;
820+
let maxRunning: number = 0;
821+
const taskToMaxConcurrency: Record<number, number> = {};
822+
823+
const array: INumberWithWeight[] = [
824+
{ n: 1, weight: 1 },
825+
{ n: 2, weight: 0 },
826+
{ n: 3, weight: 3 },
827+
{ n: 4, weight: 1 }
828+
];
829+
830+
const fn: (item: INumberWithWeight) => Promise<void> = jest.fn(async (item) => {
831+
running++;
832+
taskToMaxConcurrency[item.n] = running;
833+
maxRunning = Math.max(maxRunning, running);
834+
await Async.sleepAsync(0);
835+
running--;
836+
});
837+
838+
await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true, allowOversubscription: false });
839+
840+
expect(fn).toHaveBeenCalledTimes(4);
841+
expect(maxRunning).toEqual(2);
842+
843+
expect(taskToMaxConcurrency[1]).toEqual(1); // task 1
844+
expect(taskToMaxConcurrency[2]).toEqual(2); // task 1 + 2
845+
expect(taskToMaxConcurrency[3]).toEqual(2); // task 2 + 3
846+
expect(taskToMaxConcurrency[4]).toEqual(1); // task 4
847+
});
848+
});
691849
});
692850
});
693851

libraries/rush-lib/assets/rush-init/common/config/rush/command-line.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@
7777
*/
7878
"enableParallelism": false,
7979

80+
/**
81+
* Controls whether weighted operations can start when the total weight would exceed the limit
82+
* but is currently below the limit. This setting only applies when "enableParallelism" is true
83+
* and operations have a "weight" property configured in their rush-project.json "operationSettings".
84+
* Choose true (the default) to favor parallelism. Choose false to strictly stay under the limit.
85+
*/
86+
"allowOversubscription": false,
87+
8088
/**
8189
* Normally projects will be processed according to their dependency order: a given project will not start
8290
* processing the command until all of its dependencies have completed. This restriction doesn't apply for

libraries/rush-lib/src/api/CommandLineJson.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export interface IBaseCommandJson {
2323
export interface IBulkCommandJson extends IBaseCommandJson {
2424
commandKind: 'bulk';
2525
enableParallelism: boolean;
26+
allowOversubscription?: boolean;
2627
ignoreDependencyOrder?: boolean;
2728
ignoreMissingScript?: boolean;
2829
incremental?: boolean;
@@ -38,6 +39,7 @@ export interface IBulkCommandJson extends IBaseCommandJson {
3839
export interface IPhasedCommandWithoutPhasesJson extends IBaseCommandJson {
3940
commandKind: 'phased';
4041
enableParallelism: boolean;
42+
allowOversubscription?: boolean;
4143
incremental?: boolean;
4244
}
4345

libraries/rush-lib/src/cli/RushCommandLineParser.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,11 @@ export class RushCommandLineParser extends CommandLineParser {
467467
incremental: command.incremental || false,
468468
disableBuildCache: command.disableBuildCache || false,
469469

470+
// The Async.forEachAsync() API defaults allowOversubscription=false, whereas Rush historically
471+
// defaults allowOversubscription=true to favor faster builds rather than strictly staying below
472+
// the CPU limit.
473+
allowOversubscription: command.allowOversubscription ?? true,
474+
470475
initialPhases: command.phases,
471476
originalPhases: command.originalPhases,
472477
watchPhases: command.watchPhases,

0 commit comments

Comments
 (0)