Skip to content

Commit aa79ee9

Browse files
committed
fix(consumer): drain in-flight bootstrap consume workers before unsubscribe
The bootstrap consumer uses setInterval(200ms) to call consume(1, cb), creating multiple concurrent C++ async workers (each with a 1000ms timeout). Since librdkafka 2.10.0, these workers survive an unsubscribe→subscribe transition and can dequeue messages from the next subscription, causing them to be lost to the normal consume pipeline. Track in-flight workers with a counter and defer unsubscribe() until all have completed. Issue: BB-760
1 parent 1735ada commit aa79ee9

1 file changed

Lines changed: 27 additions & 11 deletions

File tree

lib/BackbeatConsumer.js

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -873,7 +873,32 @@ class BackbeatConsumer extends EventEmitter {
873873
let producer; // eslint-disable-line prefer-const
874874
let producerTimer;
875875
let consumerTimer;
876+
let inFlightConsumes = 0;
877+
// Wait for all in-flight consume(1, consumeCb) workers to complete
878+
// before calling unsubscribe(). The setInterval(200ms) pattern can
879+
// have multiple consume workers running concurrently in the C++
880+
// thread pool (each with a 1000ms timeout). Since librdkafka 2.10.0,
881+
// these workers survive an unsubscribe→subscribe transition and can
882+
// steal messages from the next subscription.
883+
function _finishBootstrap(partition, offset) {
884+
if (inFlightConsumes > 0) {
885+
setTimeout(() => _finishBootstrap(partition, offset), 50);
886+
return;
887+
}
888+
self._consumer.offsetsStore([{
889+
topic: self._topic,
890+
partition,
891+
offset,
892+
}]);
893+
self._consumer.commit();
894+
self._consumer.unsubscribe();
895+
producer.close(() => {
896+
self._bootstrapping = false;
897+
self._onReady();
898+
});
899+
}
876900
function consumeCb(err, messages) {
901+
inFlightConsumes--;
877902
if (err) {
878903
return undefined;
879904
}
@@ -888,17 +913,7 @@ class BackbeatConsumer extends EventEmitter {
888913
{ topic: self._topic, groupId: self._groupId });
889914
clearInterval(producerTimer);
890915
clearInterval(consumerTimer);
891-
self._consumer.offsetsStore([{
892-
topic: self._topic,
893-
partition: message.partition,
894-
offset: message.offset + 1,
895-
}]);
896-
self._consumer.commit();
897-
self._consumer.unsubscribe();
898-
producer.close(() => {
899-
self._bootstrapping = false;
900-
self._onReady();
901-
});
916+
_finishBootstrap(message.partition, message.offset + 1);
902917
}
903918
}
904919
});
@@ -928,6 +943,7 @@ class BackbeatConsumer extends EventEmitter {
928943
this._consumer.on('ready', () => {
929944
this._consumer.subscribe([this._topic]);
930945
consumerTimer = setInterval(() => {
946+
inFlightConsumes++;
931947
this._consumer.consume(1, consumeCb);
932948
}, 200);
933949
});

0 commit comments

Comments
 (0)