Skip to content

Commit 5829ac3

Browse files
committed
Adding functionality to read from specific timestamps for KDS source
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
1 parent c25f82f commit 5829ac3

7 files changed

Lines changed: 183 additions & 12 deletions

File tree

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTracker.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,17 @@
1414
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.ConsumerStrategy;
1515
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
1616
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
17+
import software.amazon.kinesis.common.InitialPositionInStream;
1718
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
1819
import software.amazon.kinesis.common.StreamConfig;
1920
import software.amazon.kinesis.common.StreamIdentifier;
2021
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
2122
import software.amazon.kinesis.processor.MultiStreamTracker;
2223

2324
import java.time.Duration;
25+
import java.time.Instant;
26+
import java.time.ZoneOffset;
27+
import java.util.Date;
2428
import java.util.List;
2529
import java.util.Objects;
2630
import java.util.stream.Collectors;
@@ -45,28 +49,25 @@ public List<StreamConfig> streamConfigList() {
4549

4650
private StreamConfig createStreamConfig(KinesisStreamConfig kinesisStreamConfig) {
4751
StreamIdentifier streamIdentifier = getStreamIdentifier(kinesisStreamConfig);
52+
InitialPositionInStreamExtended initialPosition = getInitialPositionExtended(kinesisStreamConfig);
4853

4954
// if the consumer strategy is polling, skip look up for consumer
5055
if (sourceConfig.getConsumerStrategy() == ConsumerStrategy.POLLING) {
51-
return new StreamConfig(streamIdentifier,
52-
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition())
53-
);
56+
return new StreamConfig(streamIdentifier, initialPosition);
5457
}
5558

5659
// If stream arn and consumer arn is present, create a stream config based on the configured values
5760
if (Objects.nonNull(kinesisStreamConfig.getStreamArn()) && Objects.nonNull(kinesisStreamConfig.getConsumerArn())) {
58-
return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()), kinesisStreamConfig.getConsumerArn());
61+
return new StreamConfig(streamIdentifier, initialPosition, kinesisStreamConfig.getConsumerArn());
5962
}
6063

6164
// If stream arn is provided, lookup consumer arn based on the consumer name which is the data prepper application name
6265
if (Objects.nonNull(kinesisStreamConfig.getStreamArn())) {
6366
String consumerArn = kinesisClientAPIHandler.getConsumerArnForStream(kinesisStreamConfig.getStreamArn(), this.applicationName);
64-
return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()), consumerArn);
67+
return new StreamConfig(streamIdentifier, initialPosition, consumerArn);
6568
}
6669
// Default case
67-
return new StreamConfig(streamIdentifier,
68-
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition())
69-
);
70+
return new StreamConfig(streamIdentifier, initialPosition);
7071
}
7172

