1717import tech .illuin .pipeline .step .result .ResultView ;
1818
1919import java .util .Map ;
20+ import java .util .concurrent .ConcurrentHashMap ;
2021
2122import static tech .illuin .pipeline .metering .MeterRegistryKey .fill ;
2223
@@ -28,6 +29,7 @@ public class RetryStep<T extends Indexable, I> implements Step<T, I>
2829 private final Step <T , I > step ;
2930 private final Retry retry ;
3031 private final RetryStepHandler handler ;
32+ private final Map <String , Integer > retryCounterContainer ;
3133
3234 public static final String RUN_COUNT_KEY = "pipeline.step.retry.run_count" ;
3335 public static final String RETRY_COUNT_KEY = "pipeline.step.retry.retry_count" ;
@@ -43,6 +45,7 @@ public RetryStep(Step<T, I> step, Retry retry, RetryStepHandler handler)
4345 this .step = step ;
4446 this .retry = retry ;
4547 this .handler = handler ;
48+ this .retryCounterContainer = new ConcurrentHashMap <>();
4649 }
4750
4851 @ Override
@@ -101,12 +104,13 @@ private void onSuccess(T object, I input, Object payload, ResultView view, Local
101104 context .pipelineTag ().pipeline (),
102105 context .pipelineTag ().uid (),
103106 context .componentTag ().id (),
104- this .retry . getMetrics (). getNumberOfTotalCalls ( )
107+ this .retryCounterContainer . get ( context . pipelineTag (). uid () )
105108 );
106109 this .handler .onSuccess (object , input , payload , view , context );
107110 }
108111 finally {
109112 counter (RUN_SUCCESS_KEY , context ).increment ();
113+ this .retryCounterContainer .remove (context .pipelineTag ().uid ());
110114 }
111115 }
112116
@@ -126,21 +130,26 @@ private void onError(T object, I input, Object payload, ResultView view, LocalCo
126130 }
127131 finally {
128132 counter (RUN_FAILURE_KEY , context , Tag .of ("error" , ex .getClass ().getName ())).increment ();
133+ this .retryCounterContainer .remove (context .pipelineTag ().uid ());
129134 }
130135 }
131136
132137 private void onAttempt (T object , I input , Object payload , ResultView view , LocalContext context )
133138 {
134139 try {
135- long totalCalls = this .retry .getMetrics ().getNumberOfTotalCalls ();
136- logger .trace (
137- "{}#{} retry wrapper {} - retry attempt #{} - {} left" ,
138- context .pipelineTag ().pipeline (),
139- context .pipelineTag ().uid (),
140- context .componentTag ().id (),
141- totalCalls ,
142- this .retry .getRetryConfig ().getMaxAttempts () - (totalCalls + 1 )
143- );
140+ if (!this .retryCounterContainer .containsKey (context .pipelineTag ().uid ()))
141+ this .retryCounterContainer .put (context .pipelineTag ().uid (), 0 );
142+ else {
143+ Integer retryCount = this .retryCounterContainer .merge (context .pipelineTag ().uid (), 1 , Integer ::sum );
144+ logger .trace (
145+ "{}#{} retry wrapper {} - retry attempt #{} - {} left" ,
146+ context .pipelineTag ().pipeline (),
147+ context .pipelineTag ().uid (),
148+ context .componentTag ().id (),
149+ retryCount ,
150+ this .retry .getRetryConfig ().getMaxAttempts () - retryCount
151+ );
152+ }
144153 this .handler .onRetry (object , input , payload , view , context );
145154 }
146155 finally {
0 commit comments