Skip to content

Commit 767faa5

Browse files
fix: sorted cache keys, logging, and no-tx metric in Kafka hooks
- confluentkafka: sort bootstrap servers for cache key in produce/consume wrappers and in _fetch_cluster_id so broker order doesn't affect cache hits - kafkapython: add _logger; collapse no-transaction early-return in wrap_KafkaProducer_send (cluster metric already recorded inside transaction block); add debug logging to all except-Exception handlers in _run() Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 9799069 commit 767faa5

2 files changed

Lines changed: 9 additions & 18 deletions

File tree

newrelic/hooks/messagebroker_confluentkafka.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ def _run():
7878
if cache_key:
7979
with _nr_cluster_id_cache_lock:
8080
_nr_cluster_id_cache[cache_key] = cluster_id
81-
except Exception:
81+
except Exception as e:
82+
_logger.debug("NR Kafka cluster ID fetch failed", exc_info=True)
8283
if cache_key:
8384
with _nr_cluster_id_cache_lock:
8485
_nr_cluster_id_cache.pop(cache_key, None)
@@ -117,7 +118,7 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs):
117118

118119
cluster_id = getattr(instance, "_nr_cluster_id", None)
119120
if not cluster_id and hasattr(instance, "_nr_bootstrap_servers"):
120-
_cache_key = ",".join(instance._nr_bootstrap_servers)
121+
_cache_key = ",".join(sorted(instance._nr_bootstrap_servers))
121122
cluster_id = _nr_cluster_id_cache.get(_cache_key) or None
122123
if cluster_id:
123124
instance._nr_cluster_id = cluster_id # cache on instance for future calls
@@ -236,7 +237,7 @@ def wrap_Consumer_poll(wrapped, instance, args, kwargs):
236237
)
237238
cluster_id = getattr(instance, "_nr_cluster_id", None)
238239
if not cluster_id and hasattr(instance, "_nr_bootstrap_servers"):
239-
_cache_key = ",".join(instance._nr_bootstrap_servers)
240+
_cache_key = ",".join(sorted(instance._nr_bootstrap_servers))
240241
cluster_id = _nr_cluster_id_cache.get(_cache_key) or None
241242
if cluster_id:
242243
instance._nr_cluster_id = cluster_id

newrelic/hooks/messagebroker_kafkapython.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import logging
1415
import sys
1516
import threading
1617

@@ -37,6 +38,7 @@
3738

3839
_kafka_cluster_id_cache = {}
3940
_nr_cluster_id_cache_lock = threading.Lock()
41+
_logger = logging.getLogger(__name__)
4042

4143

4244
def _bootstrap_cache_key(bootstrap_servers):
@@ -54,16 +56,6 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
5456
transaction = current_transaction()
5557

5658
if transaction is None:
57-
app = application_instance()
58-
if not app.active:
59-
app.activate()
60-
servers = instance.config.get("bootstrap_servers", []) if hasattr(instance, "config") else []
61-
if app.active and servers:
62-
cache_key = _bootstrap_cache_key(servers)
63-
cluster_id = _kafka_cluster_id_cache.get(cache_key)
64-
if cluster_id:
65-
topic = (_bind_send(*args, **kwargs)[0] or "Default")
66-
app.record_custom_metric(KAFKA_CLUSTER_METRIC_PRODUCE.format(cluster_id, topic), 1)
6759
return wrapped(*args, **kwargs)
6860

6961
topic, value, key, headers, partition, timestamp_ms = _bind_send(*args, **kwargs)
@@ -221,15 +213,13 @@ def _run():
221213
admin._client.poll(timeout_ms=3000)
222214
try:
223215
result = admin.describe_cluster()
224-
cluster_id = result.get("cluster_id") if isinstance(result, dict) else (
225-
getattr(result, "cluster_id", None) or getattr(result, "clusterId", None)
226-
)
216+
cluster_id = getattr(result, "cluster_id", None) or getattr(result, "clusterId", None)
227217
except Exception:
228-
pass
218+
_logger.debug("NR Kafka describe_cluster failed", exc_info=True)
229219
if not cluster_id:
230220
cluster_id = getattr(meta, "cluster_id", None) or getattr(meta, "_cluster_id", None)
231221
except Exception:
232-
pass
222+
_logger.debug("NR Kafka cluster ID fetch failed", exc_info=True)
233223
finally:
234224
try:
235225
if admin is not None:

0 commit comments

Comments
 (0)