Adding functionality to read from specific timestamps for KDS source #6415
assertNull(kinesisStreamConfig.getConsumerArn());
assertEquals(kinesisStreamConfig.getInitialPosition(), InitialPositionInStream.AT_TIMESTAMP);

if ("stream-1".equals(kinesisStreamConfig.getName())) {
It's better to avoid conditionals in tests.
I'd recommend that you create a map using Java streams.
Then, use assertions to make sure the map has the expected objects with expected values. This will avoid both a loop and conditions.
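A minimal sketch of that suggestion, assuming a hypothetical `StreamConfig` stand-in rather than the real config class from this PR: index the parsed configs by name with a stream collector, then check each entry directly, with no loop and no `if`.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class StreamConfigMapExample {
    // Hypothetical stand-in for the real stream config class (not from this PR).
    static final class StreamConfig {
        final String name;
        final String initialPosition;

        StreamConfig(String name, String initialPosition) {
            this.name = name;
            this.initialPosition = initialPosition;
        }
    }

    // Index the configs by stream name so each one can be looked up directly.
    // Collectors.toMap throws IllegalStateException on duplicate names.
    static Map<String, StreamConfig> byName(List<StreamConfig> configs) {
        return configs.stream()
                .collect(Collectors.toMap(c -> c.name, Function.identity()));
    }

    public static void main(String[] args) {
        Map<String, StreamConfig> configs = byName(List.of(
                new StreamConfig("stream-1", "AT_TIMESTAMP"),
                new StreamConfig("stream-2", "LATEST")));

        // In a JUnit test these would be assertEquals(expected, actual) calls.
        check(configs.size() == 2);
        check("AT_TIMESTAMP".equals(configs.get("stream-1").initialPosition));
        check("LATEST".equals(configs.get("stream-2").initialPosition));
    }

    private static void check(boolean condition) {
        if (!condition) {
            throw new AssertionError("unexpected stream config");
        }
    }
}
```

Because each stream is looked up by key, a missing or misnamed stream fails the test immediately instead of silently skipping a conditional branch.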
@JsonProperty("checkpoint_interval")
private Duration checkPointInterval = MINIMAL_CHECKPOINT_INTERVAL;

@JsonProperty("lookback_duration")
Is "lookback duration" a Kinesis term? If not, we may want to consider terms that we are already using in Data Prepper. I think the S3 source currently uses range for this.
No, it's not a Kinesis term. Doesn't lookback_duration make more sense? I can change it to range too, to keep it consistent across Data Prepper.
I think initial_offset makes sense here because it's consistent with initial_position/initial_timestamp, and it makes clear that the "offset" is only used initially and not when a Kinesis checkpoint is already available in the DynamoDB coordination table.
Use of initial_ makes sense. But I think Kinesis doesn't use the term offset; it uses sequence numbers instead.
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.*;
private Duration lookbackDuration;

@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonProperty("initial_timestamp")
We should have a validation mechanism to ensure that one of these two attributes is defined in the pipeline YAML.
I have provided this in the KinesisMultiStreamTracker class at line 92.
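For illustration only, a check like this could look roughly as follows. This is a sketch, not the code in KinesisMultiStreamTracker: the class and method names are hypothetical, and it assumes that exactly one of the two settings must be present, which the thread does not state explicitly.

```java
import java.time.Duration;
import java.time.LocalDateTime;

final class InitialTimestampValidation {
    // True when exactly one of the two settings is present.
    // Assumption: setting both is treated as invalid; the PR may allow it.
    static boolean hasExactlyOneStartSetting(Duration lookbackDuration,
                                             LocalDateTime initialTimestamp) {
        return (lookbackDuration != null) ^ (initialTimestamp != null);
    }

    // Fails fast with a message naming the two properties seen in the diff.
    static void validate(Duration lookbackDuration, LocalDateTime initialTimestamp) {
        if (!hasExactlyOneStartSetting(lookbackDuration, initialTimestamp)) {
            throw new IllegalArgumentException(
                    "Exactly one of lookback_duration or initial_timestamp must be set");
        }
    }
}
```

Keeping the rule in one small method makes it easy to unit test separately from the tracker that consumes the configuration.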
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
…pensearch-project#6415) Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
…pensearch-project#6415) Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com> Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>
Description
Adds support for starting Kinesis stream reads from a specific timestamp. Users can specify the timestamp in two ways: as a relative duration (a lookback from the current time) or as an absolute timestamp.
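As a rough sketch of how the two forms could resolve to a single starting point: the `StartTimestampResolver` class below is hypothetical and not taken from this PR. It assumes the absolute timestamp is interpreted as UTC and that the absolute value wins if both are supplied; the actual implementation may differ on both points.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

final class StartTimestampResolver {
    // Resolves the instant from which reading should begin.
    // The Clock parameter keeps "now" injectable so the lookback case is testable.
    static Instant resolve(Duration lookbackDuration,
                           LocalDateTime initialTimestamp,
                           Clock clock) {
        if (initialTimestamp != null) {
            // Assumption: the configured local date-time is in UTC.
            return initialTimestamp.toInstant(ZoneOffset.UTC);
        }
        if (lookbackDuration != null) {
            // Relative form: current time minus the lookback.
            return clock.instant().minus(lookbackDuration);
        }
        throw new IllegalArgumentException("No start timestamp configured");
    }

    public static void main(String[] args) {
        Clock fixed = Clock.fixed(Instant.parse("2024-01-01T12:00:00Z"), ZoneOffset.UTC);
        // Two hours before the fixed "now".
        System.out.println(resolve(Duration.ofHours(2), null, fixed));
        // The absolute timestamp, read as UTC.
        System.out.println(resolve(null, LocalDateTime.of(2024, 1, 1, 0, 0), fixed));
    }
}
```

The resolved instant would then be handed to whatever sets the AT_TIMESTAMP initial position; that wiring is omitted here because it depends on code not shown in this conversation.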
Issues Resolved
Resolves #6366
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.