Skip to content

Commit 2a12dbe

Browse files
Add support for ShareConsumer in soak client and update usage instructionsx
1 parent d600e92 commit 2a12dbe

2 files changed

Lines changed: 221 additions & 24 deletions

File tree

tests/soak/run.sh

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,18 @@ topic="pysoak-$TESTID-$librdkafka_version"
1717
logfile="${TESTID}.log.bz2"
1818
limit=$((50 * 1024 * 1024)) # 50MB
1919
export HOSTNAME=$(hostname)
20+
21+
share_flag=""
22+
if [[ "${SHARE:-}" == "true" ]]; then
23+
share_flag="--share"
24+
fi
25+
2026
echo "Starting soak client using topic $topic. Logging to $logfile."
2127
set +x
2228
while [ "$run" = true ]; do
2329
# Ignore SIGINT in children (inherited)
2430
trap "" SIGINT
25-
time opentelemetry-instrument $testdir/soakclient.py -i $TESTID -t $topic -r 80 -f $1 |& tee /dev/tty | bzip2 > $logfile &
31+
time opentelemetry-instrument $testdir/soakclient.py -i $TESTID -t $topic -r 80 -f $1 $share_flag |& tee /dev/tty | bzip2 > $logfile &
2632
PID=$!
2733
terminate_last() {
2834
# List children of $PID only

tests/soak/soakclient.py

Lines changed: 214 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
# long term validation testing.
2121
#
2222
# Usage:
23-
# tests/soak/soakclient.py -i <testid> -t <topic> -r <produce-rate> -f <client-conf-file>
23+
# tests/soak/soakclient.py -i <testid> -t <topic> -r <produce-rate> -f <client-conf-file> [--share]
2424
#
2525
# A unique topic should be used for each soakclient instance.
2626
#
@@ -43,6 +43,12 @@
4343
from confluent_kafka import Consumer, KafkaError, KafkaException, Producer, version
4444
from confluent_kafka.admin import AdminClient, NewTopic
4545

46+
try:
47+
from confluent_kafka import ShareConsumer
48+
HAS_SHARE_CONSUMER = True
49+
except ImportError:
50+
HAS_SHARE_CONSUMER = False
51+
4652

4753
class SoakRecord(object):
4854
"""A private record type, with JSON serializer and deserializer"""
@@ -68,7 +74,7 @@ def deserialize(cls, binstr):
6874

6975
class SoakClient(object):
7076
"""The SoakClient consists of a Producer sending messages at
71-
the given rate, and a Consumer consuming the messages.
77+
the given rate, and a Consumer or ShareConsumer consuming the messages.
7278
Both clients print their message and error counters every 10 seconds.
7379
The producer and consumer run in separate background threads.
7480
"""
@@ -329,6 +335,147 @@ def producer_error_cb(self, err):
329335
self.producer_error_cb_cnt += 1
330336
self.incr_counter("producer.errorcb", 1)
331337

338+
def share_error_cb(self, err):
339+
"""Share consumer error callback"""
340+
self.logger.error("share: error_cb: {}".format(err))
341+
self.share_error_cb_cnt += 1
342+
self.incr_counter("share.errorcb", 1)
343+
344+
def share_status(self):
345+
"""Print share consumer status"""
346+
self.logger.info(
347+
"share: {} messages consumed, {} duplicates, "
348+
"{} missed, {} message errors, {} consumer errors, {} error_cbs".format(
349+
self.share_msg_cnt,
350+
self.share_msg_dup_cnt,
351+
self.share_msg_miss_cnt,
352+
self.share_msg_err_cnt,
353+
self.share_err_cnt,
354+
self.share_error_cb_cnt,
355+
)
356+
)
357+
358+
def share_run(self):
359+
"""Share consumer main loop"""
360+
self.share_consumer.subscribe([self.topic])
361+
362+
self.share_msg_cnt = 0
363+
self.share_msg_dup_cnt = 0
364+
self.share_msg_miss_cnt = 0
365+
self.share_msg_err_cnt = 0
366+
self.share_err_cnt = 0
367+
self.share_error_cb_cnt = 0
368+
369+
# Track highest offset seen per partition for duplicate/gap detection.
370+
# With implicit ack and a single share consumer, offsets should
371+
# progress sequentially per partition.
372+
hwmarks = defaultdict(int)
373+
374+
next_status = time.time() + self.disprate
375+
376+
while self.run:
377+
now = time.time()
378+
if now > next_status:
379+
self.share_status()
380+
next_status = now + self.disprate
381+
382+
try:
383+
messages = self.share_consumer.consume_batch(timeout=1.0)
384+
except Exception as ex:
385+
self.logger.error("share: consume_batch exception: {}".format(ex))
386+
self.share_err_cnt += 1
387+
self.incr_counter("share.error", 1)
388+
continue
389+
390+
if not messages:
391+
continue
392+
393+
for msg in messages:
394+
if msg.error() is not None:
395+
self.logger.error("share: error: {}".format(msg.error()))
396+
self.share_err_cnt += 1
397+
self.incr_counter("share.error", 1)
398+
continue
399+
400+
try:
401+
record = SoakRecord.deserialize(msg.value()) # noqa unused variable
402+
except ValueError as ex:
403+
self.logger.info(
404+
"share: Failed to deserialize message in "
405+
"{} [{}] at offset {} (headers {}): {}".format(
406+
msg.topic(), msg.partition(), msg.offset(),
407+
msg.headers(), ex
408+
)
409+
)
410+
self.share_msg_err_cnt += 1
411+
self.incr_counter("share.msgerr", 1)
412+
413+
self.share_msg_cnt += 1
414+
self.incr_counter("share.msg", 1)
415+
416+
# end-to-end latency
417+
headers = dict(msg.headers())
418+
txtime = headers.get('time', None)
419+
if txtime is not None:
420+
latency = time.time() - float(txtime)
421+
self.set_gauge(
422+
"share.e2e_latency", latency,
423+
tags={"partition": "{}".format(msg.partition())}
424+
)
425+
426+
if (self.share_msg_cnt % self.disprate) == 0:
427+
self.logger.info(
428+
"share: {} messages consumed: Message {} "
429+
"[{}] at offset {} (headers {})".format(
430+
self.share_msg_cnt, msg.topic(),
431+
msg.partition(), msg.offset(), msg.headers()
432+
)
433+
)
434+
435+
# Track per-partition high-water mark for duplicate/gap detection
436+
hwkey = "{}-{}".format(msg.topic(), msg.partition())
437+
hw = hwmarks[hwkey]
438+
439+
if hw > 0:
440+
if msg.offset() <= hw:
441+
self.logger.warning(
442+
"share: Old or duplicate message {} "
443+
"[{}] at offset {} (headers {}): wanted offset > {}".format(
444+
msg.topic(), msg.partition(), msg.offset(),
445+
msg.headers(), hw
446+
)
447+
)
448+
self.share_msg_dup_cnt += (hw + 1) - msg.offset()
449+
self.incr_counter("share.msgdup", 1)
450+
elif msg.offset() > hw + 1:
451+
self.logger.warning(
452+
"share: Lost messages, now at {} "
453+
"[{}] at offset {} (headers {}): "
454+
"expected offset {}+1".format(
455+
msg.topic(), msg.partition(), msg.offset(),
456+
msg.headers(), hw
457+
)
458+
)
459+
self.share_msg_miss_cnt += msg.offset() - (hw + 1)
460+
self.incr_counter("share.missedmsg", 1)
461+
462+
hwmarks[hwkey] = msg.offset()
463+
464+
self.share_consumer.close()
465+
self.share_status()
466+
467+
def share_thread_main(self):
468+
"""Share consumer thread main function"""
469+
try:
470+
self.share_run()
471+
except KeyboardInterrupt:
472+
self.logger.info("share: aborted by user")
473+
self.run = False
474+
except Exception as ex:
475+
self.logger.fatal("share: fatal exception: {}\n{}".format(
476+
ex, traceback.print_exc()))
477+
self.run = False
478+
332479
def rtt_stats(self, d):
333480
"""Extract broker rtt statistics from the stats dict in @param d"""
334481

@@ -394,7 +541,7 @@ def create_topic(self, topic, conf):
394541
else:
395542
raise
396543

397-
def __init__(self, testid, topic, rate, conf):
544+
def __init__(self, testid, topic, rate, conf, enable_share=False):
398545
"""SoakClient constructor. conf is the client configuration"""
399546
self.topic = topic
400547
self.rate = rate
@@ -446,50 +593,90 @@ def filter_config(conf, filter_out, strip_prefix):
446593
return out
447594

448595
# Create topic (might already exist)
449-
aconf = filter_config(conf, ["consumer.", "producer."], "admin.")
596+
aconf = filter_config(conf, ["consumer.", "producer.", "share."], "admin.")
450597
aconf['client.id'] = self.testid
451598
self.create_topic(self.topic, aconf)
452599

453600
#
454-
# Create Producer and Consumer, each running in its own thread.
601+
# Create Producer and Consumer/ShareConsumer, each in its own thread.
455602
#
456603
conf['stats_cb'] = self.stats_cb
457604
conf['statistics.interval.ms'] = 120000
458605

459606
# Producer
460-
pconf = filter_config(conf, ["consumer.", "admin."], "producer.")
607+
pconf = filter_config(conf, ["consumer.", "admin.", "share."], "producer.")
461608
pconf['error_cb'] = self.producer_error_cb
462609
pconf['client.id'] = self.testid
463610
self.producer = Producer(pconf)
464611

465-
# Consumer
466-
cconf = filter_config(conf, ["producer.", "admin."], "consumer.")
467-
cconf['error_cb'] = self.consumer_error_cb
468-
cconf['on_commit'] = self.consumer_commit_cb
469-
self.logger.info("consumer: using group.id {}".format(cconf['group.id']))
470-
cconf['client.id'] = self.testid
471-
self.consumer = Consumer(cconf)
472-
473-
# Initialize some counters to zero to make them appear in the metrics
474-
self.incr_counter("consumer.error", 0)
475-
self.incr_counter("consumer.msgdup", 0)
476612
self.incr_counter("producer.errorcb", 0)
477613

478614
# Create and start producer thread
479615
self.producer_thread = threading.Thread(target=self.producer_thread_main)
480616
self.producer_thread.start()
481617

482-
# Create and start consumer thread
483-
self.consumer_thread = threading.Thread(target=self.consumer_thread_main)
484-
self.consumer_thread.start()
618+
self.consumer = None
619+
self.consumer_thread = None
620+
self.share_consumer = None
621+
self.share_thread = None
622+
623+
if enable_share:
624+
if not HAS_SHARE_CONSUMER:
625+
raise RuntimeError(
626+
"ShareConsumer requested but not available in this "
627+
"confluent_kafka build."
628+
)
629+
630+
sconf = filter_config(conf, ["consumer.", "producer.", "admin."], "share.")
631+
sconf['error_cb'] = self.share_error_cb
632+
sconf['stats_cb'] = self.stats_cb
633+
sconf['statistics.interval.ms'] = 120000
634+
sconf['client.id'] = self.testid
635+
636+
# Always set a share-specific group.id.
637+
sconf['group.id'] = 'soakclient-share-{}-{}-{}'.format(
638+
self.hostname, version(), sys.version.split(' ')[0]
639+
)
640+
641+
self.logger.info("share: using group.id {}".format(sconf['group.id']))
642+
self.share_consumer = ShareConsumer(sconf)
643+
644+
# Initialize counters to zero
645+
self.incr_counter("share.error", 0)
646+
self.incr_counter("share.msgdup", 0)
647+
self.incr_counter("share.msgerr", 0)
648+
self.incr_counter("share.errorcb", 0)
649+
650+
# Create and start share consumer thread
651+
self.share_thread = threading.Thread(target=self.share_thread_main)
652+
self.share_thread.start()
653+
else:
654+
# Consumer
655+
cconf = filter_config(conf, ["producer.", "admin.", "share."], "consumer.")
656+
cconf['error_cb'] = self.consumer_error_cb
657+
cconf['on_commit'] = self.consumer_commit_cb
658+
self.logger.info("consumer: using group.id {}".format(cconf['group.id']))
659+
cconf['client.id'] = self.testid
660+
self.consumer = Consumer(cconf)
661+
662+
# Initialize some counters to zero to make them appear in the metrics
663+
self.incr_counter("consumer.error", 0)
664+
self.incr_counter("consumer.msgdup", 0)
665+
666+
# Create and start consumer thread
667+
self.consumer_thread = threading.Thread(target=self.consumer_thread_main)
668+
self.consumer_thread.start()
485669

486670
def terminate(self):
487-
"""Terminate Producer and Consumer"""
671+
"""Terminate Producer and Consumer/Share Consumer"""
488672
soak.logger.info("Terminating (ran for {}s)".format(time.time() - self.start_time))
489673
self.run = False
490674
# Wait for background threads to finish.
491675
self.producer_thread.join()
492-
self.consumer_thread.join()
676+
if self.share_thread is not None:
677+
self.share_thread.join()
678+
else:
679+
self.consumer_thread.join()
493680

494681
# Final resource usage
495682
soak.get_rusage()
@@ -576,6 +763,10 @@ def get_rusage(self):
576763
parser.add_argument(
577764
'-f', dest='conffile', type=argparse.FileType('r'), help='Configuration file (configprop=value format)'
578765
)
766+
parser.add_argument(
767+
'--share', dest='share', action='store_true', default=False,
768+
help='Enable share consumer thread'
769+
)
579770

580771
args = parser.parse_args()
581772

@@ -606,7 +797,7 @@ def get_rusage(self):
606797
conf['enable.partition.eof'] = False
607798

608799
# Create SoakClient
609-
soak = SoakClient(args.testid, args.topic, args.rate, conf)
800+
soak = SoakClient(args.testid, args.topic, args.rate, conf, enable_share=args.share)
610801

611802
# Get initial resource usage
612803
soak.get_rusage()

0 commit comments

Comments
 (0)