Skip to content

Commit 5a3df3d

Browse files
committed
Revert "Ensure shards are completed when last getRecords call has no records and no shardIterator (opensearch-project#5958)"
This reverts commit 13049d3. Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent b8ffadb commit 5a3df3d

2 files changed

Lines changed: 1 addition & 100 deletions

File tree

  • data-prepper-plugins/dynamodb-source/src

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -252,14 +252,10 @@ public void run() {
252252
String sequenceNumber = "";
253253
int interval;
254254
List<software.amazon.awssdk.services.dynamodb.model.Record> records;
255-
boolean createdFinalAcknowledgmentSetForShard = false;
255+
256256
while (!shouldStop) {
257257
if (shardIterator == null) {
258258
// End of Shard
259-
if (shardAcknowledgementManager != null && !createdFinalAcknowledgmentSetForShard) {
260-
final AcknowledgementSet finalAcknowledgmentSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, sequenceNumber, true);
261-
finalAcknowledgmentSet.complete();
262-
}
263259
LOG.debug("Reached end of shard");
264260
break;
265261
}
@@ -291,9 +287,6 @@ public void run() {
291287
AcknowledgementSet acknowledgementSet = null;
292288
if (shardAcknowledgementManager != null) {
293289
acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, sequenceNumber, shardIterator == null);
294-
if (shardIterator == null) {
295-
createdFinalAcknowledgmentSetForShard = true;
296-
}
297290
}
298291

299292
records = response.records().stream()

data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java

Lines changed: 0 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import static org.junit.jupiter.api.Assertions.assertThrows;
5050
import static org.mockito.ArgumentMatchers.any;
5151
import static org.mockito.ArgumentMatchers.anyString;
52-
import static org.mockito.ArgumentMatchers.eq;
5352
import static org.mockito.BDDMockito.given;
5453
import static org.mockito.Mockito.lenient;
5554
import static org.mockito.Mockito.mock;
@@ -348,9 +347,6 @@ void test_run_shardConsumer_catches_4xx_exception_and_increments_metric() {
348347

349348
@Test
350349
void test_run_shardConsumer_calls_startUpdatingOwnershipForShard() throws Exception {
351-
final AcknowledgementSet finalAcknowledgementSet = mock(AcknowledgementSet.class);
352-
when(shardAcknowledgementManager.createAcknowledgmentSet(any(StreamPartition.class), any(String.class), any(Boolean.class)))
353-
.thenReturn(finalAcknowledgementSet);
354350
try (final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) {
355351
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator);
356352
ShardConsumer shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig)
@@ -368,94 +364,6 @@ void test_run_shardConsumer_calls_startUpdatingOwnershipForShard() throws Except
368364
verify(shardAcknowledgementManager).startUpdatingOwnershipForShard(streamPartition);
369365
}
370366

371-
@Test
372-
void test_shard_has_records_null_iterator() throws Exception {
373-
final AcknowledgementSet finalAcknowledgementSet = mock(AcknowledgementSet.class);
374-
when(shardAcknowledgementManager.createAcknowledgmentSet(any(StreamPartition.class), any(String.class), any(Boolean.class)))
375-
.thenReturn(finalAcknowledgementSet);
376-
377-
// Set up response with null nextShardIterator to trigger end of shard
378-
GetRecordsResponse response = GetRecordsResponse.builder()
379-
.records(buildRecords(1))
380-
.nextShardIterator(null)
381-
.build();
382-
when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenReturn(response);
383-
384-
try (MockedStatic<ShardConsumer> shardConsumerMockedStatic = mockStatic(ShardConsumer.class, invocation -> {
385-
if (invocation.getMethod().getName().equals("stopAll")) {
386-
return null;
387-
} else if (invocation.getMethod().getName().equals("shouldStop")) {
388-
return false;
389-
}
390-
return invocation.callRealMethod();
391-
})) {
392-
ShardConsumer shardConsumer;
393-
try (final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) {
394-
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator);
395-
shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig)
396-
.shardIterator(shardIterator)
397-
.shardAcknowledgementManager(shardAcknowledgementManager)
398-
.streamPartition(streamPartition)
399-
.tableInfo(tableInfo)
400-
.startTime(null)
401-
.waitForExport(false)
402-
.build();
403-
}
404-
405-
shardConsumer.run();
406-
407-
// Verify acknowledgment set created for records with shardIterator == null (true)
408-
verify(shardAcknowledgementManager).createAcknowledgmentSet(eq(streamPartition), any(String.class), eq(true));
409-
// Verify final acknowledgment set created and completed when shardIterator is null
410-
verify(finalAcknowledgementSet).complete();
411-
412-
}
413-
}
414-
415-
416-
@Test
417-
void test_shard_has_no_records_null_iterator() throws Exception {
418-
final AcknowledgementSet finalAcknowledgementSet = mock(AcknowledgementSet.class);
419-
when(shardAcknowledgementManager.createAcknowledgmentSet(any(StreamPartition.class), any(String.class), any(Boolean.class)))
420-
.thenReturn(finalAcknowledgementSet);
421-
422-
// Set up response with null nextShardIterator to trigger end of shard
423-
GetRecordsResponse response = GetRecordsResponse.builder()
424-
.records(List.of())
425-
.nextShardIterator(null)
426-
.build();
427-
when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenReturn(response);
428-
429-
try (MockedStatic<ShardConsumer> shardConsumerMockedStatic = mockStatic(ShardConsumer.class, invocation -> {
430-
if (invocation.getMethod().getName().equals("stopAll")) {
431-
return null;
432-
} else if (invocation.getMethod().getName().equals("shouldStop")) {
433-
return false;
434-
}
435-
return invocation.callRealMethod();
436-
})) {
437-
ShardConsumer shardConsumer;
438-
try (final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) {
439-
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator);
440-
shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig)
441-
.shardIterator(shardIterator)
442-
.shardAcknowledgementManager(shardAcknowledgementManager)
443-
.streamPartition(streamPartition)
444-
.tableInfo(tableInfo)
445-
.startTime(null)
446-
.waitForExport(false)
447-
.build();
448-
}
449-
450-
shardConsumer.run();
451-
452-
// Verify acknowledgment set created for records with shardIterator == null (true)
453-
verify(shardAcknowledgementManager).createAcknowledgmentSet(eq(streamPartition), any(String.class), eq(true));
454-
// Verify final acknowledgment set created and completed when shardIterator is null
455-
verify(finalAcknowledgementSet).complete();
456-
457-
}
458-
}
459367
private List<Record> buildRecords(int count) {
460368
List<Record> records = new ArrayList<>();
461369
for (int i = 0; i < count; i++) {

0 commit comments

Comments
 (0)