Skip to content

Commit 72a85f5

Browse files
authored
set retry time interval configurable, increase the http client read timeout (#6320)
* set retry time interval configurable and increase the http client read timeout Signed-off-by: Xun Zhang <xunzh@amazon.com> * address comments Signed-off-by: Xun Zhang <xunzh@amazon.com> --------- Signed-off-by: Xun Zhang <xunzh@amazon.com>
1 parent c35400b commit 72a85f5

5 files changed

Lines changed: 162 additions & 3 deletions

File tree

data-prepper-plugins/ml-inference-processor/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ dependencies {
1313
implementation 'org.json:json'
1414
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
1515
implementation 'org.projectlombok:lombok:1.18.22'
16+
implementation 'org.hibernate.validator:hibernate-validator:8.0.2.Final'
1617
annotationProcessor 'org.projectlombok:lombok:1.18.20'
1718
implementation 'software.amazon.awssdk:s3'
1819
testImplementation project(':data-prepper-test:test-event')

data-prepper-plugins/ml-inference-processor/src/main/java/org/opensearch/dataprepper/plugins/ml_inference/processor/MLProcessorConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import com.fasterxml.jackson.annotation.JsonProperty;
1010
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
1111
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
12+
import org.hibernate.validator.constraints.time.DurationMax;
13+
import org.hibernate.validator.constraints.time.DurationMin;
1214
import jakarta.validation.Valid;
1315
import jakarta.validation.constraints.NotNull;
1416
import lombok.Getter;
@@ -33,6 +35,7 @@
3335
public class MLProcessorConfig {
3436
private static final int DEFAULT_MAX_BATCH_SIZE = 100;
3537
public static final Duration DEFAULT_RETRY_WINDOW = Duration.ofMinutes(10);
38+
public static final int DEFAULT_RETRY_INTERVAL_SECONDS = 60; // default retry interval is 1 minute
3639

3740
@JsonProperty("aws")
3841
@NotNull
@@ -89,6 +92,19 @@ public class MLProcessorConfig {
8992
@JsonProperty("retry_time_window")
9093
private Duration retryTimeWindow = DEFAULT_RETRY_WINDOW;
9194

95+
@JsonPropertyDescription("The retry interval for throttled records. " +
96+
"Supports ISO_8601 duration notation (\"PT1M\", \"PT30S\") and simple notation (\"60s\", \"2m\"). " +
97+
"Valid range: 3 seconds to 5 minutes. Default is 60 seconds.")
98+
@ExampleValues({
99+
@ExampleValues.Example(value = "\"PT1M\"", description = "ISO-8601 format for 1 minute"),
100+
@ExampleValues.Example(value = "\"60s\"", description = "Simple format for 60 seconds"),
101+
@ExampleValues.Example(value = "\"2m\"", description = "Simple format for 2 minutes")
102+
})
103+
@JsonProperty("retry_interval")
104+
@DurationMin(seconds = 3)
105+
@DurationMax(seconds = 300)
106+
private Duration retryInterval = Duration.ofSeconds(DEFAULT_RETRY_INTERVAL_SECONDS);
107+
92108
@JsonProperty("dlq")
93109
private PluginModel dlq;
94110

data-prepper-plugins/ml-inference-processor/src/main/java/org/opensearch/dataprepper/plugins/ml_inference/processor/common/BedrockBatchJobCreator.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ public class BedrockBatchJobCreator extends AbstractBatchJobCreator {
3838
@Getter
3939
private final ConcurrentLinkedQueue<RetryRecord> throttledRecords = new ConcurrentLinkedQueue<>();
4040
private final Lock processingLock;
41+
private final long retryIntervalMillis;
42+
private volatile long lastRetryTimestamp;
4143

4244
private static final String BEDROCK_PAYLOAD_TEMPLATE = "{\"parameters\": {\"inputDataConfig\": {\"s3InputDataConfig\": {\"s3Uri\": \"s3://\"}}," +
4345
"\"jobName\": \"\", \"outputDataConfig\": {\"s3OutputDataConfig\": {\"s3Uri\": \"s3://\"}}}}";
@@ -46,6 +48,8 @@ public BedrockBatchJobCreator(final MLProcessorConfig mlProcessorConfig, final A
4648
super(mlProcessorConfig, awsCredentialsSupplier, pluginMetrics, dlqPushHandler);
4749
this.awsCredentialsSupplier = awsCredentialsSupplier;
4850
this.processingLock = new ReentrantLock();
51+
this.retryIntervalMillis = mlProcessorConfig.getRetryInterval().toMillis();
52+
this.lastRetryTimestamp = System.currentTimeMillis();
4953
}
5054

5155
@Override
@@ -150,16 +154,39 @@ private void processRecord(Record<Event> record, List<Record<Event>> resultRecor
150154

151155
@Override
152156
public void addProcessedBatchRecordsToResults(List<Record<Event>> resultRecords) {
157+
if (throttledRecords.isEmpty()) {
158+
return;
159+
}
160+
161+
long currentTime = System.currentTimeMillis();
162+
long timeSinceLastRetry = currentTime - lastRetryTimestamp;
163+
164+
if (timeSinceLastRetry < retryIntervalMillis) {
165+
LOG.debug("Skipping retry processing. Only {}ms passed since last retry (need {}ms)",
166+
timeSinceLastRetry, retryIntervalMillis);
167+
return;
168+
}
169+
153170
if (!processingLock.tryLock()) {
154171
LOG.debug("Another thread is currently processing results, skipping this attempt");
155172
return;
156173
}
157174

158175
try {
176+
if (throttledRecords.isEmpty()) {
177+
LOG.debug("Queue became empty after acquiring lock, skipping timestamp update");
178+
return;
179+
}
180+
181+
LOG.info(NOISY, "Processing {} throttled records ({}s since last retry)",
182+
throttledRecords.size(), timeSinceLastRetry / 1000);
183+
159184
processThrottledRecords(resultRecords);
160185
} catch (Exception e) {
161186
LOG.error("Error in batch processing throttled records. Error: {}", e.getMessage());
162187
} finally {
188+
// Always update timestamp after a retry attempt (success or failure)
189+
lastRetryTimestamp = currentTime;
163190
processingLock.unlock();
164191
}
165192
}

data-prepper-plugins/ml-inference-processor/src/main/java/org/opensearch/dataprepper/plugins/ml_inference/processor/util/SdkHttpClientExecutor.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,26 @@
1717
public class SdkHttpClientExecutor implements HttpClientExecutor {
1818
private final SdkHttpClient httpClient;
1919

20+
// Configuration constants
21+
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(30);
22+
private static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(30);
23+
private static final int DEFAULT_MAX_CONNECTIONS = 10;
24+
2025
public SdkHttpClientExecutor() {
26+
this(DEFAULT_CONNECTION_TIMEOUT, DEFAULT_READ_TIMEOUT, DEFAULT_MAX_CONNECTIONS);
27+
}
28+
29+
/**
30+
* Constructor with configurable timeouts for flexibility
31+
* @param connectionTimeout timeout for establishing connection
32+
* @param readTimeout timeout for reading response data
33+
* @param maxConnections maximum number of connections
34+
*/
35+
public SdkHttpClientExecutor(Duration connectionTimeout, Duration readTimeout, int maxConnections) {
2136
AttributeMap attributeMap = AttributeMap.builder()
22-
.put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, Duration.ofMillis(30000))
23-
.put(SdkHttpConfigurationOption.READ_TIMEOUT, Duration.ofMillis(3000))
24-
.put(SdkHttpConfigurationOption.MAX_CONNECTIONS, 10)
37+
.put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, connectionTimeout)
38+
.put(SdkHttpConfigurationOption.READ_TIMEOUT, readTimeout)
39+
.put(SdkHttpConfigurationOption.MAX_CONNECTIONS, maxConnections)
2540
.build();
2641
this.httpClient = new DefaultSdkHttpClientBuilder().buildWithDefaults(attributeMap);
2742
}

data-prepper-plugins/ml-inference-processor/src/test/java/org/opensearch/dataprepper/plugins/ml_inference/processor/common/BedrockBatchJobCreatorTest.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.dataprepper.plugins.ml_inference.processor.dlq.DlqPushHandler;
2222
import org.opensearch.dataprepper.plugins.ml_inference.processor.exception.MLBatchJobException;
2323

24+
import java.time.Duration;
2425
import java.util.ArrayList;
2526
import java.util.Arrays;
2627
import java.util.List;
@@ -36,6 +37,7 @@
3637
import static org.mockito.Mockito.times;
3738
import static org.mockito.Mockito.verify;
3839
import static org.mockito.Mockito.when;
40+
import static org.opensearch.dataprepper.plugins.ml_inference.processor.MLProcessorConfig.DEFAULT_RETRY_INTERVAL_SECONDS;
3941
import static org.opensearch.dataprepper.plugins.ml_inference.processor.MLProcessorConfig.DEFAULT_RETRY_WINDOW;
4042
import static org.opensearch.dataprepper.plugins.ml_inference.processor.common.AbstractBatchJobCreator.NUMBER_OF_FAILED_BATCH_JOBS_CREATION;
4143
import static org.opensearch.dataprepper.plugins.ml_inference.processor.common.AbstractBatchJobCreator.NUMBER_OF_RECORDS_FAILED_IN_BATCH_JOB;
@@ -280,4 +282,102 @@ void testCreateMLBatchJob_ThrottledThenSuccess() {
280282
verify(bedrockBatchJobCreator, times(1)).incrementSuccessCounter();
281283
}
282284
}
285+
286+
@Test
287+
void testRetryInterval_SkipsRetryBeforeIntervalElapses() throws InterruptedException {
288+
// Mock retry interval BEFORE creating the object
289+
when(mlProcessorConfig.getRetryInterval()).thenReturn(Duration.ofSeconds(DEFAULT_RETRY_INTERVAL_SECONDS)); // 1 second for testing
290+
291+
// Create object with mocked config
292+
bedrockBatchJobCreator = spy(new BedrockBatchJobCreator(mlProcessorConfig, awsCredentialsSupplier, pluginMetrics, dlqPushHandler));
293+
294+
Event event = mock(Event.class);
295+
Record<Event> record = new Record<>(event);
296+
297+
when(event.getJsonNode()).thenReturn(OBJECT_MAPPER.createObjectNode()
298+
.put("bucket", "test-bucket")
299+
.put("key", "input.jsonl"));
300+
301+
try (MockedStatic<RetryUtil> mockedStatic = mockStatic(RetryUtil.class)) {
302+
// First attempt - gets throttled
303+
mockedStatic.when(() -> RetryUtil.retryWithBackoffWithResult(any(Runnable.class), any()))
304+
.thenReturn(new RetryUtil.RetryResult(false, new MLBatchJobException(429, "throttled"), 1));
305+
306+
List<Record<Event>> resultRecords = new ArrayList<>();
307+
308+
// First attempt - gets throttled
309+
bedrockBatchJobCreator.createMLBatchJob(Arrays.asList(record), resultRecords);
310+
assertEquals(1, bedrockBatchJobCreator.getThrottledRecords().size());
311+
312+
// Try to process immediately (should skip due to retry interval)
313+
bedrockBatchJobCreator.addProcessedBatchRecordsToResults(resultRecords);
314+
315+
// Verify record is still in queue (not processed due to retry interval)
316+
assertEquals(1, bedrockBatchJobCreator.getThrottledRecords().size());
317+
BedrockBatchJobCreator.RetryRecord throttledRecord = bedrockBatchJobCreator.getThrottledRecords().peek();
318+
assertNotNull(throttledRecord);
319+
assertEquals(0, throttledRecord.getRetryCount()); // Not incremented because retry was skipped
320+
assertTrue(resultRecords.isEmpty());
321+
}
322+
}
323+
324+
@Test
325+
void testRetryInterval_ProcessesAfterIntervalElapses() throws InterruptedException {
326+
// Mock retry interval BEFORE creating the object
327+
when(mlProcessorConfig.getRetryInterval()).thenReturn(Duration.ofSeconds(1)); // 1 second for testing
328+
329+
// Create object with mocked config
330+
bedrockBatchJobCreator = spy(new BedrockBatchJobCreator(mlProcessorConfig, awsCredentialsSupplier, pluginMetrics, dlqPushHandler));
331+
332+
Event event = mock(Event.class);
333+
Record<Event> record = new Record<>(event);
334+
335+
when(event.getJsonNode()).thenReturn(OBJECT_MAPPER.createObjectNode()
336+
.put("bucket", "test-bucket")
337+
.put("key", "input.jsonl"));
338+
339+
try (MockedStatic<RetryUtil> mockedStatic = mockStatic(RetryUtil.class)) {
340+
// First throttled, then success
341+
mockedStatic.when(() -> RetryUtil.retryWithBackoffWithResult(any(Runnable.class), any()))
342+
.thenReturn(new RetryUtil.RetryResult(false, new MLBatchJobException(429, "throttled"), 1))
343+
.thenReturn(new RetryUtil.RetryResult(true, null, 1));
344+
345+
List<Record<Event>> resultRecords = new ArrayList<>();
346+
347+
// First attempt - gets throttled
348+
bedrockBatchJobCreator.createMLBatchJob(Arrays.asList(record), resultRecords);
349+
assertEquals(1, bedrockBatchJobCreator.getThrottledRecords().size());
350+
351+
// Wait for retry interval to elapse
352+
Thread.sleep(1100); // Wait 1.1 seconds
353+
354+
// Now retry should proceed
355+
bedrockBatchJobCreator.addProcessedBatchRecordsToResults(resultRecords);
356+
357+
// Verify record was processed successfully
358+
assertTrue(bedrockBatchJobCreator.getThrottledRecords().isEmpty());
359+
assertEquals(1, resultRecords.size());
360+
verify(bedrockBatchJobCreator, times(1)).incrementSuccessCounter();
361+
}
362+
}
363+
364+
@Test
365+
void testRetryInterval_EmptyQueueDoesNotUpdateTimestamp() throws Exception {
366+
List<Record<Event>> resultRecords = new ArrayList<>();
367+
368+
// Try to process with empty queue
369+
long timestampBefore = getLastRetryTimestamp(bedrockBatchJobCreator);
370+
bedrockBatchJobCreator.addProcessedBatchRecordsToResults(resultRecords);
371+
long timestampAfter = getLastRetryTimestamp(bedrockBatchJobCreator);
372+
373+
// Verify timestamp was not updated (queue was empty)
374+
assertEquals(timestampBefore, timestampAfter);
375+
}
376+
377+
// Helper method to access private lastRetryTimestamp field using reflection
378+
private long getLastRetryTimestamp(BedrockBatchJobCreator creator) throws Exception {
379+
java.lang.reflect.Field field = BedrockBatchJobCreator.class.getDeclaredField("lastRetryTimestamp");
380+
field.setAccessible(true);
381+
return (long) field.get(creator);
382+
}
283383
}

0 commit comments

Comments
 (0)