Skip to content

Commit 49ef09d

Browse files
authored
Kds cross account stream (#5687)
Implementation for cross account stream support in KDS Signed-off-by: Souvik Bose <souvbose@amazon.com>
1 parent a75e8cc commit 49ef09d

17 files changed

Lines changed: 1188 additions & 349 deletions

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientApiHandler.java

Lines changed: 0 additions & 89 deletions
This file was deleted.

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTracker.java

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
package org.opensearch.dataprepper.plugins.kinesis.source;
1212

13+
import org.opensearch.dataprepper.plugins.kinesis.source.apihandler.KinesisClientApiHandler;
14+
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.ConsumerStrategy;
1315
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
1416
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
1517
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@@ -19,8 +21,9 @@
1921
import software.amazon.kinesis.processor.MultiStreamTracker;
2022

2123
import java.time.Duration;
22-
import java.util.ArrayList;
2324
import java.util.List;
25+
import java.util.Objects;
26+
import java.util.stream.Collectors;
2427

2528
public class KinesisMultiStreamTracker implements MultiStreamTracker {
2629
private final KinesisSourceConfig sourceConfig;
@@ -34,21 +37,48 @@ public KinesisMultiStreamTracker(final KinesisSourceConfig sourceConfig, final S
3437
}
3538

3639
@Override
37-
public List<StreamConfig> streamConfigList() {
38-
List<StreamConfig> streamConfigList = new ArrayList<>();
39-
for (KinesisStreamConfig kinesisStreamConfig : sourceConfig.getStreams()) {
40-
StreamConfig streamConfig = getStreamConfig(kinesisStreamConfig);
41-
streamConfigList.add(streamConfig);
42-
}
43-
return streamConfigList;
40+
public List<StreamConfig> streamConfigList() {
41+
return sourceConfig.getStreams().stream()
42+
.map(this::createStreamConfig)
43+
.collect(Collectors.toList());
4444
}
4545

46-
private StreamConfig getStreamConfig(KinesisStreamConfig kinesisStreamConfig) {
47-
StreamIdentifier sourceStreamIdentifier = kinesisClientAPIHandler.getStreamIdentifier(kinesisStreamConfig.getName());
48-
return new StreamConfig(sourceStreamIdentifier,
49-
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()));
46+
private StreamConfig createStreamConfig(KinesisStreamConfig kinesisStreamConfig) {
47+
StreamIdentifier streamIdentifier = getStreamIdentifier(kinesisStreamConfig);
48+
49+
// if the consumer strategy is polling, skip look up for consumer
50+
if (sourceConfig.getConsumerStrategy() == ConsumerStrategy.POLLING) {
51+
return new StreamConfig(streamIdentifier,
52+
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition())
53+
);
54+
}
55+
56+
// If stream arn and consumer arn is present, create a stream config based on the configured values
57+
if (Objects.nonNull(kinesisStreamConfig.getStreamArn()) && Objects.nonNull(kinesisStreamConfig.getConsumerArn())) {
58+
return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()), kinesisStreamConfig.getConsumerArn());
59+
}
60+
61+
// If stream arn is provided, lookup consumer arn based on the consumer name which is the data prepper application name
62+
if (Objects.nonNull(kinesisStreamConfig.getStreamArn())) {
63+
String consumerArn = kinesisClientAPIHandler.getConsumerArnForStream(kinesisStreamConfig.getStreamArn(), this.applicationName);
64+
return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()), consumerArn);
65+
}
66+
// Default case
67+
return new StreamConfig(streamIdentifier,
68+
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition())
69+
);
5070
}
5171