7273
private StreamIdentifier getStreamIdentifier(final KinesisStreamConfig kinesisStreamConfig) {
@@ -79,6 +80,21 @@ private StreamIdentifier getStreamIdentifier(final KinesisStreamConfig kinesisSt
7980

8081
return kinesisClientAPIHandler.getStreamIdentifier(streamArn != null ? streamArn : streamName);
8182
}
83+
84+
private InitialPositionInStreamExtended getInitialPositionExtended(KinesisStreamConfig kinesisStreamConfig) {
85+
if (kinesisStreamConfig.getInitialPosition() == InitialPositionInStream.AT_TIMESTAMP) {
86+
Instant timestamp;
87+
if (Objects.nonNull(kinesisStreamConfig.getInitialTimestamp())) {
88+
timestamp = kinesisStreamConfig.getInitialTimestamp().atOffset(ZoneOffset.UTC).toInstant();
89+
} else if (Objects.nonNull(kinesisStreamConfig.getLookbackDuration())) {
90+
timestamp = Instant.now().minus(kinesisStreamConfig.getLookbackDuration());
91+
} else {
92+
throw new IllegalArgumentException("Either initial_timestamp or lookback_duration must be specified when using AT_TIMESTAMP initial_position");
93+
}
94+
return InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(timestamp));
95+
}
96+
return InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition());
97+
}
8298
/**
8399
* Setting the deletion policy as autodetect and release shard lease with a wait time of 10 sec
84100
*/

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/InitialPositionInStreamConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
@Getter
2121
public enum InitialPositionInStreamConfig {
2222
LATEST("latest", InitialPositionInStream.LATEST),
23-
EARLIEST("earliest", InitialPositionInStream.TRIM_HORIZON);
23+
EARLIEST("earliest", InitialPositionInStream.TRIM_HORIZON),
24+
AT_TIMESTAMP("at_timestamp", InitialPositionInStream.AT_TIMESTAMP);
2425

2526
private final String position;
2627

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,16 @@
1111
package org.opensearch.dataprepper.plugins.kinesis.source.configuration;
1212

1313
import com.fasterxml.jackson.annotation.JsonProperty;
14+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
15+
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
1416
import jakarta.validation.Valid;
1517
import lombok.Getter;
1618
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
1719
import software.amazon.awssdk.arns.Arn;
1820
import software.amazon.kinesis.common.InitialPositionInStream;
1921

2022
import java.time.Duration;
23+
import java.time.LocalDateTime;
2124
import java.util.Objects;
2225

2326
@Getter
@@ -44,6 +47,13 @@ public class KinesisStreamConfig {
4447
@JsonProperty("checkpoint_interval")
4548
private Duration checkPointInterval = MINIMAL_CHECKPOINT_INTERVAL;
4649

50+
@JsonProperty("lookback_duration")
51+
private Duration lookbackDuration;
52+
53+
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
54+
@JsonProperty("initial_timestamp")
55+
private LocalDateTime initialTimestamp;
56+
4757
public InitialPositionInStream getInitialPosition() {
4858
return initialPosition.getPositionInStream();
4959
}

data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTrackerTest.java

Lines changed: 83 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,16 @@
2828

2929
import java.time.Duration;
3030
import java.time.Instant;
31+
import java.time.LocalDateTime;
32+
import java.time.ZoneOffset;
3133
import java.util.ArrayList;
34+
import java.util.Date;
3235
import java.util.HashMap;
3336
import java.util.List;
3437
import java.util.Map;
3538
import java.util.UUID;
3639

37-
import static org.junit.jupiter.api.Assertions.assertEquals;
38-
import static org.junit.jupiter.api.Assertions.assertNull;
39-
import static org.junit.jupiter.api.Assertions.assertThrows;
40+
import static org.junit.jupiter.api.Assertions.*;
4041
import static org.mockito.Mockito.mock;
4142
import static org.mockito.Mockito.when;
4243

@@ -190,6 +191,85 @@ void testStreamConfigWithStreamArnOnly() {
190191
assertEquals("streamName", expectedIdentifier.streamName());
191192
}
192193

194+
@Test
195+
void testStreamConfigWithAtTimeStampInitialPositionWithInitialTimestamp() {
196+
KinesisStreamConfig streamConfig = mock(KinesisStreamConfig.class);
197+
final String streamArnString = "arn:aws:kinesis:us-east-1:123456789012:stream/streamName";
198+
when(streamConfig.getStreamArn()).thenReturn(streamArnString);
199+
when(streamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.AT_TIMESTAMP);
200+
when(streamConfig.getInitialTimestamp()).thenReturn(LocalDateTime.of(2024, 1, 15, 10, 30));
201+
when(kinesisSourceConfig.getStreams()).thenReturn(List.of(streamConfig));
202+
203+
StreamIdentifier expectedIdentifier = StreamIdentifier.multiStreamInstance(Arn.fromString(streamArnString), 100L);
204+
when(kinesisClientAPIHandler.getStreamIdentifier(streamConfig.getStreamArn()))
205+
.thenReturn(expectedIdentifier);
206+
final String expectedConsumerArn = UUID.randomUUID().toString();
207+
when(kinesisClientAPIHandler.getConsumerArnForStream(streamConfig.getStreamArn(), APPLICATION_NAME))
208+
.thenReturn(expectedConsumerArn);
209+
210+
List<StreamConfig> configs = createObjectUnderTest().streamConfigList();
211+
212+
assertEquals(1, configs.size());
213+
StreamConfig resultConfig = configs.get(0);
214+
assertEquals(expectedIdentifier, resultConfig.streamIdentifier());
215+
assertEquals(expectedConsumerArn, resultConfig.consumerArn());
216+
assertEquals("streamName", expectedIdentifier.streamName());
217+
assertEquals(InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(
218+
streamConfig.getInitialTimestamp().atOffset(ZoneOffset.UTC).toInstant())), resultConfig.initialPositionInStreamExtended());
219+
}
220+
221+
@Test
222+
void testStreamConfigWithAtTimeStampInitialPositionWithLookbackDuration() {
223+
KinesisStreamConfig streamConfig = mock(KinesisStreamConfig.class);
224+
final String streamArnString = "arn:aws:kinesis:us-east-1:123456789012:stream/streamName";
225+
when(streamConfig.getStreamArn()).thenReturn(streamArnString);
226+
when(streamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.AT_TIMESTAMP);
227+
when(streamConfig.getLookbackDuration()).thenReturn(Duration.ofMinutes(30));
228+
when(kinesisSourceConfig.getStreams()).thenReturn(List.of(streamConfig));
229+
230+
StreamIdentifier expectedIdentifier = StreamIdentifier.multiStreamInstance(Arn.fromString(streamArnString), 100L);
231+
when(kinesisClientAPIHandler.getStreamIdentifier(streamConfig.getStreamArn()))
232+
.thenReturn(expectedIdentifier);
233+
final String expectedConsumerArn = UUID.randomUUID().toString();
234+
when(kinesisClientAPIHandler.getConsumerArnForStream(streamConfig.getStreamArn(), APPLICATION_NAME))
235+
.thenReturn(expectedConsumerArn);
236+
237+
List<StreamConfig> configs = createObjectUnderTest().streamConfigList();
238+
239+
assertEquals(1, configs.size());
240+
StreamConfig resultConfig = configs.get(0);
241+
assertEquals(expectedIdentifier, resultConfig.streamIdentifier());
242+
assertEquals(expectedConsumerArn, resultConfig.consumerArn());
243+
assertEquals("streamName", expectedIdentifier.streamName());
244+
assertEquals(InitialPositionInStream.AT_TIMESTAMP, resultConfig.initialPositionInStreamExtended().getInitialPositionInStream());
245+
246+
long actualTime = resultConfig.initialPositionInStreamExtended().getTimestamp().getTime();
247+
assertTrue(actualTime <= System.currentTimeMillis() - Duration.ofMinutes(30).toMillis());
248+
}
249+
250+
@Test
251+
void testStreamConfigWithAtTimeStampInitialPositionWithNoLookbackDurationAndNoInitialTimestamp() {
252+
KinesisStreamConfig streamConfig = mock(KinesisStreamConfig.class);
253+
final String streamArnString = "arn:aws:kinesis:us-east-1:123456789012:stream/streamName";
254+
when(streamConfig.getStreamArn()).thenReturn(streamArnString);
255+
when(streamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.AT_TIMESTAMP);
256+
when(streamConfig.getLookbackDuration()).thenReturn(null);
257+
when(streamConfig.getInitialTimestamp()).thenReturn(null);
258+
when(kinesisSourceConfig.getStreams()).thenReturn(List.of(streamConfig));
259+
260+
StreamIdentifier expectedIdentifier = StreamIdentifier.multiStreamInstance(Arn.fromString(streamArnString), 100L);
261+
when(kinesisClientAPIHandler.getStreamIdentifier(streamConfig.getStreamArn()))
262+
.thenReturn(expectedIdentifier);
263+
264+
KinesisMultiStreamTracker tracker = createObjectUnderTest();
265+
266+
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
267+
tracker::streamConfigList);
268+
assertEquals("Either initial_timestamp or lookback_duration must be " +
269+
"specified when using AT_TIMESTAMP initial_position",
270+
exception.getMessage());
271+
}
272+
193273
@Test
194274
void testStreamConfigWithNoArnOrName() {
195275
KinesisStreamConfig streamConfig = mock(KinesisStreamConfig.class);

data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/InitialPositionInStreamConfigTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,13 @@ void testInitialPositionGetByNameEarliest() {
3535
assertEquals(initialPositionInStreamConfig.getPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
3636
}
3737

38+
@Test
39+
void testInitialPositionGetByNameAtTimestamp() {
40+
final InitialPositionInStreamConfig initialPositionInStreamConfig = InitialPositionInStreamConfig.fromPositionValue("at_timestamp");
41+
assertEquals(initialPositionInStreamConfig, InitialPositionInStreamConfig.AT_TIMESTAMP);
42+
assertEquals(initialPositionInStreamConfig.toString(), "at_timestamp");
43+
assertEquals(initialPositionInStreamConfig.getPosition(), "at_timestamp");
44+
assertEquals(initialPositionInStreamConfig.getPositionInStream(), InitialPositionInStream.AT_TIMESTAMP);
45+
}
46+
3847
}

data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.io.Reader;
2929
import java.io.StringReader;
3030
import java.time.Duration;
31+
import java.time.LocalDateTime;
3132
import java.util.HashMap;
3233
import java.util.List;
3334
import java.util.Map;
@@ -46,6 +47,7 @@ public class KinesisSourceConfigTest {
4647
private static final String PIPELINE_CONFIG_CHECKPOINT_ENABLED = "pipeline_with_checkpoint_enabled.yaml";
4748
private static final String PIPELINE_CONFIG_STREAM_ARN_ENABLED = "pipeline_with_stream_arn_config.yaml";
4849
private static final String PIPELINE_CONFIG_STREAM_ARN_CONSUMER_ARN_ENABLED = "pipeline_with_stream_arn_consumer_arn_config.yaml";
50+
private static final String PIPELINE_CONFIG_WITH_INITIAL_POSITION_AT_TIMESTAMP = "pipeline_with_initial_position_at_timestamp_config.yaml";
4951
private static final Duration MINIMAL_CHECKPOINT_INTERVAL = Duration.ofMillis(2 * 60 * 1000); // 2 minute
5052

5153
KinesisSourceConfig kinesisSourceConfig;
@@ -234,4 +236,43 @@ void testSourceConfigWithStreamArnConsumerArn() {
234236
assertEquals(kinesisStreamConfig.getCheckPointInterval(), MINIMAL_CHECKPOINT_INTERVAL);
235237
}
236238
}
239+
240+
@Test
241+
@Tag(PIPELINE_CONFIG_WITH_INITIAL_POSITION_AT_TIMESTAMP)
242+
void testSourceConfigWithInitialPositionAtTimestamp() {
243+
244+
assertThat(kinesisSourceConfig, notNullValue());
245+
assertEquals(KinesisSourceConfig.DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, kinesisSourceConfig.getNumberOfRecordsToAccumulate());
246+
assertEquals(KinesisSourceConfig.DEFAULT_TIME_OUT_IN_MILLIS, kinesisSourceConfig.getBufferTimeout());
247+
assertEquals(KinesisSourceConfig.DEFAULT_MAX_INITIALIZATION_ATTEMPTS, kinesisSourceConfig.getMaxInitializationAttempts());
248+
assertEquals(KinesisSourceConfig.DEFAULT_INITIALIZATION_BACKOFF_TIME, kinesisSourceConfig.getInitializationBackoffTime());
249+
assertFalse(kinesisSourceConfig.isAcknowledgments());
250+
assertEquals(KinesisSourceConfig.DEFAULT_SHARD_ACKNOWLEDGEMENT_TIMEOUT, kinesisSourceConfig.getShardAcknowledgmentTimeout());
251+
assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue());
252+
assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion(), Region.US_EAST_1);
253+
assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsRoleArn(), "arn:aws:iam::123456789012:role/OSI-PipelineRole");
254+
assertNull(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsExternalId());
255+
assertNull(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsHeaderOverrides());
256+
assertNotNull(kinesisSourceConfig.getCodec());
257+
List<KinesisStreamConfig> streamConfigs = kinesisSourceConfig.getStreams();
258+
assertEquals(kinesisSourceConfig.getConsumerStrategy(), ConsumerStrategy.ENHANCED_FAN_OUT);
259+
260+
assertEquals(streamConfigs.size(), 2);
261+
262+
for (KinesisStreamConfig kinesisStreamConfig: streamConfigs) {
263+
assertTrue(kinesisStreamConfig.getName().contains("stream"));
264+
assertNull(kinesisStreamConfig.getStreamArn());
265+
assertNull(kinesisStreamConfig.getConsumerArn());
266+
assertEquals(kinesisStreamConfig.getInitialPosition(), InitialPositionInStream.AT_TIMESTAMP);
267+
268+
if ("stream-1".equals(kinesisStreamConfig.getName())) {
269+
assertEquals(Duration.parse("P3DT12H"), kinesisStreamConfig.getLookbackDuration());
270+
assertNull(kinesisStreamConfig.getInitialTimestamp());
271+
} else if ("stream-2".equals(kinesisStreamConfig.getName())) {
272+
assertNull(kinesisStreamConfig.getLookbackDuration());
273+
assertEquals(LocalDateTime.parse("2024-01-15T10:30:00"), kinesisStreamConfig.getInitialTimestamp());
274+
}
275+
}
276+
}
277+
237278
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
source:
2+
kinesis:
3+
streams:
4+
- stream_name: "stream-1"
5+
initial_position: "AT_TIMESTAMP"
6+
lookback_duration: "P3DT12H"
7+
- stream_name: "stream-2"
8+
initial_position: "AT_TIMESTAMP"
9+
initial_timestamp: "2024-01-15T10:30:00"
10+
codec:
11+
ndjson:
12+
aws:
13+
sts_role_arn: "arn:aws:iam::123456789012:role/OSI-PipelineRole"
14+
region: "us-east-1"

0 commit comments

Comments
 (0)