Skip to content

Commit b3aece5

Browse files
authored
Add disable_checkpointing flag for ddb source stream (#5981)
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 7b8478b commit b3aece5

3 files changed

Lines changed: 37 additions & 0 deletions

File tree

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/StreamConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ public class StreamConfig {
1616
@JsonProperty("view_on_remove")
1717
private StreamViewType viewForRemoves = StreamViewType.NEW_IMAGE;
1818

19+
@JsonProperty("disable_checkpointing")
20+
private boolean disableCheckpointing = false;
21+
1922
public StreamStartPosition getStartPosition() {
2023
return startPosition;
2124
}
@@ -24,4 +27,6 @@ public StreamViewType getStreamViewForRemoves() {
2427
return viewForRemoves;
2528
}
2629

30+
public boolean isDisableCheckpointing() { return disableCheckpointing; }
31+
2732
}

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ public Runnable createConsumer(final StreamPartition streamPartition,
7878
if (progressState.isPresent()) {
7979
// We can't checkpoint with acks yet
8080
sequenceNumber = acknowledgementSet == null ? null : progressState.get().getSequenceNumber();
81+
if (streamConfig.isDisableCheckpointing()) {
82+
sequenceNumber = null;
83+
}
8184
waitForExport = progressState.get().shouldWaitForExport();
8285
if (progressState.get().getStartTime() != 0) {
8386
startTime = Instant.ofEpochMilli(progressState.get().getStartTime());

data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99
import org.junit.jupiter.api.BeforeEach;
1010
import org.junit.jupiter.api.Test;
1111
import org.junit.jupiter.api.extension.ExtendWith;
12+
import org.mockito.ArgumentCaptor;
1213
import org.mockito.Mock;
1314
import org.mockito.junit.jupiter.MockitoExtension;
1415
import org.opensearch.dataprepper.metrics.PluginMetrics;
16+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
1517
import org.opensearch.dataprepper.model.buffer.Buffer;
1618
import org.opensearch.dataprepper.model.event.Event;
1719
import org.opensearch.dataprepper.model.record.Record;
@@ -26,13 +28,15 @@
2628
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
2729
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
2830
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
31+
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
2932
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
3033

3134
import java.time.Instant;
3235
import java.util.Optional;
3336
import java.util.UUID;
3437

3538
import static org.hamcrest.MatcherAssert.assertThat;
39+
import static org.hamcrest.Matchers.equalTo;
3640
import static org.hamcrest.Matchers.notNullValue;
3741
import static org.hamcrest.Matchers.nullValue;
3842
import static org.mockito.ArgumentMatchers.any;
@@ -123,6 +127,31 @@ public void test_create_shardConsumer_correctly() {
123127
verify(streamApiInvocations).increment();
124128
}
125129

130+
@Test
131+
public void test_create_shardConsumer_correctly_with_is_disable_checkpointing_enabled_starts_from_trim_horizon() {
132+
133+
final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class);
134+
when(streamConfig.isDisableCheckpointing()).thenReturn(true);
135+
StreamProgressState state = new StreamProgressState();
136+
state.setWaitForExport(false);
137+
state.setSequenceNumber(UUID.randomUUID().toString());
138+
state.setStartTime(Instant.now().toEpochMilli());
139+
streamPartition = new StreamPartition(streamArn, shardId, Optional.of(state));
140+
141+
ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig);
142+
Runnable consumer = consumerFactory.createConsumer(streamPartition, acknowledgementSet, null);
143+
assertThat(consumer, notNullValue());
144+
145+
final ArgumentCaptor<GetShardIteratorRequest> captor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
146+
verify(dynamoDbStreamsClient).getShardIterator(captor.capture());
147+
148+
final GetShardIteratorRequest getShardIteratorRequest = captor.getValue();
149+
assertThat(getShardIteratorRequest.sequenceNumber(), equalTo(null));
150+
assertThat(getShardIteratorRequest.shardIteratorType(), equalTo(ShardIteratorType.TRIM_HORIZON));
151+
152+
verify(streamApiInvocations).increment();
153+
}
154+
126155
@Test
127156
public void test_create_shardConsumer_for_closedShards() {
128157
// For ending sequence number != null

0 commit comments

Comments
 (0)