-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathrabbitmq.js
More file actions
71 lines (65 loc) · 2.24 KB
/
Copy pathrabbitmq.js
File metadata and controls
71 lines (65 loc) · 2.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import amqp from 'amqplib';
import { isTransientError } from './errors.js';
import { retryWithBackoff } from './retry.js';
/**
 * Open an AMQP connection, delegating retries to `retryWithBackoff`.
 *
 * @param {string} url - Broker URL handed to `amqp.connect`.
 * @param {object} [options]
 * @param {object} [options.connectOptions] - Second argument for `amqp.connect`.
 * @param {string} [options.label] - Retry label; defaults to 'rabbitmq connect'.
 * @param {number} [options.maxAttempts] - Defaults to 5 when falsy.
 * @param {number} [options.baseDelayMs] - Defaults to 250 when falsy.
 * @param {number} [options.maxDelayMs] - Defaults to 10000 when falsy.
 * @returns {Promise<*>} Whatever `retryWithBackoff` resolves with.
 */
export async function connectRabbitMq(url, options = {}) {
  const { label, maxAttempts, baseDelayMs, maxDelayMs } = options;

  const retryPolicy = {
    label: label || 'rabbitmq connect',
    maxAttempts: maxAttempts || 5,
    baseDelayMs: baseDelayMs || 250,
    maxDelayMs: maxDelayMs || 10000,
    shouldRetry: isTransientError
  };

  // connectOptions is read on every attempt, not captured up front.
  const openConnection = () => amqp.connect(url, options.connectOptions);

  return retryWithBackoff(openConnection, retryPolicy);
}
/**
 * Create a confirm channel on the given connection and optionally apply a
 * prefetch limit.
 *
 * @param {object} connection - Object exposing `createConfirmChannel()`.
 * @param {object} [options]
 * @param {number} [options.prefetch] - When truthy, passed to `channel.prefetch`.
 * @returns {Promise<object>} The newly created channel.
 */
export async function createConfirmChannel(connection, options = {}) {
  const confirmChannel = await connection.createConfirmChannel();
  const { prefetch: prefetchCount } = options;

  // A falsy prefetch (undefined, 0) leaves the channel's prefetch untouched.
  if (!prefetchCount) {
    return confirmChannel;
  }

  await confirmChannel.prefetch(prefetchCount);
  return confirmChannel;
}
/**
 * Publish `message` as a persistent JSON payload and resolve once the
 * publish callback reports success. The publish is wrapped in
 * `retryWithBackoff`, retrying while `isTransientError` allows it.
 *
 * `message.idempotencyKey` and `message.schemaVersion` (when present) are
 * copied into the AMQP headers; the idempotency key also becomes the
 * `messageId`, falling back to `options.messageId`.
 *
 * @param {object} channel - Channel whose `publish` accepts a completion
 *   callback as its fifth argument.
 * @param {string} exchange
 * @param {string} routingKey
 * @param {*} message - Any JSON-serializable value, including `null`.
 * @param {object} [options]
 * @param {string} [options.messageId] - Used when the message carries no
 *   idempotency key.
 * @param {object} [options.headers] - Extra headers; these override the
 *   derived ones on key collision.
 * @param {string} [options.label] - Retry label.
 * @param {number} [options.maxAttempts] - Defaults to 5 when falsy.
 * @param {number} [options.baseDelayMs] - Defaults to 250 when falsy.
 * @param {number} [options.maxDelayMs] - Defaults to 5000 when falsy.
 * @returns {Promise<boolean>} Resolves `true` once the callback reports no error.
 * @throws {TypeError} Synchronously, when `message` cannot be serialized to
 *   JSON text (e.g. `undefined` or a function).
 */
export function publishJson(channel, exchange, routingKey, message, options = {}) {
  const serialized = JSON.stringify(message);
  // JSON.stringify yields undefined (not a string) for undefined, functions
  // and symbols; fail with a clear message instead of an opaque Buffer error.
  if (serialized === undefined) {
    throw new TypeError('publishJson: message is not JSON-serializable');
  }
  const body = Buffer.from(serialized);

  // `null` is valid JSON but has no properties to read metadata from.
  const { idempotencyKey, schemaVersion } = message === null ? {} : message;

  const publishOptions = {
    contentType: 'application/json',
    deliveryMode: 2,
    persistent: true,
    // Unix time in whole seconds.
    timestamp: Math.floor(Date.now() / 1000),
    messageId: idempotencyKey || options.messageId,
    headers: {
      idempotencyKey,
      schemaVersion,
      ...(options.headers || {})
    }
  };

  // Adapt the callback-style publish into a promise so it can be retried.
  const publishOnce = () => new Promise((resolve, reject) => {
    channel.publish(exchange, routingKey, body, publishOptions, (error) => {
      if (error) {
        reject(error);
        return;
      }
      resolve(true);
    });
  });

  return retryWithBackoff(publishOnce, {
    label: options.label || `rabbitmq publish ${exchange}:${routingKey}`,
    maxAttempts: options.maxAttempts || 5,
    baseDelayMs: options.baseDelayMs || 250,
    maxDelayMs: options.maxDelayMs || 5000,
    shouldRetry: isTransientError
  });
}
/**
 * Subscribe to `queue`, parse each delivery as UTF-8 JSON and pass it to
 * `handler`. A delivery is acked once the handler resolves; on any failure
 * (parse, handler or ack) it is nacked and `options.onError` is notified.
 *
 * @param {object} channel - Channel exposing `consume`, `ack` and `nack`.
 * @param {string} queue
 * @param {(payload: *, message: object) => *} handler - May be async.
 * @param {object} [options]
 * @param {boolean} [options.requeueTransient] - When exactly `true`, errors
 *   for which `isTransientError` returns true are requeued; everything else
 *   is nacked without requeue.
 * @param {(error: Error, message: object, context: {retryable: boolean}) => void} [options.onError]
 *   Failure hook. It is invoked even when `channel.nack` itself throws, so
 *   the original failure is never lost.
 * @param {string} [options.consumerTag]
 * @returns {*} Whatever `channel.consume` returns.
 */
export function consumeJson(channel, queue, handler, options = {}) {
  const onDelivery = async (message) => {
    // amqplib documents a null delivery as "consumer cancelled by the
    // broker"; there is nothing to settle in that case.
    if (!message) return;

    try {
      const payload = JSON.parse(message.content.toString('utf8'));
      await handler(payload, message);
      channel.ack(message);
    } catch (error) {
      const retryable = options.requeueTransient === true && isTransientError(error);
      try {
        channel.nack(message, false, retryable);
      } finally {
        // Runs even if nack throws (e.g. the channel is already closed), so
        // the error is still reported before the nack failure propagates.
        if (options.onError) {
          options.onError(error, message, { retryable });
        }
      }
    }
  };

  return channel.consume(queue, onDelivery, {
    noAck: false,
    consumerTag: options.consumerTag
  });
}