Skip to content

Commit 7e62644

Browse files
committed
Work related to ##232
1 parent 0a58d1c commit 7e62644

8 files changed

Lines changed: 2672 additions & 2512 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Change Log
22

3+
## 19.0.0
4+
- I am not aware of any breaking changes in this release, but emitting error events asynchronously could have subtle side effects, hence the major release
5+
- Deprecate session 'cancelled' event in favour of 'cancel' (both will work)
6+
- Refactor reconnection and resubscription code
7+
- Emit errors asynchronously to prevent them being caught by the amqplib main accept loop
8+
- Fix bug which throw an exception in the error handler when a close event was emitted with no error argument
9+
310
## 18.0.1
411

512
- Removed console.log when the channel pool destroyed a channel

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ The reason Rascal nacks the message is because the alternatives are to leave the
130130

131131
## Very Important Section About Event Handling
132132

133-
[amqplib](https://www.npmjs.com/package/amqplib) emits error events when a connection or channel encounters a problem. Rascal will listen for these and provided you use the default configuration will attempt automatic recovery (reconnection etc), however these events can indicate errors in your code, so it's also important to bring them to your attention. Rascal does this by re-emitting the error event, which means if you don't handle them, they will bubble up to the uncaught error handler and crash your application. There are four places where you should do this
133+
[amqplib](https://www.npmjs.com/package/amqplib) emits error events when a connection or channel encounters a problem. Rascal will listen for these and provided you use the default configuration will attempt automatic recovery (reconnection etc), however these events can indicate errors in your code, so it's also important to bring them to your attention. Rascal does this by re-emitting the error event, which means if you don't handle them, they will bubble up to the uncaught error handler and crash your application. It is insufficient to register a global uncaughtException handler - doing so without registering individual handlers will prevent your application from crashing, but also prevent Rascal from recovering.
134+
135+
There are four places where you need to register error handlers.
134136

135137
1. Immediately after obtaining a broker instance
136138

@@ -1358,7 +1360,7 @@ If the message has not been auto-acknowledged you should ackOrNack it. **If you
13581360
13591361
The RabbitMQ broker may [cancel](https://www.rabbitmq.com/consumer-cancel.html) the consumer if the queue is deleted or the node on which the queue is located fails. [amqplib](https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume) handles this by delivering a `null` message. When Rascal receives the null message it will
13601362
1361-
1. Emit a `cancelled` event from the subscription.
1363+
1. Emit a `cancel` event from the subscription.
13621364
1. Emit an `error` event from the subscription if the `cancel` event was not handled
13631365
1. Optionally attempt to resubscribe as per normal retry configuration. If the queue was deleted rather than being failed over, the queue will not automatically be re-created and retry attempts will fail indefinitely.
13641366

lib/amqp/Publication.js

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -95,24 +95,27 @@ function Publication(vhost, borrowChannelFn, returnChannelFn, destroyChannelFn,
9595
session._removePausedListener();
9696
if (err) return session.emit('error', err, messageId);
9797
if (session.isAborted()) return abortPublish(channel, messageId);
98-
const errorHandler = _.once(handleChannelError.bind(null, channel, messageId, session, config));
98+
99+
const disconnectionHandler = makeDisconnectionHandler(channel, messageId, session, config);
99100
const returnHandler = session.emit.bind(session, 'return');
100-
addListeners(channel, errorHandler, returnHandler);
101+
addListeners(channel, disconnectionHandler, returnHandler);
102+
101103
try {
102104
session._startPublish();
105+
103106
publishFn(channel, buffer, publishConfig, (err, ok) => {
104107
session._endPublish();
105108
if (err) {
106-
destroyChannel(channel, errorHandler, returnHandler);
109+
destroyChannel(channel, disconnectionHandler, returnHandler);
107110
return session.emit('error', err, messageId);
108111
}
109112

110-
ok ? returnChannel(channel, errorHandler, returnHandler) : deferReturnChannel(channel, errorHandler, returnHandler);
113+
ok ? returnChannel(channel, disconnectionHandler, returnHandler) : deferReturnChannel(channel, disconnectionHandler, returnHandler);
111114

112115
session.emit('success', messageId);
113116
});
114117
} catch (err) {
115-
returnChannel(channel, errorHandler, returnHandler);
118+
returnChannel(channel, disconnectionHandler, returnHandler);
116119
return session.emit('error', err, messageId);
117120
}
118121
});
@@ -125,19 +128,19 @@ function Publication(vhost, borrowChannelFn, returnChannelFn, destroyChannelFn,
125128
returnChannelFn(channel);
126129
}
127130

128-
function returnChannel(channel, errorHandler, returnHandler) {
129-
removeListeners(channel, errorHandler, returnHandler);
131+
function returnChannel(channel, disconnectionHandler, returnHandler) {
132+
removeListeners(channel, disconnectionHandler, returnHandler);
130133
returnChannelFn(channel);
131134
}
132135

133-
function deferReturnChannel(channel, errorHandler, returnHandler) {
136+
function deferReturnChannel(channel, disconnectionHandler, returnHandler) {
134137
channel.once('drain', () => {
135-
returnChannel(channel, errorHandler, returnHandler);
138+
returnChannel(channel, disconnectionHandler, returnHandler);
136139
});
137140
}
138141

139-
function destroyChannel(channel, errorHandler, returnHandler) {
140-
removeListeners(channel, errorHandler, returnHandler);
142+
function destroyChannel(channel, disconnectionHandler, returnHandler) {
143+
removeListeners(channel, disconnectionHandler, returnHandler);
141144
destroyChannelFn(channel);
142145
}
143146

@@ -163,19 +166,29 @@ function Publication(vhost, borrowChannelFn, returnChannelFn, destroyChannelFn,
163166
}
164167
}
165168

166-
function addListeners(channel, errorHandler, returnHandler) {
167-
channel.on('error', errorHandler);
169+
function makeDisconnectionHandler(channel, messageId, session, config) {
170+
return _.once((err) => {
171+
// Use setImmediate to avoid amqplib accept loop swallowing errors
172+
setImmediate(() => (err
173+
// Treat close events with errors as error events
174+
? handleChannelError(channel, messageId, session, config, err)
175+
: handleChannelClose(channel, messageId, session, config)));
176+
});
177+
}
178+
179+
function addListeners(channel, disconnectionHandler, returnHandler) {
180+
channel.on('error', disconnectionHandler);
168181
channel.on('return', returnHandler);
169-
channel.connection.once('error', errorHandler);
170-
channel.connection.once('close', errorHandler);
182+
channel.connection.once('error', disconnectionHandler);
183+
channel.connection.once('close', disconnectionHandler);
171184
}
172185

173-
function removeListeners(channel, errorHandler, returnHandler) {
186+
function removeListeners(channel, disconnectionHandler, returnHandler) {
174187
channel.removeAllListeners('drain');
175-
channel.removeListener('error', errorHandler);
188+
channel.removeListener('error', disconnectionHandler);
176189
channel.removeListener('return', returnHandler);
177-
channel.connection.removeListener('error', errorHandler);
178-
channel.connection.removeListener('close', errorHandler);
190+
channel.connection.removeListener('error', disconnectionHandler);
191+
channel.connection.removeListener('close', disconnectionHandler);
179192
}
180193

181194
function publishToExchange(channel, content, config, next) {
@@ -252,3 +265,8 @@ function handleChannelError(borked, messageId, emitter, config, err) {
252265
debug('Channel error: %s during publication of message: %s to %s using channel: %s', err.message, messageId, config.name, borked._rascal_id);
253266
emitter.emit('error', err, messageId);
254267
}
268+
269+
function handleChannelClose(borked, messageId, emitter, config) {
270+
debug('Channel closed during publication of message: %s to %s using channel: %s', messageId, config.name, borked._rascal_id);
271+
emitter.emit('close', messageId);
272+
}

lib/amqp/Subscription.js

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,13 @@ function Subscription(broker, vhost, subscriptionConfig, counter) {
6161
_configureQos(config, channel, (err) => {
6262
if (err) return done(err);
6363

64-
const removeErrorHandlers = attachErrorHandlers(channel, session, config);
65-
const onMessage = _onMessage.bind(null, session, config, removeErrorHandlers);
64+
const removeDisconnectionHandlers = attachDisconnectionHandlers(channel, session, config);
65+
const onMessage = _onMessage.bind(null, session, config, removeDisconnectionHandlers);
6666

6767
channel.consume(config.source, onMessage, config.options, (err, response) => {
6868
if (err) {
6969
debug('Error subscribing to %s using channel: %s. %s', config.source, channel._rascal_id, err.message);
70-
removeErrorHandlers();
70+
removeDisconnectionHandlers();
7171
return done(err);
7272
}
7373
session._open(channel, response.consumerTag, (err) => {
@@ -88,8 +88,8 @@ function Subscription(broker, vhost, subscriptionConfig, counter) {
8888
async.series(qos, next);
8989
}
9090

91-
function _onMessage(session, config, removeErrorHandlers, message) {
92-
if (!message) return handleConsumerCancel(session, config, removeErrorHandlers);
91+
function _onMessage(session, config, removeDisconnectionHandlers, message) {
92+
if (!message) return handleConsumerCancel(session, config, removeDisconnectionHandlers);
9393

9494
debug('Received message: %s from queue: %s', message.properties.messageId, config.queue);
9595
session._incrementUnacknowledgeMessageCount(message.fields.consumerTag);
@@ -249,51 +249,66 @@ function Subscription(broker, vhost, subscriptionConfig, counter) {
249249
if (err) session.emit('error', err);
250250
}
251251

252-
function attachErrorHandlers(channel, session, config) {
252+
function attachDisconnectionHandlers(channel, session, config) {
253253
/* eslint-disable no-use-before-define */
254254
const connection = channel.connection;
255-
const removeErrorHandlers = _.once(() => {
256-
channel.removeListener('error', errorHandler);
255+
const removeDisconnectionHandlers = _.once(() => {
256+
channel.removeListener('error', disconnectionHandler);
257257
channel.on('error', (err) => {
258258
debug('Suppressing error on cancelled session: %s to prevent connection errors. %s', channel._rascal_id, err.message);
259259
});
260-
connection.removeListener('error', errorHandler);
261-
connection.removeListener('close', errorHandler);
260+
connection.removeListener('error', disconnectionHandler);
261+
connection.removeListener('close', disconnectionHandler);
262+
});
263+
264+
const disconnectionHandler = makeDisconnectionHandler(session, config, removeDisconnectionHandlers);
265+
channel.on('error', disconnectionHandler);
266+
connection.once('error', disconnectionHandler);
267+
connection.once('close', disconnectionHandler);
268+
return removeDisconnectionHandlers;
269+
}
270+
271+
function makeDisconnectionHandler(session, config, removeDisconnectionHandlers) {
272+
return _.once((err) => {
273+
// Use setImmediate to avoid amqplib accept loop swallowing errors
274+
setImmediate(() => (err
275+
// Treat close events with errors as error events
276+
? handleChannelError(session, config, removeDisconnectionHandlers, 0, err)
277+
: handleChannelClose(session, config, removeDisconnectionHandlers, 0)));
262278
});
263-
const errorHandler = _.once(handleChannelError.bind(null, session, config, removeErrorHandlers, 0));
264-
channel.on('error', errorHandler);
265-
connection.once('error', errorHandler);
266-
connection.once('close', errorHandler);
267-
return removeErrorHandlers;
268279
}
269280

270-
function handleChannelError(session, config, removeErrorHandlers, attempts, err) {
281+
function handleChannelError(session, config, removeDisconnectionHandler, attempt, err) {
271282
debug('Handling channel error: %s from %s using channel: %s', err.message, config.name, session._getRascalChannelId());
272-
if (removeErrorHandlers) removeErrorHandlers();
283+
if (removeDisconnectionHandler) removeDisconnectionHandler();
273284
session.emit('error', err);
274-
config.retry
275-
&& subscribeNow(session, config, (err) => {
276-
if (!err) return;
277-
const delay = timer.next();
278-
debug('Will attempt resubscription(%d) to %s in %dms', attempts + 1, config.name, delay);
279-
session._schedule(handleChannelError.bind(null, session, config, null, attempts + 1, err), delay);
280-
});
285+
retrySubscription(session, config, attempt + 1);
281286
}
282287

283-
function handleConsumerCancel(session, config, removeErrorHandlers) {
288+
function handleChannelClose(session, config, removeDisconnectionHandler, attempt) {
289+
debug('Handling channel close from %s using channel: %s', config.name, session._getRascalChannelId());
290+
removeDisconnectionHandler();
291+
session.emit('close');
292+
retrySubscription(session, config, attempt + 1);
293+
}
294+
295+
function handleConsumerCancel(session, config, removeDisconnectionHandler) {
284296
debug('Received consumer cancel from %s using channel: %s', config.name, session._getRascalChannelId());
285-
removeErrorHandlers();
297+
removeDisconnectionHandler();
298+
const cancelErr = new Error(format('Subscription: %s was cancelled by the broker', config.name));
299+
session.emit('cancelled', cancelErr) || session.emit('cancel', cancelErr) || session.emit('error', cancelErr);
286300
session._close((err) => {
287301
if (err) debug('Error cancelling subscription: %s', err.message);
288-
const cancelErr = new Error(format('Subscription: %s was cancelled by the broker', config.name));
289-
session.emit('cancelled', cancelErr) || session.emit('error', cancelErr);
290-
config.retry
291-
&& subscribeNow(session, config, (err) => {
292-
if (!err) return;
293-
const delay = timer.next();
294-
debug('Will attempt resubscription(%d) to %s in %dms', 1, config.name, delay);
295-
session._schedule(handleChannelError.bind(null, session, config, null, 1, err), delay);
296-
});
302+
retrySubscription(session, config, 1);
303+
});
304+
}
305+
306+
function retrySubscription(session, config, attempt) {
307+
config.retry && subscribeNow(session, config, (err) => {
308+
if (!err) return;
309+
const delay = timer.next();
310+
debug('Will attempt resubscription(%d) to %s in %dms', attempt, config.name, delay);
311+
session._schedule(handleChannelError.bind(null, session, config, null, attempt, err), delay);
297312
});
298313
}
299314
}

lib/amqp/Vhost.js

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ function Vhost(vhostConfig, components) {
5757
connectionConfig = ctx.connectionConfig;
5858
timer = backoff(ctx.connectionConfig.retry);
5959

60-
attachErrorHandlers(config);
60+
attachDisconnectionHandlers(config);
6161
forwardRabbitMQConnectionEvents();
6262
ensureChannelPools();
6363
resumeChannelAllocation();
@@ -471,11 +471,21 @@ function Vhost(vhostConfig, components) {
471471
);
472472
}
473473

474-
function attachErrorHandlers(config) {
474+
function attachDisconnectionHandlers(config) {
475475
connection.removeAllListeners('error');
476-
const errorHandler = _.once(handleConnectionError.bind(null, connection, config));
477-
connection.on('error', errorHandler);
478-
connection.on('close', errorHandler);
476+
const disconectionHandler = makeDisconnectionHandler(config);
477+
connection.on('error', disconectionHandler);
478+
connection.on('close', disconectionHandler);
479+
}
480+
481+
function makeDisconnectionHandler(config) {
482+
return _.once((err) => {
483+
// Use setImmediate to avoid amqplib accept loop swallowing errors
484+
setImmediate(() => (err
485+
// Treat close events with errors as error events
486+
? handleConnectionError(connection, config, err)
487+
: handleConnectionClose(connection, config)));
488+
});
479489
}
480490

481491
function handleConnectionError(borked, config, err) {
@@ -484,12 +494,24 @@ function Vhost(vhostConfig, components) {
484494
connection = undefined;
485495
self.emit('disconnect');
486496
self.emit('error', err, self.getConnectionDetails());
487-
connectionConfig.retry
488-
&& self.init((err) => {
489-
if (!err) return;
490-
const delay = timer.next();
491-
debug('Will attempt reconnection in in %dms', delay);
492-
reconnectTimeout = setTimeoutUnref(handleConnectionError.bind(null, borked, config, err), delay);
493-
});
497+
retryConnection(borked, config);
498+
}
499+
500+
function handleConnectionClose(borked, config) {
501+
debug('Handling connection close initially from connection: %s, %s', borked._rascal_id, connectionConfig.loggableUrl);
502+
pauseChannelAllocation();
503+
connection = undefined;
504+
self.emit('disconnect');
505+
self.emit('close', self.getConnectionDetails());
506+
retryConnection(borked, config);
507+
}
508+
509+
function retryConnection(borked, config) {
510+
connectionConfig.retry && self.init((err) => {
511+
if (!err) return;
512+
const delay = timer.next();
513+
debug('Will attempt reconnection in in %dms', delay);
514+
reconnectTimeout = setTimeoutUnref(handleConnectionError.bind(null, borked, config, err), delay);
515+
});
494516
}
495517
}

0 commit comments

Comments
 (0)