Skip to content

Adding functionality to read from specific timestamps for KDS source#6415

Merged
sb2k16 merged 1 commit into
opensearch-project:mainfrom
divbok:main
Jan 20, 2026
Merged

Adding functionality to read from specific timestamps for KDS source#6415
sb2k16 merged 1 commit into
opensearch-project:mainfrom
divbok:main

Conversation

@divbok
Copy link
Copy Markdown
Collaborator

@divbok divbok commented Jan 15, 2026

Description

Adds support for starting Kinesis stream reads from a specific timestamp, configurable via relative duration (lookback) or absolute timestamp. Users can specify the timestamps in two ways:

  1. Relative duration(lookback) - Specify how far back from the current time to start reading
kinesis-pipeline:
  source:
    kinesis:
      streams:
        - stream_name: my-stream
          initial_position: AT_TIMESTAMP
          lookback_duration: "P3DT12H"  # Start from 3 days 12 hours ago
  1. Absolute timestamp - Specify an exact timestamp to start reading from
kinesis-pipeline:
  source:
    kinesis:
      streams:
        - stream_name: my-stream
          initial_position: AT_TIMESTAMP
          initial_timestamp: "2024-01-15T10:30:00Z" #Specified in UTC timezone format

Issues Resolved

Resolves #6366

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

assertNull(kinesisStreamConfig.getConsumerArn());
assertEquals(kinesisStreamConfig.getInitialPosition(), InitialPositionInStream.AT_TIMESTAMP);

if ("stream-1".equals(kinesisStreamConfig.getName())) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to avoid conditionals in tests.

I'd recommend that you create a map using Java streams.

Then, use assertions to make sure the map has the expected objects with expected values. This will avoid both a loop and conditions.

@JsonProperty("checkpoint_interval")
private Duration checkPointInterval = MINIMAL_CHECKPOINT_INTERVAL;

@JsonProperty("lookback_duration")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is "lookback duration" a Kinesis term? If not, we may want to consider terms that we already using in Data Prepper. I think the S3 source currently uses range for this.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, its not a kinesis term. Doesn't lookback_duration make more sense? I can change it to range too to keep it consistent across Data prepper

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think initial_offset makes sense here because it's consistent with initial_position/initial_timestamp and it makes it clear that the "offset" is only used initiallly and not if there is already a kinesis checkpoint available in the dynamodb coordination table.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use of initial_ makes sense. But, I think Kinesis doesn't use the term offset. Instead they are sequence numbers.

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.*;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should use explicit imports.

private Duration lookbackDuration;

@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonProperty("initial_timestamp")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should have a validation mechanism to ensure that either of these 2 attributes are defined in the pipeline yaml.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have provided this in KinesisMultiStreamTracker class at Line 92

Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
@sb2k16 sb2k16 merged commit 90d76bb into opensearch-project:main Jan 20, 2026
49 of 52 checks passed
ashrao94 pushed a commit to ashrao94/data-prepper that referenced this pull request Jan 22, 2026
san81 pushed a commit to san81/data-prepper that referenced this pull request Jan 27, 2026
simonelbaz pushed a commit to simonelbaz/data-prepper that referenced this pull request Jan 31, 2026
…pensearch-project#6415)

Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>
simonelbaz pushed a commit to simonelbaz/data-prepper that referenced this pull request Jan 31, 2026
simonelbaz pushed a commit to simonelbaz/data-prepper that referenced this pull request Jan 31, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add support for reading from specific timestamps in Kinesis source plugin

4 participants