22
33import com .danubemessaging .client .errors .DanubeClientException ;
44import com .danubemessaging .client .internal .consumer .TopicConsumer ;
5+ import com .danubemessaging .client .internal .retry .RetryManager ;
56import com .danubemessaging .client .model .StreamMessage ;
67import java .util .ArrayList ;
78import java .util .List ;
@@ -59,6 +60,10 @@ public synchronized void subscribe() {
5960 List <String > partitions = client .lookupService ().topicPartitions (client .serviceUri (), options .topic ());
6061 List <String > targets = partitions .isEmpty () ? List .of (options .topic ()) : partitions ;
6162
63+ RetryManager retryManager = hasCustomRetryOptions ()
64+ ? new RetryManager (options .maxRetries (), options .baseBackoffMs (), options .maxBackoffMs ())
65+ : client .retryManager ();
66+
6267 List <TopicConsumer > createdConsumers = new ArrayList <>(targets .size ());
6368 try {
6469 for (int i = 0 ; i < targets .size (); i ++) {
@@ -73,7 +78,7 @@ public synchronized void subscribe() {
7378 client .lookupService (),
7479 client .authService (),
7580 client .healthCheckService (),
76- client . retryManager () ,
81+ retryManager ,
7782 options ,
7883 partitionTopic ,
7984 partitionConsumerName );
@@ -194,6 +199,10 @@ private void notifyFatalReceiveError(TopicConsumer topicConsumer, RuntimeExcepti
194199 }
195200 }
196201
202+ private boolean hasCustomRetryOptions () {
203+ return options .maxRetries () > 0 || options .baseBackoffMs () > 0 || options .maxBackoffMs () > 0 ;
204+ }
205+
197206 private void ensureOpen () {
198207 if (lifecycleState .get () == LifecycleState .CLOSED ) {
199208 throw new DanubeClientException ("Consumer is closed" );
0 commit comments