-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathenvPriorityDequeuingStrategy.server.ts
More file actions
95 lines (74 loc) · 2.84 KB
/
envPriorityDequeuingStrategy.server.ts
File metadata and controls
95 lines (74 loc) · 2.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import { EnvQueues, MarQSFairDequeueStrategy, MarQSKeyProducer } from "./types";
export type EnvPriorityDequeuingStrategyOptions = {
keys: MarQSKeyProducer;
delegate: MarQSFairDequeueStrategy;
};
export class EnvPriorityDequeuingStrategy implements MarQSFairDequeueStrategy {
private _delegate: MarQSFairDequeueStrategy;
constructor(private options: EnvPriorityDequeuingStrategyOptions) {
this._delegate = options.delegate;
}
async distributeFairQueuesFromParentQueue(
parentQueue: string,
consumerId: string
): Promise<Array<EnvQueues>> {
const envQueues = await this._delegate.distributeFairQueuesFromParentQueue(
parentQueue,
consumerId
);
return this.#sortQueuesInEnvironmentsByPriority(envQueues);
}
#sortQueuesInEnvironmentsByPriority(envs: EnvQueues[]): EnvQueues[] {
return envs.map((env) => {
return this.#sortQueuesInEnvironmentByPriority(env);
});
}
// Sorts the queues by priority. A higher priority means the queue should be dequeued first.
// All the queues with the same priority should keep the order they were in the original list.
// So that means if all the queues have the same priority, the order should be preserved.
#sortQueuesInEnvironmentByPriority(env: EnvQueues): EnvQueues {
const queues = env.queues;
// Group queues by their base name (without priority)
const queueGroups = new Map<string, string[]>();
queues.forEach((queue) => {
const descriptor = this.options.keys.queueDescriptorFromQueue(queue);
const baseQueueName = this.options.keys.queueKey(
descriptor.organization,
descriptor.environment,
descriptor.name,
descriptor.concurrencyKey
);
if (!queueGroups.has(baseQueueName)) {
queueGroups.set(baseQueueName, []);
}
queueGroups.get(baseQueueName)!.push(queue);
});
// For each group, keep only the highest priority queue
const resultQueues: string[] = [];
queueGroups.forEach((groupQueues) => {
const sortedGroupQueues = [...groupQueues].sort((a, b) => {
const aPriority = this.#getQueuePriority(a);
const bPriority = this.#getQueuePriority(b);
if (aPriority === bPriority) {
return 0;
}
return bPriority - aPriority;
});
resultQueues.push(sortedGroupQueues[0]);
});
// Sort the final result by priority
const sortedQueues = resultQueues.sort((a, b) => {
const aPriority = this.#getQueuePriority(a);
const bPriority = this.#getQueuePriority(b);
if (aPriority === bPriority) {
return 0;
}
return bPriority - aPriority;
});
return { envId: env.envId, queues: sortedQueues };
}
#getQueuePriority(queue: string): number {
const queueRecord = this.options.keys.queueDescriptorFromQueue(queue);
return queueRecord.priority ?? 0;
}
}