72+
private StreamIdentifier getStreamIdentifier(final KinesisStreamConfig kinesisStreamConfig) {
73+
final String streamArn = kinesisStreamConfig.getStreamArn();
74+
final String streamName = kinesisStreamConfig.getName();
75+
76+
if (Objects.isNull(streamArn) && Objects.isNull(streamName)) {
77+
throw new IllegalArgumentException("Either ARN or name must be specified for Kinesis stream configuration");
78+
}
79+
80+
return kinesisClientAPIHandler.getStreamIdentifier(streamArn != null ? streamArn : streamName);
81+
}
5282
/**
5383
* Setting the deletion policy as autodetect and release shard lease with a wait time of 10 sec
5484
*/

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.opensearch.dataprepper.model.record.Record;
2828
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfig;
2929
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfigSupplier;
30+
import org.opensearch.dataprepper.plugins.kinesis.source.apihandler.KinesisClientApiHandler;
31+
import org.opensearch.dataprepper.plugins.kinesis.source.apihandler.KinesisClientApiRetryHandler;
3032
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.ConsumerStrategy;
3133
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
3234
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisShardRecordProcessorFactory;
@@ -178,8 +180,8 @@ public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
178180

179181
ConfigsBuilder configsBuilder =
180182
new ConfigsBuilder(
181-
new KinesisMultiStreamTracker(kinesisSourceConfig, applicationName, new KinesisClientApiHandler(kinesisClient, Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE)
182-
.withMaxAttempts(NUM_OF_RETRIES), NUM_OF_RETRIES)),
183+
new KinesisMultiStreamTracker(kinesisSourceConfig, applicationName, new KinesisClientApiHandler(kinesisClient, new KinesisClientApiRetryHandler(Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE)
184+
.withMaxAttempts(NUM_OF_RETRIES), NUM_OF_RETRIES))),
183185
applicationName, kinesisClient, dynamoDbClient, cloudWatchClient,
184186
workerIdentifierGenerator.generate(), processorFactory
185187
)

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSource.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
1818
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
1919
import org.opensearch.dataprepper.model.buffer.Buffer;
20+
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
2021
import org.opensearch.dataprepper.model.event.Event;
22+
import org.opensearch.dataprepper.model.plugin.PluginFactory;
2123
import org.opensearch.dataprepper.model.record.Record;
2224
import org.opensearch.dataprepper.model.source.Source;
23-
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
24-
import org.opensearch.dataprepper.model.plugin.PluginFactory;
25-
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
2625
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfigSupplier;
26+
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.kinesis.source.apihandler;
12+
13+
import lombok.NonNull;
14+
import lombok.extern.slf4j.Slf4j;
15+
import org.opensearch.dataprepper.plugins.kinesis.source.exceptions.KinesisConsumerNotFoundException;
16+
import software.amazon.awssdk.arns.Arn;
17+
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
18+
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest;
19+
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
20+
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
21+
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
22+
import software.amazon.awssdk.services.kinesis.model.KinesisException;
23+
import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary;
24+
import software.amazon.kinesis.common.StreamIdentifier;
25+
26+
import java.util.Objects;
27+
import java.util.concurrent.CompletionException;
28+
29+
@Slf4j
30+
public class KinesisClientApiHandler {
31+
private static final String COLON = ":";
32+
33+
private final KinesisAsyncClient kinesisClient;
34+
private final KinesisClientApiRetryHandler kinesisClientApiRetryHandler;
35+
36+
public KinesisClientApiHandler(@NonNull final KinesisAsyncClient kinesisClient, @NonNull final KinesisClientApiRetryHandler kinesisClientApiRetryHandler) {
37+
this.kinesisClient = kinesisClient;
38+
this.kinesisClientApiRetryHandler = kinesisClientApiRetryHandler;
39+
}
40+
41+
public StreamIdentifier getStreamIdentifier(final String streamNameOrArn) {
42+
if (isArn(streamNameOrArn)) {
43+
return getStreamIdentifierFromArn(streamNameOrArn);
44+
}
45+
return getStreamIdentifierFromName(streamNameOrArn);
46+
}
47+
48+
public String getConsumerArnForStream(final String streamArn, final String consumerName) {
49+
if (Objects.isNull(streamArn) || streamArn.trim().isEmpty()) {
50+
throw new IllegalArgumentException("Stream ARN cannot be null or empty");
51+
}
52+
if (Objects.isNull(consumerName) || consumerName.trim().isEmpty()) {
53+
throw new IllegalArgumentException("Consumer name cannot be null or empty");
54+
}
55+
DescribeStreamConsumerResponse response = describeStreamConsumer(streamArn, consumerName);
56+
if (Objects.isNull(response)) {
57+
throw new KinesisConsumerNotFoundException(
58+
String.format("Kinesis stream consumer not found for %s", consumerName));
59+
}
60+
return response.consumerDescription().consumerARN();
61+
}
62+
63+
private boolean isArn(final String streamNameOrArn) {
64+
try {
65+
Arn.fromString(streamNameOrArn);
66+
return true;
67+
} catch (IllegalArgumentException e) {
68+
return false;
69+
}
70+
}
71+
72+
private StreamIdentifier getStreamIdentifierFromArn(final String streamArnString) {
73+
Arn streamArn = Arn.fromString(streamArnString);
74+
String streamName = streamArn.resource().resource();
75+
DescribeStreamSummaryResponse response = getStreamDescriptionSummary(
76+
buildStreamSummaryRequest(streamName, streamArnString));
77+
return StreamIdentifier.multiStreamInstance(
78+
streamArn,
79+
response.streamDescriptionSummary().streamCreationTimestamp().getEpochSecond());
80+
}
81+
82+
private StreamIdentifier getStreamIdentifierFromName(final String streamName) {
83+
DescribeStreamSummaryResponse response = getStreamDescriptionSummary(
84+
buildStreamSummaryRequest(streamName, null));
85+
StreamDescriptionSummary summary = response.streamDescriptionSummary();
86+
return StreamIdentifier.multiStreamInstance(createStreamIdentifierString(summary));
87+
}
88+
89+
private DescribeStreamSummaryRequest buildStreamSummaryRequest(String streamName, String streamArn) {
90+
return DescribeStreamSummaryRequest.builder()
91+
.streamName(streamName)
92+
.streamARN(streamArn)
93+
.build();
94+
}
95+
96+
private String createStreamIdentifierString(StreamDescriptionSummary summary) {
97+
String accountId = Arn.fromString(summary.streamARN()).accountId().get();
98+
long creationEpochSecond = summary.streamCreationTimestamp().getEpochSecond();
99+
return String.join(COLON, accountId, summary.streamName(),
100+
String.valueOf(creationEpochSecond));
101+
}
102+
103+
private DescribeStreamSummaryResponse getStreamDescriptionSummary(
104+
DescribeStreamSummaryRequest request) {
105+
return kinesisClientApiRetryHandler.executeWithRetry(
106+
"getStreamDescriptionSummary",
107+
() -> kinesisClient.describeStreamSummary(request).join(),
108+
(ex, attempt) -> handleStreamSummaryException((CompletionException) ex, request.streamName())
109+
);
110+
}
111+
112+
private void handleStreamSummaryException(CompletionException ex, String streamName) {
113+
Throwable cause = ex.getCause();
114+
if (cause instanceof KinesisException || cause instanceof com.amazonaws.SdkClientException) {
115+
log.error("AWS error while describing stream summary for stream {}: {}",
116+
streamName, ex.getMessage());
117+
} else {
118+
log.error("Unexpected error while describing stream summary for stream {}",
119+
streamName, ex);
120+
}
121+
}
122+
123+
private DescribeStreamConsumerResponse describeStreamConsumer(
124+
final String streamArn, final String consumerName) {
125+
DescribeStreamConsumerRequest request = DescribeStreamConsumerRequest.builder()
126+
.streamARN(streamArn)
127+
.consumerName(consumerName)
128+
.build();
129+
130+
return kinesisClientApiRetryHandler.executeWithRetry(
131+
"describeStreamConsumer",
132+
() -> kinesisClient.describeStreamConsumer(request).join(),
133+
(ex, attempt) -> handleConsumerException((CompletionException) ex, streamArn, attempt)
134+
);
135+
}
136+
137+
private void handleConsumerException(CompletionException ex, String streamArn, int attempt) {
138+
Throwable cause = ex.getCause();
139+
if (cause instanceof KinesisException || cause instanceof com.amazonaws.SdkClientException) {
140+
log.error("AWS error while describing stream consumer for stream {}: {}. Attempt {}.",
141+
streamArn, ex.getMessage(), attempt + 1);
142+
} else {
143+
log.error("Unexpected error while describing stream consumer for stream {}. Attempt {} of {}.",
144+
streamArn, attempt + 1, ex);
145+
}
146+
}
147+
}
148+

0 commit comments

Comments
 (0)