Skip to content

Commit d9cf2be

Browse files
committed
Fix onSizeLessThan() resolving late when the queue drains without a completion
Fixes #249
1 parent cfa917a commit d9cf2be

2 files changed

Lines changed: 138 additions & 5 deletions

File tree

source/index.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -666,12 +666,15 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
666666
Note that this only limits the number of items waiting to start. There could still be up to `concurrency` jobs already running that this call does not include in its calculation.
667667
*/
668668
async onSizeLessThan(limit: number): Promise<void> {
669-
// Instantly resolve if the queue is empty.
669+
// Instantly resolve if the size is already below the limit.
670670
if (this.#queue.size < limit) {
671671
return;
672672
}
673673

674-
await this.#onEvent('next', () => this.#queue.size < limit);
674+
// Listen on both `'next'` (task completion, queued abort, `clear()`) and `'active'` (every dequeue),
675+
// so waiters wake even when the queue drains without a completion (`start()`, a runtime `concurrency`
676+
// increase, or an interval tick).
677+
await this.#onEvent(['next', 'active'], () => this.#queue.size < limit);
675678
}
676679

677680
/**
@@ -764,18 +767,25 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
764767
});
765768
}
766769

767-
async #onEvent(event: EventName, filter?: () => boolean): Promise<void> {
770+
async #onEvent(events: EventName | EventName[], filter?: () => boolean): Promise<void> {
771+
const eventList = Array.isArray(events) ? events : [events];
772+
768773
return new Promise(resolve => {
769774
const listener = () => {
770775
if (filter && !filter()) {
771776
return;
772777
}
773778

774-
this.off(event, listener);
779+
for (const event of eventList) {
780+
this.off(event, listener);
781+
}
782+
775783
resolve();
776784
};
777785

778-
this.on(event, listener);
786+
for (const event of eventList) {
787+
this.on(event, listener);
788+
}
779789
});
780790
}
781791

test/basic.ts

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,129 @@ test('.onSizeLessThan() resolves after clear()', async () => {
316316
assert.equal(queue.pending, 0);
317317
});
318318

319+
test('.onSizeLessThan() resolves when start() drains the queue', async () => {
320+
const queue = new PQueue({concurrency: 3, autoStart: false});
321+
322+
for (let index = 0; index < 5; index++) {
323+
queue.add(async () => delay(1000));
324+
}
325+
326+
assert.equal(queue.size, 5);
327+
328+
const sizePromise = queue.onSizeLessThan(3);
329+
330+
queue.start();
331+
332+
await Promise.race([
333+
sizePromise,
334+
(async () => {
335+
await delay(100);
336+
throw new Error('start() draining the queue should resolve onSizeLessThan() waiters');
337+
})(),
338+
]);
339+
340+
// Resolved on the synchronous drain, not on a task completion.
341+
assert.equal(queue.size, 2);
342+
assert.equal(queue.pending, 3);
343+
});
344+
345+
test('.onSizeLessThan() resolves when concurrency is raised at runtime', async () => {
346+
const queue = new PQueue({concurrency: 1});
347+
348+
for (let index = 0; index < 5; index++) {
349+
queue.add(async () => delay(1000));
350+
}
351+
352+
assert.equal(queue.size, 4);
353+
assert.equal(queue.pending, 1);
354+
355+
const sizePromise = queue.onSizeLessThan(2);
356+
357+
queue.concurrency = 4;
358+
359+
await Promise.race([
360+
sizePromise,
361+
(async () => {
362+
await delay(100);
363+
throw new Error('raising concurrency should resolve onSizeLessThan() waiters');
364+
})(),
365+
]);
366+
367+
assert.equal(queue.size, 1);
368+
assert.equal(queue.pending, 4);
369+
});
370+
371+
test('.onSizeLessThan() resolves when a rate-limit interval tick drains the queue', async () => {
372+
// Under rate limiting, tasks are dequeued on interval ticks rather than on completions.
373+
const queue = new PQueue({intervalCap: 2, interval: 100});
374+
375+
for (let index = 0; index < 5; index++) {
376+
queue.add(async () => delay(1000));
377+
}
378+
379+
// The first interval immediately dequeues up to `intervalCap`.
380+
assert.equal(queue.size, 3);
381+
assert.equal(queue.pending, 2);
382+
383+
const time = timeSpan();
384+
385+
// Needs a second interval tick (not a completion) to drop below the limit.
386+
await queue.onSizeLessThan(3);
387+
388+
// Resolved via the interval tick (~100ms), long before the 1000ms tasks could complete,
389+
// so all four dequeued tasks are still running (none completed to drive the resolution).
390+
assert.ok(time() < 500, `Expected to resolve on the interval tick, took ${time()}ms`);
391+
assert.equal(queue.size, 1);
392+
assert.equal(queue.pending, 4);
393+
});
394+
395+
test('.onSizeLessThan() resolves multiple waiters with different limits on a single drain', async () => {
396+
const queue = new PQueue({concurrency: 4, autoStart: false});
397+
398+
for (let index = 0; index < 6; index++) {
399+
queue.add(async () => delay(1000));
400+
}
401+
402+
assert.equal(queue.size, 6);
403+
404+
const waiters = Promise.all([
405+
queue.onSizeLessThan(5),
406+
queue.onSizeLessThan(4),
407+
queue.onSizeLessThan(3),
408+
]);
409+
410+
queue.start();
411+
412+
await Promise.race([
413+
waiters,
414+
(async () => {
415+
await delay(100);
416+
throw new Error('all waiters should resolve on the synchronous drain');
417+
})(),
418+
]);
419+
420+
// Draining to size 2 satisfies every limit above, without any completion.
421+
assert.equal(queue.size, 2);
422+
assert.equal(queue.pending, 4);
423+
});
424+
425+
test('.onSizeLessThan() removes its listeners once resolved', async () => {
426+
const queue = new PQueue({concurrency: 3, autoStart: false});
427+
428+
for (let index = 0; index < 5; index++) {
429+
queue.add(async () => delay(1000));
430+
}
431+
432+
const sizePromise = queue.onSizeLessThan(3);
433+
434+
queue.start();
435+
await sizePromise;
436+
437+
// Both the `'next'` and `'active'` listeners must be cleaned up to avoid leaking on repeated calls.
438+
assert.equal(queue.listenerCount('next'), 0);
439+
assert.equal(queue.listenerCount('active'), 0);
440+
});
441+
319442
test('.onIdle() - no pending', async () => {
320443
const queue = new PQueue();
321444
assert.equal(queue.size, 0);

0 commit comments

Comments
 (0)