Skip to content

Commit 9b8109c

Browse files
cobuilds: yield priority to operations that haven't been tried yet
Signed-off-by: Aramis Sennyey <aramissennyeydd@users.noreply.github.com>
1 parent e5c659c commit 9b8109c

5 files changed

Lines changed: 44 additions & 46 deletions

File tree

libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ export class AsyncOperationQueue
2222
private readonly _totalOperations: number;
2323
private readonly _completedOperations: Set<OperationExecutionRecord>;
2424

25+
// Tracks how many times each operation has been assigned to an execution slot.
26+
// Operations that have been assigned more times (e.g. cobuild retries) are sorted
27+
// after operations with fewer attempts, so untried work is preferred.
28+
private readonly _timesQueued: Map<OperationExecutionRecord, number>;
29+
2530
private _isDone: boolean;
2631

2732
/**
@@ -37,6 +42,7 @@ export class AsyncOperationQueue
3742
this._totalOperations = this._queue.length;
3843
this._isDone = false;
3944
this._completedOperations = new Set<OperationExecutionRecord>();
45+
this._timesQueued = new Map();
4046
}
4147

4248
/**
@@ -63,6 +69,7 @@ export class AsyncOperationQueue
6369
*/
6470
public complete(record: OperationExecutionRecord): void {
6571
this._completedOperations.add(record);
72+
this._timesQueued.delete(record);
6673

6774
// Apply status changes to direct dependents
6875
if (record.status !== OperationStatus.Failure && record.status !== OperationStatus.Blocked) {
@@ -86,25 +93,14 @@ export class AsyncOperationQueue
8693
}
8794
}
8895

89-
/**
90-
* Moves an operation to the back of the queue (lowest priority).
91-
* Used when a cobuild lock acquisition fails, so that locally-executable operations are
92-
* assigned before this operation is retried.
93-
*/
94-
public yieldPriority(record: OperationExecutionRecord): void {
95-
const index: number = this._queue.indexOf(record);
96-
if (index > 0) {
97-
this._queue.splice(index, 1);
98-
this._queue.unshift(record);
99-
}
100-
}
101-
10296
/**
10397
* Routes ready operations with 0 dependencies to waiting iterators. Normally invoked as part of `next()`, but
10498
* if the caller does not update operation dependencies prior to calling `next()`, may need to be invoked manually.
10599
*/
106100
public assignOperations(): void {
107-
const { _queue: queue, _pendingIterators: waitingIterators } = this;
101+
const { _queue: queue, _pendingIterators: waitingIterators, _timesQueued: timesQueued } = this;
102+
103+
const readyOperations: OperationExecutionRecord[] = [];
108104

109105
// By iterating in reverse order we do less array shuffling when removing operations
110106
for (let i: number = queue.length - 1; waitingIterators.length > 0 && i >= 0; i--) {
@@ -122,6 +118,7 @@ export class AsyncOperationQueue
122118
) {
123119
// It shouldn't be on the queue, remove it
124120
queue.splice(i, 1);
121+
timesQueued.delete(record);
125122
} else if (record.status === OperationStatus.Queued || record.status === OperationStatus.Executing) {
126123
// This operation is currently executing
127124
// next one plz :)
@@ -133,17 +130,32 @@ export class AsyncOperationQueue
133130
// Sanity check
134131
throw new Error(`Unexpected status "${record.status}" for queued operation: ${record.name}`);
135132
} else {
136-
// This task is ready to process, hand it to the iterator.
137-
// Needs to have queue semantics, otherwise tools that iterate it get confused
138-
record.status = OperationStatus.Queued;
139-
waitingIterators.shift()!({
140-
value: record,
141-
done: false
142-
});
133+
readyOperations.push(record);
143134
}
144135
// Otherwise operation is still waiting
145136
}
146137

