Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.ConsumerStrategy;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
Expand All @@ -45,28 +49,25 @@ public List<StreamConfig> streamConfigList() {

/**
 * Builds the {@link StreamConfig} for one configured Kinesis stream.
 *
 * <p>The stream identifier and the initial position are resolved first. The consumer ARN is then chosen as follows:
 * polling consumer strategy - no consumer ARN; stream ARN and consumer ARN both configured - the configured
 * consumer ARN; only stream ARN configured - the consumer ARN looked up through {@code kinesisClientAPIHandler}
 * using the application name; otherwise - no consumer ARN.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff, so superseded {@code return} statements are shown
 * next to their replacements - confirm against the merged file.
 *
 * @param kinesisStreamConfig the per-stream configuration
 * @return the stream config built from the resolved identifier, initial position and (optionally) consumer ARN
 */
private StreamConfig createStreamConfig(KinesisStreamConfig kinesisStreamConfig) {
StreamIdentifier streamIdentifier = getStreamIdentifier(kinesisStreamConfig);
InitialPositionInStreamExtended initialPosition = getInitialPositionExtended(kinesisStreamConfig);

// if the consumer strategy is polling, skip look up for consumer
if (sourceConfig.getConsumerStrategy() == ConsumerStrategy.POLLING) {
return new StreamConfig(streamIdentifier,
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition())
);
return new StreamConfig(streamIdentifier, initialPosition);
}

// If stream arn and consumer arn is present, create a stream config based on the configured values
if (Objects.nonNull(kinesisStreamConfig.getStreamArn()) && Objects.nonNull(kinesisStreamConfig.getConsumerArn())) {
return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()), kinesisStreamConfig.getConsumerArn());
return new StreamConfig(streamIdentifier, initialPosition, kinesisStreamConfig.getConsumerArn());
}

// If stream arn is provided, lookup consumer arn based on the consumer name which is the data prepper application name
if (Objects.nonNull(kinesisStreamConfig.getStreamArn())) {
String consumerArn = kinesisClientAPIHandler.getConsumerArnForStream(kinesisStreamConfig.getStreamArn(), this.applicationName);
return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()), consumerArn);
return new StreamConfig(streamIdentifier, initialPosition, consumerArn);
}
// Default case
return new StreamConfig(streamIdentifier,
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition())
);
return new StreamConfig(streamIdentifier, initialPosition);
}

private StreamIdentifier getStreamIdentifier(final KinesisStreamConfig kinesisStreamConfig) {
Expand All @@ -79,6 +80,21 @@ private StreamIdentifier getStreamIdentifier(final KinesisStreamConfig kinesisSt

return kinesisClientAPIHandler.getStreamIdentifier(streamArn != null ? streamArn : streamName);
}

/**
 * Translates a stream's configured starting point into the extended initial position used for its
 * {@code StreamConfig}.
 *
 * <p>For {@code AT_TIMESTAMP} the starting instant is the configured initial timestamp (interpreted at UTC) when
 * one is set; otherwise it is the current time minus the configured range. Any other position is passed through
 * without a timestamp.
 *
 * @param kinesisStreamConfig the per-stream configuration supplying the position, initial timestamp and range
 * @return the extended initial position for the stream
 * @throws IllegalArgumentException if the position is {@code AT_TIMESTAMP} and neither an initial timestamp nor a
 *                                  range is configured
 */
private InitialPositionInStreamExtended getInitialPositionExtended(KinesisStreamConfig kinesisStreamConfig) {
    // Positions other than AT_TIMESTAMP need no timestamp resolution.
    if (kinesisStreamConfig.getInitialPosition() != InitialPositionInStream.AT_TIMESTAMP) {
        return InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition());
    }

    final Instant startingInstant;
    if (kinesisStreamConfig.getInitialTimestamp() != null) {
        // An explicit timestamp wins over a relative range when both are configured.
        startingInstant = kinesisStreamConfig.getInitialTimestamp().toInstant(ZoneOffset.UTC);
    } else if (kinesisStreamConfig.getRange() != null) {
        startingInstant = Instant.now().minus(kinesisStreamConfig.getRange());
    } else {
        throw new IllegalArgumentException("Either initial_timestamp or range must be specified when using AT_TIMESTAMP initial_position");
    }

    return InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(startingInstant));
}
/**
* Setting the deletion policy as autodetect and release shard lease with a wait time of 10 sec
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
@Getter
public enum InitialPositionInStreamConfig {
LATEST("latest", InitialPositionInStream.LATEST),
EARLIEST("earliest", InitialPositionInStream.TRIM_HORIZON);
EARLIEST("earliest", InitialPositionInStream.TRIM_HORIZON),
AT_TIMESTAMP("at_timestamp", InitialPositionInStream.AT_TIMESTAMP);

private final String position;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
package org.opensearch.dataprepper.plugins.kinesis.source.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import jakarta.validation.Valid;
import lombok.Getter;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import software.amazon.awssdk.arns.Arn;
import software.amazon.kinesis.common.InitialPositionInStream;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Objects;

@Getter
Expand All @@ -44,6 +47,13 @@ public class KinesisStreamConfig {
@JsonProperty("checkpoint_interval")
private Duration checkPointInterval = MINIMAL_CHECKPOINT_INTERVAL;

@JsonProperty("range")
private Duration range;

@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonProperty("initial_timestamp")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have a validation mechanism to ensure that at least one of these two attributes is defined in the pipeline YAML.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have provided this validation in the KinesisMultiStreamTracker class at line 92.

private LocalDateTime initialTimestamp;

/**
 * Returns the {@link InitialPositionInStream} that corresponds to this stream's configured initial position.
 *
 * @return the position obtained from the configured initial position
 */
