Skip to content

Commit 497cc4d

Browse files
Add explicit mode support for ShareConsumer with alternating commit strategy
1 parent 99a12b1 commit 497cc4d

2 files changed

Lines changed: 75 additions & 4 deletions

File tree

tests/soak/run.sh

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,17 @@ if [[ "${SHARE:-}" == "true" ]]; then
2323
share_flag="--share"
2424
fi
2525

26+
explicit_flag=""
27+
if [[ "${EXPLICIT:-}" == "true" ]]; then
28+
explicit_flag="--explicit"
29+
fi
30+
2631
echo "Starting soak client using topic $topic. Logging to $logfile."
2732
set +x
2833
while [ "$run" = true ]; do
2934
# Ignore SIGINT in children (inherited)
3035
trap "" SIGINT
31-
time opentelemetry-instrument $testdir/soakclient.py -i $TESTID -t $topic -r 80 -f $1 $share_flag |& tee /dev/tty | bzip2 > $logfile &
36+
time opentelemetry-instrument $testdir/soakclient.py -i $TESTID -t $topic -r 80 -f $1 $share_flag $explicit_flag |& tee /dev/tty | bzip2 > $logfile &
3237
PID=$!
3338
terminate_last() {
3439
# List children of $PID only

tests/soak/soakclient.py

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -368,9 +368,17 @@ def share_run(self):
368368

369369
# Track highest offset seen per partition for duplicate/gap detection.
370370
# With implicit ack and a single share consumer, offsets should
371-
# progress sequentially per partition.
371+
# progress sequentially per partition. Skipped in explicit mode.
372372
hwmarks = defaultdict(int)
373373

374+
# In explicit mode, commit every N messages and alternate between
375+
# commit_async and commit_sync.
376+
COMMIT_EVERY_MSGS = 1000
377+
msgs_since_commit = 0
378+
commit_use_sync = False
379+
380+
self.logger.info("share: running in mode={}".format(self.share_mode))
381+
374382
next_status = time.time() + self.disprate
375383

376384
while self.run:
@@ -461,6 +469,46 @@ def share_run(self):
461469

462470
hwmarks[hwkey] = msg.offset()
463471

472+
# Explicit mode: ack each message with ACCEPT and flush acks
473+
# every N messages via a standalone ShareAcknowledge RPC,
474+
# alternating commit_async and commit_sync.
475+
if self.share_mode == 'explicit':
476+
try:
477+
self.share_consumer.acknowledge(msg)
478+
msgs_since_commit += 1
479+
except KafkaException as ex:
480+
self.logger.error(
481+
"share: acknowledge failed: {}".format(ex))
482+
self.share_err_cnt += 1
483+
self.incr_counter("consumer.error", 1)
484+
485+
if msgs_since_commit >= COMMIT_EVERY_MSGS:
486+
try:
487+
if commit_use_sync:
488+
result = self.share_consumer.commit_sync(
489+
timeout=10.0)
490+
partition_errs = sum(
491+
1 for err in result.values()
492+
if err is not None
493+
)
494+
if partition_errs > 0:
495+
self.logger.warning(
496+
"share: commit_sync had {} partition "
497+
"error(s)".format(partition_errs)
498+
)
499+
self.share_err_cnt += 1
500+
self.incr_counter("consumer.error", 1)
501+
else:
502+
self.share_consumer.commit_async()
503+
except KafkaException as ex:
504+
self.logger.error(
505+
"share: commit_{} exception: {}".format(
506+
"sync" if commit_use_sync else "async", ex))
507+
self.share_err_cnt += 1
508+
self.incr_counter("consumer.error", 1)
509+
commit_use_sync = not commit_use_sync
510+
msgs_since_commit = 0
511+
464512
self.share_consumer.close()
465513
self.share_status()
466514

@@ -541,14 +589,16 @@ def create_topic(self, topic, conf):
541589
else:
542590
raise
543591

544-
def __init__(self, testid, topic, rate, conf, enable_share=False):
592+
def __init__(self, testid, topic, rate, conf, enable_share=False,
593+
share_mode='implicit'):
545594
"""SoakClient constructor. conf is the client configuration"""
546595
self.topic = topic
547596
self.rate = rate
548597
self.disprate = int(rate * 10)
549598
self.run = True
550599
self.stats_cnt = {'producer': 0, 'consumer': 0}
551600
self.start_time = time.time()
601+
self.share_mode = share_mode
552602

553603
# OTEL instruments
554604
self.counters = {}
@@ -637,6 +687,12 @@ def filter_config(conf, filter_out, strip_prefix):
637687
sconf.pop('stats_cb', None)
638688
sconf['client.id'] = self.testid
639689

690+
# In explicit mode, switch the consumer's ack policy. Default is
691+
# implicit (next poll auto-acks); without this flip, calls to
692+
# acknowledge() return _STATE because the message is already acked.
693+
if self.share_mode == 'explicit':
694+
sconf['share.acknowledgement.mode'] = 'explicit'
695+
640696
# Always set a share-specific group.id.
641697
sconf['group.id'] = 'soakclient-share-{}-{}-{}'.format(
642698
self.hostname, version(), sys.version.split(' ')[0]
@@ -771,9 +827,18 @@ def get_rusage(self):
771827
'--share', dest='share', action='store_true', default=False,
772828
help='Enable share consumer thread'
773829
)
830+
parser.add_argument(
831+
'--explicit', dest='explicit', action='store_true', default=False,
832+
help='Share consumer: per-msg ACCEPT + alternating commit_async/sync (requires --share)'
833+
)
774834

775835
args = parser.parse_args()
776836

837+
share_mode = 'explicit' if args.explicit else 'implicit'
838+
839+
if share_mode == 'explicit' and not args.share:
840+
parser.error('--explicit requires --share')
841+
777842
conf = dict()
778843
if args.conffile is not None:
779844
# Parse client configuration file.
@@ -801,7 +866,8 @@ def get_rusage(self):
801866
conf['enable.partition.eof'] = False
802867

803868
# Create SoakClient
804-
soak = SoakClient(args.testid, args.topic, args.rate, conf, enable_share=args.share)
869+
soak = SoakClient(args.testid, args.topic, args.rate, conf,
870+
enable_share=args.share, share_mode=share_mode)
805871

806872
# Get initial resource usage
807873
soak.get_rusage()

0 commit comments

Comments
 (0)