Skip to content

Commit 4528f4e

Browse files
authored
Add metric tracking total number of open shards, do not skip shards just because there is no record at the ending sequence number of that shard (#6260)
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent f140d2e commit 4528f4e

5 files changed

Lines changed: 87 additions & 7 deletions

File tree

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public DynamoDBService(final EnhancedSourceCoordinator coordinator,
7777
s3Client = clientFactory.buildS3Client();
7878

7979
// The shard manager is responsible for retrieving shard information from the streams.
80-
shardManager = new ShardManager(dynamoDbStreamsClient, dynamoDBSourceAggregateMetrics);
80+
shardManager = new ShardManager(dynamoDbStreamsClient, dynamoDBSourceAggregateMetrics, pluginMetrics);
8181
tableConfigs = sourceConfig.getTableConfigs();
8282
executor = Executors.newFixedThreadPool(4);
8383
}

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManager.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.opensearch.dataprepper.plugins.source.dynamodb.leader;
22

3+
import io.micrometer.core.instrument.DistributionSummary;
4+
import org.opensearch.dataprepper.metrics.PluginMetrics;
35
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
46
import org.slf4j.Logger;
57
import org.slf4j.LoggerFactory;
@@ -23,6 +25,7 @@
2325
public class ShardManager {
2426

2527
private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
28+
static final String TOTAL_OPEN_SHARDS = "totalOpenShards";
2629

2730
/**
2831
* Max number of shards to return in the DescribeStream API call, maximum 100.
@@ -47,14 +50,17 @@ public class ShardManager {
4750

4851
private final DynamoDbStreamsClient streamsClient;
4952
private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;
53+
private final DistributionSummary totalOpenShardCountDistributionSummary;
5054

5155

5256
public ShardManager(final DynamoDbStreamsClient streamsClient,
53-
final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics) {
57+
final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics,
58+
final PluginMetrics pluginMetrics) {
5459
this.streamsClient = streamsClient;
5560
this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics;
5661
streamMap = new HashMap<>();
5762
endingSequenceNumberMap = new HashMap<>();
63+
this.totalOpenShardCountDistributionSummary = pluginMetrics.summary(TOTAL_OPEN_SHARDS);
5864
}
5965

6066
/**
@@ -100,12 +106,16 @@ public List<Shard> runDiscovery(String streamArn) {
100106
});
101107

102108
if (streamInfo.getLastEvaluatedShardId() == null) {
103-
endingSequenceNumberMap = shards.stream()
109+
final List<Shard> closedShards = shards.stream()
104110
.filter(shard -> shard.sequenceNumberRange().endingSequenceNumber() != null)
111+
.collect(Collectors.toList());
112+
endingSequenceNumberMap = closedShards.stream()
105113
.collect(Collectors.toMap(
106114
shard -> shard.shardId(),
107115
shard -> shard.sequenceNumberRange().endingSequenceNumber()
108116
));
117+
118+
totalOpenShardCountDistributionSummary.record(shards.size() - closedShards.size());
109119
}
110120
LOG.debug("New last evaluated shard ID is " + shards.get(shards.size() - 1).shardId());
111121
streamInfo.setLastEvaluatedShardId(shards.get(shards.size() - 1).shardId());

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -406,9 +406,9 @@ private boolean shouldSkip() {
406406
if (lastShardIterator != null && !lastShardIterator.isEmpty()) {
407407
GetRecordsResponse response = callGetRecords(lastShardIterator);
408408
if (response.records().isEmpty()) {
409-
// Empty shard
410-
LOG.info("LastShardIterator is provided, but there is no Last Event Time, skip processing");
411-
return true;
409+
// There is no guarantee that the shard is empty just because there is no record at the endingSequenceNumber
410+
LOG.info("LastShardIterator is provided, but there is no Last Event Time, paginating through for documents");
411+
return false;
412412
}
413413

414414
Instant lastEventTime = response.records().get(response.records().size() - 1).dynamodb().approximateCreationDateTime();

data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManagerTest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66
package org.opensearch.dataprepper.plugins.source.dynamodb.leader;
77

88
import io.micrometer.core.instrument.Counter;
9+
import io.micrometer.core.instrument.DistributionSummary;
910
import org.junit.jupiter.api.BeforeEach;
1011
import org.junit.jupiter.api.Test;
1112
import org.junit.jupiter.api.extension.ExtendWith;
1213
import org.mockito.Mock;
1314
import org.mockito.junit.jupiter.MockitoExtension;
15+
import org.opensearch.dataprepper.metrics.PluginMetrics;
1416
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
1517
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
1618
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
@@ -33,6 +35,7 @@
3335
import static org.mockito.Mockito.lenient;
3436
import static org.mockito.Mockito.verify;
3537
import static org.mockito.Mockito.when;
38+
import static org.opensearch.dataprepper.plugins.source.dynamodb.leader.ShardManager.TOTAL_OPEN_SHARDS;
3639

3740

3841
@ExtendWith(MockitoExtension.class)
@@ -52,6 +55,12 @@ class ShardManagerTest {
5255
@Mock
5356
private Counter streamApiInvocations;
5457

58+
@Mock
59+
private PluginMetrics pluginMetrics;
60+
61+
@Mock
62+
private DistributionSummary distributionSummary;
63+
5564
private ShardManager shardManager;
5665

5766

@@ -98,7 +107,9 @@ void setup() {
98107
.build();
99108

100109
lenient().when(dynamoDbStreamsClient.describeStream(any(DescribeStreamRequest.class))).thenReturn(response);
101-
shardManager = new ShardManager(dynamoDbStreamsClient, dynamoDBSourceAggregateMetrics);
110+
111+
when(pluginMetrics.summary(TOTAL_OPEN_SHARDS)).thenReturn(distributionSummary);
112+
shardManager = new ShardManager(dynamoDbStreamsClient, dynamoDBSourceAggregateMetrics, pluginMetrics);
102113

103114
when(dynamoDBSourceAggregateMetrics.getStreamApiInvocations()).thenReturn(streamApiInvocations);
104115
}
@@ -109,6 +120,8 @@ void test_getChildShardIds_should_return_child_shards() {
109120
assertThat(childShards, notNullValue());
110121
assertThat(childShards.size(), equalTo(6));
111122

123+
verify(distributionSummary).record(2);
124+
112125
List<String> childShardIds1 = shardManager.findChildShardIds(streamArn, "shardId-001");
113126
assertThat(childShardIds1, notNullValue());
114127
assertThat(childShardIds1.size(), equalTo(1));

data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.junit.jupiter.api.BeforeEach;
1111
import org.junit.jupiter.api.Test;
1212
import org.junit.jupiter.api.extension.ExtendWith;
13+
import org.mockito.ArgumentCaptor;
1314
import org.mockito.Mock;
1415
import org.mockito.MockedStatic;
1516
import org.mockito.junit.jupiter.MockitoExtension;
@@ -46,6 +47,9 @@
4647
import java.util.Random;
4748
import java.util.UUID;
4849

50+
import static org.hamcrest.MatcherAssert.assertThat;
51+
import static org.hamcrest.Matchers.equalTo;
52+
import static org.hamcrest.Matchers.notNullValue;
4953
import static org.junit.jupiter.api.Assertions.assertThrows;
5054
import static org.mockito.ArgumentMatchers.any;
5155
import static org.mockito.ArgumentMatchers.anyString;
@@ -480,6 +484,59 @@ void test_shard_has_no_records_null_iterator() throws Exception {
480484

481485
}
482486
}
487+
488+
@Test
489+
void test_shard_has_no_records_null_with_last_shard_iterator_paginates_through_shard() throws Exception {
490+
final AcknowledgementSet finalAcknowledgementSet = mock(AcknowledgementSet.class);
491+
when(shardAcknowledgementManager.createAcknowledgmentSet(any(StreamPartition.class), any(String.class), any(Boolean.class)))
492+
.thenReturn(finalAcknowledgementSet);
493+
494+
final String lastShardIterator = UUID.randomUUID().toString();
495+
496+
// Set up response with null nextShardIterator to trigger end of shard
497+
GetRecordsResponse response = GetRecordsResponse.builder()
498+
.records(List.of())
499+
.nextShardIterator(null)
500+
.build();
501+
final ArgumentCaptor<GetRecordsRequest> getRecordsRequest = ArgumentCaptor.forClass(GetRecordsRequest.class);
502+
when(dynamoDbStreamsClient.getRecords(getRecordsRequest.capture())).thenReturn(response);
503+
504+
try (MockedStatic<ShardConsumer> shardConsumerMockedStatic = mockStatic(ShardConsumer.class, invocation -> {
505+
if (invocation.getMethod().getName().equals("stopAll")) {
506+
return null;
507+
} else if (invocation.getMethod().getName().equals("shouldStop")) {
508+
return false;
509+
}
510+
return invocation.callRealMethod();
511+
})) {
512+
ShardConsumer shardConsumer;
513+
try (final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) {
514+
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator);
515+
shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig)
516+
.shardIterator(shardIterator)
517+
.lastShardIterator(lastShardIterator)
518+
.shardAcknowledgementManager(shardAcknowledgementManager)
519+
.streamPartition(streamPartition)
520+
.tableInfo(tableInfo)
521+
.startTime(null)
522+
.waitForExport(false)
523+
.build();
524+
}
525+
526+
shardConsumer.run();
527+
528+
// Verify the END_OF_SHARD acknowledgment set is created with its boolean argument set to true once the next shard iterator is null
529+
verify(shardAcknowledgementManager).createAcknowledgmentSet(eq(streamPartition), eq(END_OF_SHARD), eq(true));
530+
// Verify final acknowledgment set created and completed when shardIterator is null
531+
verify(finalAcknowledgementSet).complete();
532+
533+
final List<GetRecordsRequest> requestWithLastShardIterator = getRecordsRequest.getAllValues();
534+
assertThat(requestWithLastShardIterator, notNullValue());
535+
assertThat(requestWithLastShardIterator.size(), equalTo(2));
536+
assertThat(requestWithLastShardIterator.get(0).shardIterator(), equalTo(lastShardIterator));
537+
assertThat(requestWithLastShardIterator.get(1).shardIterator(), equalTo(shardIterator));
538+
}
539+
}
483540
private List<Record> buildRecords(int count) {
484541
List<Record> records = new ArrayList<>();
485542
for (int i = 0; i < count; i++) {

0 commit comments

Comments
 (0)