public InitialPositionInStream getInitialPosition() {
    final InitialPositionInStream positionInStream = initialPosition.getPositionInStream();
    return positionInStream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -37,6 +40,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -190,6 +194,85 @@ void testStreamConfigWithStreamArnOnly() {
assertEquals("streamName", expectedIdentifier.streamName());
}

/**
 * An AT_TIMESTAMP stream with an explicit initial timestamp must produce a stream config positioned at that
 * timestamp, interpreted at UTC.
 */
@Test
void testStreamConfigWithAtTimeStampInitialPositionWithInitialTimestamp() {
    final String streamArnString = "arn:aws:kinesis:us-east-1:123456789012:stream/streamName";
    final LocalDateTime configuredTimestamp = LocalDateTime.of(2024, 1, 15, 10, 30);
    final String expectedConsumerArn = UUID.randomUUID().toString();
    final StreamIdentifier expectedIdentifier =
            StreamIdentifier.multiStreamInstance(Arn.fromString(streamArnString), 100L);

    // Stream configured with only a stream ARN, so the consumer ARN is looked up by application name.
    final KinesisStreamConfig streamConfig = mock(KinesisStreamConfig.class);
    when(streamConfig.getStreamArn()).thenReturn(streamArnString);
    when(streamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.AT_TIMESTAMP);
    when(streamConfig.getInitialTimestamp()).thenReturn(configuredTimestamp);
    when(kinesisSourceConfig.getStreams()).thenReturn(List.of(streamConfig));

    when(kinesisClientAPIHandler.getStreamIdentifier(streamArnString)).thenReturn(expectedIdentifier);
    when(kinesisClientAPIHandler.getConsumerArnForStream(streamArnString, APPLICATION_NAME))
            .thenReturn(expectedConsumerArn);

    final List<StreamConfig> configs = createObjectUnderTest().streamConfigList();

    assertEquals(1, configs.size());
    final StreamConfig resultConfig = configs.get(0);
    assertEquals(expectedIdentifier, resultConfig.streamIdentifier());
    assertEquals(expectedConsumerArn, resultConfig.consumerArn());
    assertEquals("streamName", expectedIdentifier.streamName());

    final InitialPositionInStreamExtended expectedPosition =
            InitialPositionInStreamExtended.newInitialPositionAtTimestamp(
                    Date.from(configuredTimestamp.atOffset(ZoneOffset.UTC).toInstant()));
    assertEquals(expectedPosition, resultConfig.initialPositionInStreamExtended());
}

/**
 * An AT_TIMESTAMP stream configured with a range must start at "now minus range".
 *
 * <p>The call under test is bracketed by two clock readings so the resolved timestamp is bounded on both sides;
 * an upper bound alone would also accept a timestamp arbitrarily far in the past.
 */
@Test
void testStreamConfigWithAtTimeStampInitialPositionWithRange() {
    final Duration range = Duration.ofMinutes(30);
    KinesisStreamConfig streamConfig = mock(KinesisStreamConfig.class);
    final String streamArnString = "arn:aws:kinesis:us-east-1:123456789012:stream/streamName";
    when(streamConfig.getStreamArn()).thenReturn(streamArnString);
    when(streamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.AT_TIMESTAMP);
    when(streamConfig.getRange()).thenReturn(range);
    when(kinesisSourceConfig.getStreams()).thenReturn(List.of(streamConfig));

    StreamIdentifier expectedIdentifier = StreamIdentifier.multiStreamInstance(Arn.fromString(streamArnString), 100L);
    when(kinesisClientAPIHandler.getStreamIdentifier(streamConfig.getStreamArn()))
            .thenReturn(expectedIdentifier);
    final String expectedConsumerArn = UUID.randomUUID().toString();
    when(kinesisClientAPIHandler.getConsumerArnForStream(streamConfig.getStreamArn(), APPLICATION_NAME))
            .thenReturn(expectedConsumerArn);

    // Clock readings taken immediately before and after the timestamp is resolved.
    final long millisBeforeCall = System.currentTimeMillis();
    List<StreamConfig> configs = createObjectUnderTest().streamConfigList();
    final long millisAfterCall = System.currentTimeMillis();

    assertEquals(1, configs.size());
    StreamConfig resultConfig = configs.get(0);
    assertEquals(expectedIdentifier, resultConfig.streamIdentifier());
    assertEquals(expectedConsumerArn, resultConfig.consumerArn());
    assertEquals("streamName", expectedIdentifier.streamName());
    assertEquals(InitialPositionInStream.AT_TIMESTAMP, resultConfig.initialPositionInStreamExtended().getInitialPositionInStream());

    long actualTime = resultConfig.initialPositionInStreamExtended().getTimestamp().getTime();
    assertTrue(actualTime >= millisBeforeCall - range.toMillis());
    assertTrue(actualTime <= millisAfterCall - range.toMillis());
}

/**
 * An AT_TIMESTAMP stream that configures neither a range nor an initial timestamp must be rejected with an
 * {@link IllegalArgumentException} when the stream config list is built.
 */
@Test
void testStreamConfigWithAtTimeStampInitialPositionWithNoRangeAndNoInitialTimestamp() {
    final String streamArnString = "arn:aws:kinesis:us-east-1:123456789012:stream/streamName";
    final StreamIdentifier expectedIdentifier =
            StreamIdentifier.multiStreamInstance(Arn.fromString(streamArnString), 100L);

    final KinesisStreamConfig streamConfig = mock(KinesisStreamConfig.class);
    when(streamConfig.getStreamArn()).thenReturn(streamArnString);
    when(streamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.AT_TIMESTAMP);
    when(streamConfig.getRange()).thenReturn(null);
    when(streamConfig.getInitialTimestamp()).thenReturn(null);
    when(kinesisSourceConfig.getStreams()).thenReturn(List.of(streamConfig));
    when(kinesisClientAPIHandler.getStreamIdentifier(streamArnString)).thenReturn(expectedIdentifier);

    final KinesisMultiStreamTracker tracker = createObjectUnderTest();

    final IllegalArgumentException exception =
            assertThrows(IllegalArgumentException.class, () -> tracker.streamConfigList());

    final String expectedMessage = "Either initial_timestamp or range must be " +
            "specified when using AT_TIMESTAMP initial_position";
    assertEquals(expectedMessage, exception.getMessage());
}

@Test
void testStreamConfigWithNoArnOrName() {
KinesisStreamConfig streamConfig = mock(KinesisStreamConfig.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,13 @@ void testInitialPositionGetByNameEarliest() {
assertEquals(initialPositionInStreamConfig.getPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
}

/**
 * Looking up {@code "at_timestamp"} must resolve to {@code AT_TIMESTAMP}, whose string form, position value and
 * mapped stream position all agree.
 *
 * <p>Assertions use the JUnit {@code (expected, actual)} argument order so failure messages report the right
 * side as "expected".
 */
@Test
void testInitialPositionGetByNameAtTimestamp() {
    final InitialPositionInStreamConfig initialPositionInStreamConfig = InitialPositionInStreamConfig.fromPositionValue("at_timestamp");
    assertEquals(InitialPositionInStreamConfig.AT_TIMESTAMP, initialPositionInStreamConfig);
    assertEquals("at_timestamp", initialPositionInStreamConfig.toString());
    assertEquals("at_timestamp", initialPositionInStreamConfig.getPosition());
    assertEquals(InitialPositionInStream.AT_TIMESTAMP, initialPositionInStreamConfig.getPositionInStream());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import java.io.Reader;
import java.io.StringReader;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
Expand All @@ -46,6 +48,7 @@ public class KinesisSourceConfigTest {
private static final String PIPELINE_CONFIG_CHECKPOINT_ENABLED = "pipeline_with_checkpoint_enabled.yaml";
private static final String PIPELINE_CONFIG_STREAM_ARN_ENABLED = "pipeline_with_stream_arn_config.yaml";
private static final String PIPELINE_CONFIG_STREAM_ARN_CONSUMER_ARN_ENABLED = "pipeline_with_stream_arn_consumer_arn_config.yaml";
private static final String PIPELINE_CONFIG_WITH_INITIAL_POSITION_AT_TIMESTAMP = "pipeline_with_initial_position_at_timestamp_config.yaml";
private static final Duration MINIMAL_CHECKPOINT_INTERVAL = Duration.ofMillis(2 * 60 * 1000); // 2 minute

KinesisSourceConfig kinesisSourceConfig;
Expand Down Expand Up @@ -234,4 +237,49 @@ void testSourceConfigWithStreamArnConsumerArn() {
assertEquals(kinesisStreamConfig.getCheckPointInterval(), MINIMAL_CHECKPOINT_INTERVAL);
}
}

/**
 * Loads the AT_TIMESTAMP pipeline configuration and checks the source-level defaults plus both stream entries:
 * {@code stream-1} carries only a range and {@code stream-2} carries only an initial timestamp.
 *
 * <p>All equality assertions use the JUnit {@code (expected, actual)} argument order so failure messages are
 * not inverted.
 */
@Test
@Tag(PIPELINE_CONFIG_WITH_INITIAL_POSITION_AT_TIMESTAMP)
void testSourceConfigWithInitialPositionAtTimestamp() {

    // Source-level settings are not present in the YAML, so the defaults are expected.
    assertThat(kinesisSourceConfig, notNullValue());
    assertEquals(KinesisSourceConfig.DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, kinesisSourceConfig.getNumberOfRecordsToAccumulate());
    assertEquals(KinesisSourceConfig.DEFAULT_TIME_OUT_IN_MILLIS, kinesisSourceConfig.getBufferTimeout());
    assertEquals(KinesisSourceConfig.DEFAULT_MAX_INITIALIZATION_ATTEMPTS, kinesisSourceConfig.getMaxInitializationAttempts());
    assertEquals(KinesisSourceConfig.DEFAULT_INITIALIZATION_BACKOFF_TIME, kinesisSourceConfig.getInitializationBackoffTime());
    assertFalse(kinesisSourceConfig.isAcknowledgments());
    assertEquals(KinesisSourceConfig.DEFAULT_SHARD_ACKNOWLEDGEMENT_TIMEOUT, kinesisSourceConfig.getShardAcknowledgmentTimeout());
    assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue());
    assertEquals(Region.US_EAST_1, kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion());
    assertEquals("arn:aws:iam::123456789012:role/OSI-PipelineRole", kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsRoleArn());
    assertNull(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsExternalId());
    assertNull(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsHeaderOverrides());
    assertNotNull(kinesisSourceConfig.getCodec());
    List<KinesisStreamConfig> streamConfigs = kinesisSourceConfig.getStreams();
    assertEquals(ConsumerStrategy.ENHANCED_FAN_OUT, kinesisSourceConfig.getConsumerStrategy());

    assertEquals(2, streamConfigs.size());

    // Index by stream name so the assertions do not depend on list order.
    Map<String, KinesisStreamConfig> streamConfigMap = streamConfigs.stream()
            .collect(Collectors.toMap(KinesisStreamConfig::getName, config -> config));

    assertEquals(2, streamConfigMap.size());

    KinesisStreamConfig stream1 = streamConfigMap.get("stream-1");
    assertNotNull(stream1);
    assertNull(stream1.getStreamArn());
    assertNull(stream1.getConsumerArn());
    assertEquals(InitialPositionInStream.AT_TIMESTAMP, stream1.getInitialPosition());
    assertEquals(Duration.parse("P3DT12H"), stream1.getRange());
    assertNull(stream1.getInitialTimestamp());

    KinesisStreamConfig stream2 = streamConfigMap.get("stream-2");
    assertNotNull(stream2);
    assertNull(stream2.getStreamArn());
    assertNull(stream2.getConsumerArn());
    assertEquals(InitialPositionInStream.AT_TIMESTAMP, stream2.getInitialPosition());
    assertNull(stream2.getRange());
    assertEquals(LocalDateTime.parse("2024-01-15T10:30:00"), stream2.getInitialTimestamp());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
source:
  kinesis:
    streams:
      - stream_name: "stream-1"
        initial_position: "AT_TIMESTAMP"
        range: "P3DT12H"
      - stream_name: "stream-2"
        initial_position: "AT_TIMESTAMP"
        initial_timestamp: "2024-01-15T10:30:00"
    codec:
      ndjson:
    aws:
      sts_role_arn: "arn:aws:iam::123456789012:role/OSI-PipelineRole"
      region: "us-east-1"
Loading