diff --git a/lib/BackbeatConsumer.js b/lib/BackbeatConsumer.js index 63bf426fe..794f7305f 100644 --- a/lib/BackbeatConsumer.js +++ b/lib/BackbeatConsumer.js @@ -219,6 +219,10 @@ class BackbeatConsumer extends EventEmitter { const consumerParams = { 'metadata.broker.list': this._kafkaHosts, 'group.id': this._groupId, + // This is the default in our current librdkafka version, but we + // pin it explicitly because we depend on eager rebalancing and + // don't want it changed implicitly by future version updates. + 'partition.assignment.strategy': 'range,roundrobin', // we manage stored offsets based on the highest // contiguous offset fully processed by a worker, so // disabling automatic offset store is needed @@ -872,37 +876,53 @@ class BackbeatConsumer extends EventEmitter { let lastBootstrapId; let producer; // eslint-disable-line prefer-const let producerTimer; - let consumerTimer; - function consumeCb(err, messages) { - if (err) { - return undefined; - } - messages.forEach(message => { - const bootstrapId = JSON.parse(message.value).bootstrapId; - if (bootstrapId) { + // Use chained setTimeout instead of setInterval to ensure + // only one consume() worker is in flight at a time. With + // setInterval, multiple C++ async workers can overlap, and + // since librdkafka 2.10.0 they survive an + // unsubscribe→subscribe transition, stealing messages from + // the next subscription. + function consume() { + self._consumer.consume(1, (err, messages) => { + if (err) { + self._log.debug('bootstrap consume error', { + error: err.message, + topic: self._topic, + groupId: self._groupId, + }); + setTimeout(consume, 200); + return undefined; + } + const match = messages.find(message => { + const bootstrapId = JSON.parse(message.value).bootstrapId; + if (!bootstrapId) { + return false; + } self._log.info('bootstraping backbeat consumer: ' + 'received bootstrap message', { bootstrapId, topic: self._topic, groupId: self._groupId }); - if (bootstrapId === lastBootstrapId) { - self._log.info('backbeat consumer is bootstrapped', - { topic: self._topic, groupId: self._groupId }); - clearInterval(producerTimer); - clearInterval(consumerTimer); - self._consumer.offsetsStore([{ - topic: self._topic, - partition: message.partition, - offset: message.offset + 1, - }]); - self._consumer.commit(); - self._consumer.unsubscribe(); - producer.close(() => { - self._bootstrapping = false; - self._onReady(); - }); - } + return bootstrapId === lastBootstrapId; + }); + if (!match) { + setTimeout(consume, 200); + return undefined; } + self._log.info('backbeat consumer is bootstrapped', + { topic: self._topic, groupId: self._groupId }); + clearInterval(producerTimer); + self._consumer.offsetsStore([{ + topic: self._topic, + partition: match.partition, + offset: match.offset + 1, + }]); + self._consumer.commit(); + self._consumer.unsubscribe(); + producer.close(() => { + self._bootstrapping = false; + self._onReady(); + }); + return undefined; }); - return undefined; } assert.strictEqual(this._consumer, null); producer = new BackbeatProducer({ @@ -927,9 +947,7 @@ class BackbeatConsumer extends EventEmitter { this._initConsumer(); this._consumer.on('ready', () => { this._consumer.subscribe([this._topic]); - consumerTimer = setInterval(() => { - this._consumer.consume(1, consumeCb); - }, 200); + consume(); }); }, 500); } diff --git a/lib/queuePopulator/KafkaLogConsumer/LogConsumer.js b/lib/queuePopulator/KafkaLogConsumer/LogConsumer.js index 2468f6fc4..851328003 100644 --- a/lib/queuePopulator/KafkaLogConsumer/LogConsumer.js +++ b/lib/queuePopulator/KafkaLogConsumer/LogConsumer.js @@ -54,6 +54,10 @@ class LogConsumer extends EventEmitter { setup(done) { // partition offsets will be managed by kafka const consumerParams = { + // This is the default in our current librdkafka version, but we + // pin it explicitly because we depend on eager rebalancing and + // don't want it changed implicitly by future version updates. + 'partition.assignment.strategy': 'range,roundrobin', // Manually manage storing offsets to ensure they are only stored // after the batch processing is fully completed. 'enable.auto.offset.store': false, diff --git a/package.json b/package.json index e4f807494..3ab4cbef8 100644 --- a/package.json +++ b/package.json @@ -67,7 +67,7 @@ "minimatch": "^10.0.1", "mongodb": "^6.11.0", "node-forge": "^1.3.1", - "node-rdkafka": "^2.12.0", + "node-rdkafka": "^3.6.0", "node-rdkafka-prometheus": "^1.0.0", "node-schedule": "^2.1.1", "node-zookeeper-client": "^1.1.3", diff --git a/yarn.lock b/yarn.lock index a4e5d2d8f..a9a03dd08 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7613,11 +7613,16 @@ ms@2.1.3, ms@^2.0.0, ms@^2.1.1, ms@^2.1.3: resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2" integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== -nan@^2.14.0, nan@^2.17.0, nan@^2.18.0, nan@^2.3.2: +nan@^2.14.0, nan@^2.18.0, nan@^2.3.2: version "2.23.1" resolved "https://registry.yarnpkg.com/nan/-/nan-2.23.1.tgz#6f86a31dd87e3d1eb77512bf4b9e14c8aded3975" integrity sha512-r7bBUGKzlqk8oPBDYxt6Z0aEdF1G1rwlMcLk8LCOMbOzf0mG+JUfUzG4fIMWwHWP0iyaLWEQZJmtB7nOHEm/qw== +nan@^2.24.0: + version "2.26.2" + resolved "https://registry.yarnpkg.com/nan/-/nan-2.26.2.tgz#2e5e25764224c737b9897790b57c3294d4dcee9c" + integrity sha512-0tTvBTYkt3tdGw22nrAy50x7gpbGCCFH3AFcyS5WiUu7Eu4vWlri1woE6qHBSfy11vksDqkiwjOnlR7WV8G1Hw== + napi-macros@~2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/napi-macros/-/napi-macros-2.0.0.tgz#2b6bae421e7b96eb687aa6c77a7858640670001b" @@ -7751,13 +7756,13 @@ node-rdkafka-prometheus@^1.0.0: "@log4js-node/log4js-api" "^1.0.0" prom-client "^12.0.0" -node-rdkafka@^2.12.0: - version "2.18.0" - resolved "https://registry.yarnpkg.com/node-rdkafka/-/node-rdkafka-2.18.0.tgz#116950e49dfe804932c8bc6dbc68949793e72ee2" - integrity sha512-jYkmO0sPvjesmzhv1WFOO4z7IMiAFpThR6/lcnFDWgSPkYL95CtcuVNo/R5PpjujmqSgS22GMkL1qvU4DTAvEQ== +node-rdkafka@^3.6.0: + version "3.6.1" + resolved "https://registry.yarnpkg.com/node-rdkafka/-/node-rdkafka-3.6.1.tgz#164357f08ff23a4722e89bfb3b2b09481c1e8cc1" + integrity sha512-sfpTbrT35429cs0RE8Yb9avGX9BiRRvpV7aweQsghKTjtrVZP3jBrKOPItWXAqHb8doyh3SkuGuUVBLgig1SRg== dependencies: bindings "^1.3.1" - nan "^2.17.0" + nan "^2.24.0" node-releases@^2.0.27: version "2.0.27"