Skip to content

Commit 7d03a7f

Browse files
Refactor explicit mode handling in ShareConsumer to alternate between sync and async commits
1 parent 497cc4d commit 7d03a7f

1 file changed

Lines changed: 24 additions & 34 deletions

File tree

tests/soak/soakclient.py

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import json
3030
import logging
3131
import os
32+
import random
3233
import resource
3334
import sys
3435
import threading
@@ -371,12 +372,6 @@ def share_run(self):
371372
# progress sequentially per partition. Skipped in Explict mode.
372373
hwmarks = defaultdict(int)
373374

374-
# In explicit mode, commit every N messages and alternate between
375-
# commit_async and commit_sync.
376-
COMMIT_EVERY_MSGS = 1000
377-
msgs_since_commit = 0
378-
commit_use_sync = False
379-
380375
self.logger.info("share: running in mode={}".format(self.share_mode))
381376

382377
next_status = time.time() + self.disprate
@@ -469,45 +464,40 @@ def share_run(self):
469464

470465
hwmarks[hwkey] = msg.offset()
471466

472-
# Explicit mode: ack each message with ACCEPT and flush acks
473-
# every N messages via a standalone ShareAcknowledge RPC,
474-
# alternating commit_async and commit_sync.
467+
# Explicit mode: ack each message with ACCEPT. Commit fires
468+
# once per batch after the for-msg loop below.
475469
if self.share_mode == 'explicit':
476470
try:
477471
self.share_consumer.acknowledge(msg)
478-
msgs_since_commit += 1
479472
except KafkaException as ex:
480473
self.logger.error(
481474
"share: acknowledge failed: {}".format(ex))
482475
self.share_err_cnt += 1
483476
self.incr_counter("consumer.error", 1)
484477

485-
if msgs_since_commit >= COMMIT_EVERY_MSGS:
486-
try:
487-
if commit_use_sync:
488-
result = self.share_consumer.commit_sync(
489-
timeout=10.0)
490-
partition_errs = sum(
491-
1 for err in result.values()
492-
if err is not None
493-
)
494-
if partition_errs > 0:
495-
self.logger.warning(
496-
"share: commit_sync had {} partition "
497-
"error(s)".format(partition_errs)
498-
)
499-
self.share_err_cnt += 1
500-
self.incr_counter("consumer.error", 1)
501-
else:
502-
self.share_consumer.commit_async()
503-
except KafkaException as ex:
504-
self.logger.error(
505-
"share: commit_{} exception: {}".format(
506-
"sync" if commit_use_sync else "async", ex))
478+
if self.share_mode == 'explicit':
479+
use_sync = random.random() < 0.5
480+
try:
481+
if use_sync:
482+
result = self.share_consumer.commit_sync(timeout=10.0)
483+
partition_errs = sum(
484+
1 for err in result.values() if err is not None
485+
)
486+
if partition_errs > 0:
487+
self.logger.warning(
488+
"share: commit_sync had {} partition error(s)"
489+
.format(partition_errs)
490+
)
507491
self.share_err_cnt += 1
508492
self.incr_counter("consumer.error", 1)
509-
commit_use_sync = not commit_use_sync
510-
msgs_since_commit = 0
493+
else:
494+
self.share_consumer.commit_async()
495+
except KafkaException as ex:
496+
self.logger.error(
497+
"share: commit_{} exception: {}".format(
498+
"sync" if use_sync else "async", ex))
499+
self.share_err_cnt += 1
500+
self.incr_counter("consumer.error", 1)
511501

512502
self.share_consumer.close()
513503
self.share_status()

0 commit comments

Comments
 (0)