Skip to content

Commit 8e9dcd0

Browse files
Refactor share consumer metrics to use generic consumer counters in soak client
1 parent ecb8c17 commit 8e9dcd0

2 files changed

Lines changed: 21 additions & 13 deletions

File tree

tests/soak/README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,12 @@ OpenTelemetry reporting supported through OTLP.
3030
5. Run some tests
3131
```bash
3232
TESTID=<testid> ./run.sh ccloud.config
33-
```
33+
```
34+
35+
## Share Consumer
36+
37+
To run with a share consumer instead of the regular consumer:
38+
```bash
39+
SHARE=true TESTID=<testid> ./run.sh ccloud.config
40+
```
41+
Requires KIP-932 compatible librdkafka and broker.

tests/soak/soakclient.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ def share_error_cb(self, err):
339339
"""Share consumer error callback"""
340340
self.logger.error("share: error_cb: {}".format(err))
341341
self.share_error_cb_cnt += 1
342-
self.incr_counter("share.errorcb", 1)
342+
self.incr_counter("consumer.errorcb", 1)
343343

344344
def share_status(self):
345345
"""Print share consumer status"""
@@ -384,7 +384,7 @@ def share_run(self):
384384
except Exception as ex:
385385
self.logger.error("share: consume_batch exception: {}".format(ex))
386386
self.share_err_cnt += 1
387-
self.incr_counter("share.error", 1)
387+
self.incr_counter("consumer.error", 1)
388388
continue
389389

390390
if not messages:
@@ -394,7 +394,7 @@ def share_run(self):
394394
if msg.error() is not None:
395395
self.logger.error("share: error: {}".format(msg.error()))
396396
self.share_err_cnt += 1
397-
self.incr_counter("share.error", 1)
397+
self.incr_counter("consumer.error", 1)
398398
continue
399399

400400
try:
@@ -408,18 +408,18 @@ def share_run(self):
408408
)
409409
)
410410
self.share_msg_err_cnt += 1
411-
self.incr_counter("share.msgerr", 1)
411+
self.incr_counter("consumer.msgerr", 1)
412412

413413
self.share_msg_cnt += 1
414-
self.incr_counter("share.msg", 1)
414+
self.incr_counter("consumer.msg", 1)
415415

416416
# end-to-end latency
417417
headers = dict(msg.headers())
418418
txtime = headers.get('time', None)
419419
if txtime is not None:
420420
latency = time.time() - float(txtime)
421421
self.set_gauge(
422-
"share.e2e_latency", latency,
422+
"consumer.e2e_latency", latency,
423423
tags={"partition": "{}".format(msg.partition())}
424424
)
425425

@@ -446,7 +446,7 @@ def share_run(self):
446446
)
447447
)
448448
self.share_msg_dup_cnt += (hw + 1) - msg.offset()
449-
self.incr_counter("share.msgdup", 1)
449+
self.incr_counter("consumer.msgdup", 1)
450450
elif msg.offset() > hw + 1:
451451
self.logger.warning(
452452
"share: Lost messages, now at {} "
@@ -457,7 +457,7 @@ def share_run(self):
457457
)
458458
)
459459
self.share_msg_miss_cnt += msg.offset() - (hw + 1)
460-
self.incr_counter("share.missedmsg", 1)
460+
self.incr_counter("consumer.missedmsg", 1)
461461

462462
hwmarks[hwkey] = msg.offset()
463463

@@ -642,10 +642,10 @@ def filter_config(conf, filter_out, strip_prefix):
642642
self.share_consumer = ShareConsumer(sconf)
643643

644644
# Initialize counters to zero
645-
self.incr_counter("share.error", 0)
646-
self.incr_counter("share.msgdup", 0)
647-
self.incr_counter("share.msgerr", 0)
648-
self.incr_counter("share.errorcb", 0)
645+
self.incr_counter("consumer.error", 0)
646+
self.incr_counter("consumer.msgdup", 0)
647+
self.incr_counter("consumer.msgerr", 0)
648+
self.incr_counter("consumer.errorcb", 0)
649649

650650
# Create and start share consumer thread
651651
self.share_thread = threading.Thread(target=self.share_thread_main)

0 commit comments

Comments
 (0)