138+
if (readyOperations.length > 1) {
139+
// Sort by times queued ascending. Operations that have never been queued (0)
140+
// come first, then operations with fewer attempts. This ensures cobuild retries
141+
// (queued 1+ times, returned to Ready) are tried after untried operations.
142+
readyOperations.sort((a, b) => (timesQueued.get(a) ?? 0) - (timesQueued.get(b) ?? 0));
143+
}
144+
145+
for (const record of readyOperations) {
146+
if (waitingIterators.length === 0) {
147+
break;
148+
}
149+
// This task is ready to process, hand it to the iterator.
150+
// Needs to have queue semantics, otherwise tools that iterate it get confused
151+
timesQueued.set(record, (timesQueued.get(record) ?? 0) + 1);
152+
record.status = OperationStatus.Queued;
153+
waitingIterators.shift()!({
154+
value: record,
155+
done: false
156+
});
157+
}
158+
147159
// Since items only get removed from the queue when they have a final status, this should be safe.
148160
if (queue.length === 0) {
149161
this._isDone = true;

libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,6 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin {
411411
periodicCallback.start();
412412
} else {
413413
setTimeout(() => {
414-
record.yieldPriority();
415414
record.status = OperationStatus.Ready;
416415
}, 500);
417416
return OperationStatus.Executing;

libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,6 @@ export class OperationExecutionManager {
148148
const executionRecordContext: IOperationExecutionRecordContext = {
149149
streamCollator: this._streamCollator,
150150
onOperationStatusChanged: this._onOperationStatusChanged,
151-
yieldPriority: (record: OperationExecutionRecord) => {
152-
this._executionQueue.yieldPriority(record);
153-
},
154151
createEnvironment: this._createEnvironmentForOperation,
155152
inputsSnapshot,
156153
debugMode,

libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ import {
4242
export interface IOperationExecutionRecordContext {
4343
streamCollator: StreamCollator;
4444
onOperationStatusChanged?: (record: OperationExecutionRecord) => void;
45-
yieldPriority?: (record: OperationExecutionRecord) => void;
4645
createEnvironment?: (record: OperationExecutionRecord) => IEnvironment;
4746
inputsSnapshot: IInputsSnapshot | undefined;
4847

@@ -235,14 +234,6 @@ export class OperationExecutionRecord implements IOperationRunnerContext, IOpera
235234
return !this.operation.enabled || this.runner.silent;
236235
}
237236

238-
/**
239-
* Moves this operation to the back of the execution queue so that other operations
240-
* are assigned first.
241-
*/
242-
public yieldPriority(): void {
243-
this._context.yieldPriority?.(this);
244-
}
245-
246237
public getStateHash(): string {
247238
if (this._stateHash === undefined) {
248239
const components: readonly string[] = this.getStateHashComponents();

libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,11 @@ describe(AsyncOperationQueue.name, () => {
169169
expect(result.done).toEqual(true);
170170
});
171171

172-
it('yieldPriority moves an operation to the back of the queue', async () => {
172+
it('sorts cobuild retries after untried operations', async () => {
173173
// Three independent operations: A, B, C (all Ready).
174-
// A is assigned first. Calling yieldPriority + setting Ready should move it to the back,
175-
// so B and C are assigned before A on the next pass.
174+
// A is assigned first, then returns to Ready (cobuild lock failed).
175+
// On the next pass, B and C should be assigned before A because A
176+
// has a recent lastAssignedAt timestamp.
176177
const opA = createRecord('a');
177178
const opB = createRecord('b');
178179
const opC = createRecord('c');
@@ -183,11 +184,10 @@ describe(AsyncOperationQueue.name, () => {
183184
const r1: IteratorResult<OperationExecutionRecord> = await queue.next();
184185
const firstAssigned: OperationExecutionRecord = r1.value;
185186

186-
// Simulate cobuild retry: yield priority then re-ready the operation
187-
queue.yieldPriority(firstAssigned);
187+
// Simulate cobuild retry: operation returns to Ready
188188
firstAssigned.status = OperationStatus.Ready;
189189

190-
// Now assign the remaining — untried operations should come before the retry
190+
// Assign all three — untried operations should come before the retry
191191
const results: OperationExecutionRecord[] = [];
192192
const r2: IteratorResult<OperationExecutionRecord> = await queue.next();
193193
results.push(r2.value);
@@ -196,13 +196,13 @@ describe(AsyncOperationQueue.name, () => {
196196
const r4: IteratorResult<OperationExecutionRecord> = await queue.next();
197197
results.push(r4.value);
198198

199-
// The yielded operation should be last
199+
// The cobuild retry should be last
200200
expect(results[2]).toBe(firstAssigned);
201201
});
202202

203-
it('yieldPriority assigns freshly unblocked operations first', async () => {
203+
it('assigns freshly unblocked operations before cobuild retries', async () => {
204204
// A (no deps), B (depends on C), C (no deps)
205-
// A is assigned and yields priority (cobuild retry).
205+
// A is assigned and returns to Ready (cobuild retry).
206206
// C completes, unblocking B. B should be assigned before A.
207207
const opA = createRecord('a');
208208
const opB = createRecord('b');
@@ -217,15 +217,14 @@ describe(AsyncOperationQueue.name, () => {
217217
const r2: IteratorResult<OperationExecutionRecord> = await queue.next();
218218
expect(new Set([r1.value, r2.value])).toEqual(new Set([opA, opC]));
219219

220-
// Simulate: A fails cobuild lock
221-
queue.yieldPriority(opA);
220+
// Simulate: A fails cobuild lock and returns to Ready
222221
opA.status = OperationStatus.Ready;
223222

224223
// C succeeds, which unblocks B
225224
opC.status = OperationStatus.Success;
226225
queue.complete(opC);
227226

228-
// B is freshly unblocked, A yielded priority — B should be assigned first
227+
// B is freshly unblocked (never assigned), A is a cobuild retry — B should be first
229228
const r3: IteratorResult<OperationExecutionRecord> = await queue.next();
230229
expect(r3.value).toBe(opB);
231230

0 commit comments

Comments
 (0)