-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathconcurrency.ts
More file actions
280 lines (244 loc) · 8.03 KB
/
Copy pathconcurrency.ts
File metadata and controls
280 lines (244 loc) · 8.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
import { createRedisClient, type Redis, type RedisOptions } from "@internal/redis";
import type {
ConcurrencyCheckResult,
ConcurrencyGroupConfig,
ConcurrencyState,
FairQueueKeyProducer,
QueueDescriptor,
} from "./types.js";
export interface ConcurrencyManagerOptions {
redis: RedisOptions;
keys: FairQueueKeyProducer;
groups: ConcurrencyGroupConfig[];
}
/**
* ConcurrencyManager handles multi-level concurrency tracking and limiting.
*
* Features:
* - Multiple concurrent concurrency groups (tenant, org, project, etc.)
* - Atomic reserve/release operations using Lua scripts
* - Efficient batch checking of all groups
*/
export class ConcurrencyManager {
private redis: Redis;
private keys: FairQueueKeyProducer;
private groups: ConcurrencyGroupConfig[];
private groupsByName: Map<string, ConcurrencyGroupConfig>;
constructor(private options: ConcurrencyManagerOptions) {
this.redis = createRedisClient(options.redis);
this.keys = options.keys;
this.groups = options.groups;
this.groupsByName = new Map(options.groups.map((g) => [g.name, g]));
this.#registerCommands();
}
// ============================================================================
// Public Methods
// ============================================================================
/**
* Check if a message can be processed given all concurrency constraints.
* Checks all configured groups and returns the first one at capacity.
*/
async canProcess(queue: QueueDescriptor): Promise<ConcurrencyCheckResult> {
for (const group of this.groups) {
const groupId = group.extractGroupId(queue);
const isAtCapacity = await this.isAtCapacity(group.name, groupId);
if (isAtCapacity) {
const state = await this.getState(group.name, groupId);
return {
allowed: false,
blockedBy: state,
};
}
}
return { allowed: true };
}
/**
* Reserve concurrency slots for a message across all groups.
* Atomic - either all groups are reserved or none.
*
* @returns true if reservation successful, false if any group is at capacity
*/
async reserve(queue: QueueDescriptor, messageId: string): Promise<boolean> {
// Build list of group keys and limits
const groupData = await Promise.all(
this.groups.map(async (group) => {
const groupId = group.extractGroupId(queue);
const limit = await group.getLimit(groupId);
return {
key: this.keys.concurrencyKey(group.name, groupId),
limit: limit || group.defaultLimit,
};
})
);
// Use Lua script for atomic multi-group reservation
// Pass keys as KEYS array so ioredis applies keyPrefix correctly
const keys = groupData.map((g) => g.key);
const limits = groupData.map((g) => g.limit.toString());
// Args order: messageId, ...limits (keys are passed separately)
const result = await this.redis.reserveConcurrency(keys.length, keys, messageId, ...limits);
return result === 1;
}
/**
* Release concurrency slots for a message across all groups.
*/
async release(queue: QueueDescriptor, messageId: string): Promise<void> {
const pipeline = this.redis.pipeline();
for (const group of this.groups) {
const groupId = group.extractGroupId(queue);
const key = this.keys.concurrencyKey(group.name, groupId);
pipeline.srem(key, messageId);
}
await pipeline.exec();
}
/**
* Get current concurrency for a specific group.
*/
async getCurrentConcurrency(groupName: string, groupId: string): Promise<number> {
const key = this.keys.concurrencyKey(groupName, groupId);
return await this.redis.scard(key);
}
/**
* Get available capacity for a queue across all concurrency groups.
* Returns the minimum available capacity across all groups.
*/
async getAvailableCapacity(queue: QueueDescriptor): Promise<number> {
if (this.groups.length === 0) {
return 0;
}
// Build group data for parallel fetching
const groupData = this.groups.map((group) => ({
group,
groupId: group.extractGroupId(queue),
}));
// Fetch all current counts and limits in parallel
const [currents, limits] = await Promise.all([
Promise.all(
groupData.map(({ group, groupId }) =>
this.redis.scard(this.keys.concurrencyKey(group.name, groupId))
)
),
Promise.all(
groupData.map(({ group, groupId }) =>
group.getLimit(groupId).then((limit) => limit || group.defaultLimit)
)
),
]);
// Calculate minimum available capacity across all groups
let minCapacity = Infinity;
for (let i = 0; i < groupData.length; i++) {
const available = Math.max(0, limits[i]! - currents[i]!);
minCapacity = Math.min(minCapacity, available);
}
return minCapacity === Infinity ? 0 : minCapacity;
}
/**
* Get concurrency limit for a specific group.
*/
async getConcurrencyLimit(groupName: string, groupId: string): Promise<number> {
const group = this.groupsByName.get(groupName);
if (!group) {
throw new Error(`Unknown concurrency group: ${groupName}`);
}
return (await group.getLimit(groupId)) || group.defaultLimit;
}
/**
* Check if a group is at capacity.
*/
async isAtCapacity(groupName: string, groupId: string): Promise<boolean> {
const [current, limit] = await Promise.all([
this.getCurrentConcurrency(groupName, groupId),
this.getConcurrencyLimit(groupName, groupId),
]);
return current >= limit;
}
/**
* Get full state for a group.
*/
async getState(groupName: string, groupId: string): Promise<ConcurrencyState> {
const [current, limit] = await Promise.all([
this.getCurrentConcurrency(groupName, groupId),
this.getConcurrencyLimit(groupName, groupId),
]);
return {
groupName,
groupId,
current,
limit,
};
}
/**
* Get all active message IDs for a group.
*/
async getActiveMessages(groupName: string, groupId: string): Promise<string[]> {
const key = this.keys.concurrencyKey(groupName, groupId);
return await this.redis.smembers(key);
}
/**
* Force-clear concurrency for a group (use with caution).
* Useful for cleanup after crashes.
*/
async clearGroup(groupName: string, groupId: string): Promise<void> {
const key = this.keys.concurrencyKey(groupName, groupId);
await this.redis.del(key);
}
/**
* Remove a specific message from concurrency tracking.
* Useful for cleanup.
*/
async removeMessage(messageId: string, queue: QueueDescriptor): Promise<void> {
await this.release(queue, messageId);
}
/**
* Get configured group names.
*/
getGroupNames(): string[] {
return this.groups.map((g) => g.name);
}
/**
* Close the Redis connection.
*/
async close(): Promise<void> {
await this.redis.quit();
}
// ============================================================================
// Private Methods
// ============================================================================
#registerCommands(): void {
// Atomic multi-group reservation
// KEYS: concurrency set keys for each group (keyPrefix is applied by ioredis)
// ARGV[1]: messageId
// ARGV[2..n]: limits for each group (in same order as KEYS)
this.redis.defineCommand("reserveConcurrency", {
lua: `
local numGroups = #KEYS
local messageId = ARGV[1]
-- Check all groups first
for i = 1, numGroups do
local key = KEYS[i]
local limit = tonumber(ARGV[1 + i]) -- Limits start at ARGV[2]
local current = redis.call('SCARD', key)
if current >= limit then
return 0 -- At capacity
end
end
-- All groups have capacity, add message to all
for i = 1, numGroups do
local key = KEYS[i]
redis.call('SADD', key, messageId)
end
return 1
`,
});
}
}
// Extend Redis interface for custom commands
declare module "@internal/redis" {
interface RedisCommander<Context> {
reserveConcurrency(
numKeys: number,
keys: string[],
messageId: string,
...limits: string[]
): Promise<number>;
}
}