Skip to content

Commit 3cc85bb

Browse files
committed
Merge branch 'bugfix/BB-752' into q/9.1
2 parents d252c93 + 63befa4 commit 3cc85bb

6 files changed

Lines changed: 94 additions & 4 deletions

File tree

extensions/gc/GarbageCollector.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,17 @@ class GarbageCollector extends EventEmitter {
167167
}
168168

169169
/**
170-
* Close the lifecycle consumer
170+
* Close the garbage collector consumer
171171
* @param {function} cb - callback function
172172
* @return {undefined}
173173
*/
174174
close(cb) {
175175
this._logger.debug('closing garbage collector consumer');
176-
this._consumer.close(cb);
176+
if (this._consumer) {
177+
this._consumer.close(cb);
178+
} else {
179+
cb();
180+
}
177181
}
178182

179183
processKafkaEntry(kafkaEntry, done) {

extensions/gc/service.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ initAndStart();
100100

101101
process.on('SIGTERM', () => {
102102
logger.info('received SIGTERM, exiting');
103-
garbageCollector.stop(() => {
103+
garbageCollector.close(() => {
104104
process.exit(0);
105105
});
106106
});

extensions/lifecycle/objectProcessor/LifecycleObjectProcessor.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,11 @@ class LifecycleObjectProcessor extends EventEmitter {
145145
clearInterval(this._deleteInactiveCredentialsInterval);
146146
}
147147

148-
this._consumer.close(cb);
148+
if (this._consumers) {
149+
this._consumers.close(cb);
150+
} else {
151+
cb();
152+
}
149153
}
150154

151155
/**

lib/BackbeatConsumer.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,6 +823,7 @@ class BackbeatConsumer extends EventEmitter {
823823
});
824824
// ensure consumer is active when calling offsetsStore() on
825825
// it, to avoid raising an exception (invalid state)
826+
// TODO : potential issue here, see BB-758
826827
if (committableOffset !== null && !this.isPaused()) {
827828
this._consumer.offsetsStore([{ topic, partition,
828829
offset: committableOffset }]);

tests/unit/gc/GarbageCollector.spec.js

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,47 @@ describe('garbage collector', function garbageCollector() {
9292
.setAttribute('target.owner', ownerId);
9393
gcTask.processActionEntry(action, done);
9494
});
95+
96+
describe('close() garbage collector', () => {
97+
it('should call close on the consumer when it exists', done => {
98+
const gcWithConsumer = new GarbageCollector({
99+
kafkaConfig: {},
100+
s3Config: { host: 'localhost', port: 7777 },
101+
gcConfig: {
102+
topic: 'backbeat-gc',
103+
auth: { type: 'account', account: 'bart' },
104+
consumer: { groupId: 'backbeat-gc-consumer-group' },
105+
},
106+
});
107+
let closeCalled = false;
108+
gcWithConsumer._consumer = {
109+
close: cb => {
110+
closeCalled = true;
111+
cb();
112+
},
113+
};
114+
gcWithConsumer.close(err => {
115+
assert.ifError(err);
116+
assert.strictEqual(closeCalled, true);
117+
done();
118+
});
119+
});
120+
121+
it('should call callback immediately when consumer is null', done => {
122+
const gcNoConsumer = new GarbageCollector({
123+
kafkaConfig: {},
124+
s3Config: { host: 'localhost', port: 7777 },
125+
gcConfig: {
126+
topic: 'backbeat-gc',
127+
auth: { type: 'account', account: 'bart' },
128+
consumer: { groupId: 'backbeat-gc-consumer-group' },
129+
},
130+
});
131+
assert.strictEqual(gcNoConsumer._consumer, null);
132+
gcNoConsumer.close(err => {
133+
assert.ifError(err);
134+
done();
135+
});
136+
});
137+
});
95138
});

tests/unit/lifecycle/LifecycleObjectExpirationProcessor.spec.js

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
const assert = require('assert');
2+
const sinon = require('sinon');
23
const config = require('../../config.json');
34
const LifecycleObjectExpirationProcessor =
45
require('../../../extensions/lifecycle/objectProcessor/LifecycleObjectExpirationProcessor');
@@ -23,4 +24,41 @@ describe('LifecycleObjectExpirationProcessor', () => {
2324
config.extensions.lifecycle.objectTasksTopic,
2425
);
2526
});
27+
28+
describe('close() expiration processor', () => {
29+
it('should call close on consumers when they exist', done => {
30+
let closeCalled = false;
31+
objectProcessor._consumers = {
32+
close: cb => {
33+
closeCalled = true;
34+
cb();
35+
},
36+
};
37+
objectProcessor.close(err => {
38+
assert.ifError(err);
39+
assert.strictEqual(closeCalled, true);
40+
done();
41+
});
42+
});
43+
44+
it('should call callback immediately when consumers is null', done => {
45+
assert.strictEqual(objectProcessor._consumers, null);
46+
objectProcessor.close(err => {
47+
assert.ifError(err);
48+
done();
49+
});
50+
});
51+
52+
it('should clear deleteInactiveCredentialsInterval if set', done => {
53+
const spy = sinon.spy(global, 'clearInterval');
54+
const interval = setInterval(() => {}, 100000);
55+
objectProcessor._deleteInactiveCredentialsInterval = interval;
56+
objectProcessor.close(err => {
57+
assert.ifError(err);
58+
assert(spy.calledWith(interval));
59+
spy.restore();
60+
done();
61+
});
62+
});
63+
});
2664
});

0 commit comments

Comments
 (0)