Skip to content

Commit 6ebfe8f

Browse files
committed
1 parent 48c0198 commit 6ebfe8f

7 files changed

Lines changed: 29 additions & 16 deletions

File tree

.github/workflows/node-js-ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ jobs:
1313
- 15672:15672
1414
strategy:
1515
matrix:
16-
node-version: [14.x]
16+
node-version: [14.x, 16.x, 18.x, 20.x]
1717
steps:
1818
- uses: actions/checkout@v3
1919
- uses: actions/setup-node@v3
@@ -30,7 +30,7 @@ jobs:
3030
runs-on: ubuntu-latest
3131
services:
3232
rabbitmq:
33-
image: rabbitmq:3-management-alpine
33+
image: rabbitmq:3.13.2-management-alpine
3434
ports:
3535
- 5672:5672
3636
- 15672:15672

lib/amqp/SubscriberError.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const format = require('util').format;
33
const _ = require('lodash');
44
const async = require('async');
55
const setTimeoutUnref = require('../utils/setTimeoutUnref');
6+
const { EMPTY_X_DEATH } = require('./XDeath');
67

78
module.exports = function SubscriptionRecovery(broker, vhost) {
89
this.handle = function (session, message, err, recoveryOptions, next) {
@@ -86,7 +87,7 @@ module.exports = function SubscriptionRecovery(broker, vhost) {
8687

8788
if (strategyConfig.immediateNack) {
8889
const xDeathRecords = message.properties.headers['x-death'] || [];
89-
const xDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || { count: 0, queue: originalQueue };
90+
const xDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || EMPTY_X_DEATH;
9091
_.set(publishOptions, ['headers', 'rascal', 'recovery', originalQueue], { immediateNack: true, xDeath });
9192
}
9293

lib/amqp/Subscription.js

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const SubscriberSession = require('./SubscriberSession');
88
const SubscriberError = require('./SubscriberError');
99
const backoff = require('../backoff');
1010
const setTimeoutUnref = require('../utils/setTimeoutUnref');
11+
const { EMPTY_X_DEATH } = require('./XDeath');
1112

1213
module.exports = {
1314
create(broker, vhost, counter, config, next) {
@@ -214,16 +215,22 @@ function Subscription(broker, vhost, subscriptionConfig, counter) {
214215
function immediateNack(message) {
215216
const originalQueue = message.properties.headers.rascal.originalQueue;
216217
const xDeathRecords = message.properties.headers['x-death'] || [];
217-
const currentXDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || { count: 0, queue: originalQueue };
218-
const previousXDeath = _.get(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath'], { count: 0, queue: originalQueue });
218+
const currentXDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || EMPTY_X_DEATH;
219+
const previousXDeath = _.get(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath'], EMPTY_X_DEATH);
219220
const hasImmediateNackHeader = _.has(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'immediateNack']);
220221
if (!hasImmediateNackHeader) return false;
221222
debug('Message %s has been marked for immediate nack. Previous xDeath is %o. Current xDeath is %o.', message.properties.messageId, previousXDeath, currentXDeath);
222-
if (currentXDeath.count === previousXDeath.count) return true;
223-
debug('Message %s has been replayed after being dead lettered. Removing immediate nack.', message.properties.messageId);
224-
_.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'immediateNack']);
225-
_.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath']);
226-
return false;
223+
// See https://github.com/rabbitmq/rabbitmq-server/issues/11331
224+
// RabbitMQ v3.13 stopped updating the xDeath record's count property.
225+
// RabbitMQ v3.12 does not update the xDeath record's time property.
226+
// Therefore having test them both
227+
if (currentXDeath.count > previousXDeath.count || currentXDeath.time.value > previousXDeath.time.value) {
228+
debug('Message %s has been replayed after being dead lettered. Removing immediate nack.', message.properties.messageId);
229+
_.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'immediateNack']);
230+
_.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath']);
231+
return false;
232+
}
233+
return true;
227234
}
228235

229236
function getAckOrNack(session, message) {

lib/amqp/XDeath.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
const EMPTY_X_DEATH = { count: 0, time: { value: 0 } };
2+
3+
module.exports = {
4+
EMPTY_X_DEATH,
5+
};

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
"lint:fix": "eslint --fix .",
4141
"lint-staged": "lint-staged",
4242
"coverage": "nyc --report html --reporter lcov --reporter text-summary zUnit",
43-
"docker": "docker run -d --name rascal-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management-alpine",
43+
"docker": "docker run -d --name rascal-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12.9-management-alpine",
4444
"prepare": "husky install"
4545
},
4646
"lint-staged": {

test/subscriptions.tests.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1651,7 +1651,7 @@ describe('Subscriptions', () => {
16511651
count++;
16521652
if (count === 1) {
16531653
assert.ok(message);
1654-
ackOrNack(new Error('immediate nack'), {
1654+
ackOrNack(new Error(`Test Error ${count}`), {
16551655
strategy: 'republish',
16561656
immediateNack: true,
16571657
});
@@ -1739,7 +1739,7 @@ describe('Subscriptions', () => {
17391739
count++;
17401740
if (count <= 2) {
17411741
assert.ok(message);
1742-
ackOrNack(new Error(`immediate nack: ${count}`), {
1742+
ackOrNack(new Error(`Test Error ${count}`), {
17431743
strategy: 'republish',
17441744
immediateNack: true,
17451745
});
@@ -1762,7 +1762,7 @@ describe('Subscriptions', () => {
17621762
});
17631763
},
17641764
);
1765-
}, { exclusive: true, timeout: 10000 });
1765+
});
17661766

17671767
it('should forward messages to publication when requested', (test, done) => {
17681768
createBroker(

test/subscriptionsAsPromised.tests.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,7 +1096,7 @@ describe('Subscriptions As Promised', () => {
10961096
subscription.on('message', (message, content, ackOrNack) => {
10971097
assert.strictEqual(++count, 1);
10981098
assert.ok(message);
1099-
ackOrNack(new Error('immediate nack'), { strategy: 'republish', immediateNack: true });
1099+
ackOrNack(new Error(`Test Error ${count}`), { strategy: 'republish', immediateNack: true });
11001100
});
11011101
});
11021102

@@ -1167,7 +1167,7 @@ describe('Subscriptions As Promised', () => {
11671167
count++;
11681168
if (count === 1) {
11691169
assert.ok(message);
1170-
ackOrNack(new Error('immediate nack'), {
1170+
ackOrNack(new Error(`Test Error ${count}`), {
11711171
strategy: 'republish',
11721172
immediateNack: true,
11731173
});

0 commit comments

Comments
 (0)