Skip to content

Commit ea65ce3

Browse files
committed
Tweaks
1 parent 87077d0 commit ea65ce3

3 files changed

Lines changed: 67 additions & 10 deletions

File tree

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,13 @@
4747
"bluebird"
4848
],
4949
"dependencies": {
50-
"eventemitter3": "^5.0.1",
50+
"eventemitter3": "^5.0.4",
5151
"p-timeout": "^7.0.0"
5252
},
5353
"devDependencies": {
5454
"@sindresorhus/tsconfig": "^8.0.1",
5555
"@types/benchmark": "^2.1.5",
56-
"@types/node": "^24.5.1",
56+
"@types/node": "^25.6.0",
5757
"benchmark": "^2.1.4",
5858
"del-cli": "^6.0.0",
5959
"delay": "^6.0.0",

source/priority-queue.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export type PriorityQueueOptions = {
1111
export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOptions> {
1212
readonly #queue: Array<PriorityQueueOptions & {run: RunFunction}> = [];
1313

14-
// Index of the next item to dequeue. Old items are compacted lazily so dequeue stays O(1).
14+
// The queue is stored as a sorted array, but dequeued items are left before `#head` until compaction. Only items from `#head` onward are live, which keeps repeated dequeues amortized O(1).
1515
#head = 0;
1616

1717
enqueue(run: RunFunction, options?: Partial<PriorityQueueOptions>): void {
@@ -28,23 +28,27 @@ export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOp
2828
};
2929

3030
if (size === 0) {
31+
// When the queue is logically empty, discard any consumed prefix before accepting new work.
3132
this.#queue.length = 0;
3233
this.#head = 0;
3334
this.#queue.push(element);
3435
return;
3536
}
3637

3738
if (this.#queue.at(-1)!.priority! >= priority) {
39+
// Same-priority and lower-priority items belong after the current tail. Appending preserves FIFO order for equal priorities.
3840
this.#queue.push(element);
3941
return;
4042
}
4143

44+
// Binary insertion must run on the live sorted range only.
4245
this.#compact();
4346
const index = lowerBound(this.#queue, element, (a: Readonly<PriorityQueueOptions>, b: Readonly<PriorityQueueOptions>) => b.priority! - a.priority!);
4447
this.#queue.splice(index, 0, element);
4548
}
4649

4750
setPriority(id: string, priority: number) {
51+
// A dequeued item with the same id is no longer part of the queue.
4852
const index = this.#queue.findIndex((element: Readonly<PriorityQueueOptions>, index) => index >= this.#head && element.id === id);
4953
if (index === -1) {
5054
throw new ReferenceError(`No promise function with the id "${id}" exists in the queue.`);
@@ -58,6 +62,7 @@ export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOp
5862
remove(run: RunFunction): void;
5963
remove(idOrRun: string | RunFunction): void {
6064
const index = this.#queue.findIndex((element: Readonly<PriorityQueueOptions & {run: RunFunction}>, index) => {
65+
// The consumed prefix may still contain references that should not be removable.
6166
if (index < this.#head) {
6267
return false;
6368
}
@@ -83,9 +88,11 @@ export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOp
8388
this.#head++;
8489

8590
if (this.#head === this.#queue.length) {
91+
// Fully drained queues are reset immediately so the next enqueue starts from a clean array.
8692
this.#queue.length = 0;
8793
this.#head = 0;
8894
} else if (this.#head > compactionThreshold && this.#head > this.#queue.length / 2) {
95+
// Keep repeated dequeues cheap, but stop the consumed prefix from growing without bound.
8996
this.#compact();
9097
}
9198

@@ -110,6 +117,7 @@ export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOp
110117
}
111118

112119
#compact(): void {
120+
// Compaction restores the invariant that the whole array is the live sorted range.
113121
if (this.#head === 0) {
114122
return;
115123
}

test/priority-queue.ts

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,28 @@ test('PriorityQueue inserts high-priority items correctly after partial dequeue'
4747
assert.equal(queue.dequeue(), undefined);
4848
});
4949

50+
test('PriorityQueue appends same-priority and lower-priority items correctly after partial dequeue', () => {
51+
const queue = new PriorityQueue();
52+
const first = createRun('first');
53+
const second = createRun('second');
54+
const samePriority = createRun('samePriority');
55+
const lowerPriority = createRun('lowerPriority');
56+
57+
queue.enqueue(first, {priority: 0});
58+
queue.enqueue(second, {priority: 0});
59+
60+
assert.equal(queue.dequeue(), first);
61+
62+
queue.enqueue(samePriority, {priority: 0});
63+
queue.enqueue(lowerPriority, {priority: -1});
64+
65+
assert.equal(queue.size, 3);
66+
assert.equal(queue.dequeue(), second);
67+
assert.equal(queue.dequeue(), samePriority);
68+
assert.equal(queue.dequeue(), lowerPriority);
69+
assert.equal(queue.dequeue(), undefined);
70+
});
71+
5072
test('PriorityQueue removes the live duplicate id after partial dequeue', () => {
5173
const queue = new PriorityQueue();
5274
const first = createRun('first');
@@ -85,30 +107,57 @@ test('PriorityQueue removes by run after partial dequeue', () => {
85107
assert.equal(queue.dequeue(), undefined);
86108
});
87109

110+
test('PriorityQueue ignores consumed items with no live match', () => {
111+
const queue = new PriorityQueue();
112+
const first = createRun('first');
113+
const second = createRun('second');
114+
115+
queue.enqueue(first, {id: 'first'});
116+
queue.enqueue(second, {id: 'second'});
117+
118+
assert.equal(queue.dequeue(), first);
119+
120+
assert.throws(
121+
() => {
122+
queue.setPriority('first', 1);
123+
},
124+
{
125+
name: 'ReferenceError',
126+
message: 'No promise function with the id "first" exists in the queue.',
127+
},
128+
);
129+
130+
queue.remove(first);
131+
132+
assert.equal(queue.size, 1);
133+
assert.equal(queue.dequeue(), second);
134+
assert.equal(queue.dequeue(), undefined);
135+
});
136+
88137
test('PriorityQueue stays ordered after cursor compaction', () => {
89138
const queue = new PriorityQueue();
90139
const urgent = createRun('urgent');
140+
const tasks = Array.from({length: 150}, (_element, index) => createRun(`task-${index}`));
91141

92-
for (let index = 0; index < 150; index++) {
93-
queue.enqueue(createRun(`task-${index}`));
142+
for (const task of tasks) {
143+
queue.enqueue(task);
94144
}
95145

96146
// Cross the compaction threshold while leaving queued items behind.
97147
for (let index = 0; index < 120; index++) {
98-
assert.notEqual(queue.dequeue(), undefined);
148+
assert.equal(queue.dequeue(), tasks[index]);
99149
}
100150

101151
queue.enqueue(urgent, {priority: 1});
102152

103153
assert.equal(queue.size, 31);
104154
assert.equal(queue.dequeue(), urgent);
105155

106-
let remaining = 0;
107-
while (queue.dequeue() !== undefined) {
108-
remaining++;
156+
for (let index = 120; index < tasks.length; index++) {
157+
assert.equal(queue.dequeue(), tasks[index]);
109158
}
110159

111-
assert.equal(remaining, 30);
160+
assert.equal(queue.dequeue(), undefined);
112161
assert.equal(queue.size, 0);
113162
});
114163

0 commit comments

Comments
 (0)