Skip to content

Add kafka buffer backward compatibility test#6406

Merged
graytaylor0 merged 2 commits into
opensearch-project:mainfrom
graytaylor0:KafkabufferImprovements
Jan 15, 2026
Merged

Add kafka buffer backward compatibility test#6406
graytaylor0 merged 2 commits into
opensearch-project:mainfrom
graytaylor0:KafkabufferImprovements

Conversation

@graytaylor0
Copy link
Copy Markdown
Member

@graytaylor0 graytaylor0 commented Jan 14, 2026

Description

This change adds an end to end test to data prepper for testing kafka buffer backward compatibility. The steps of the test are

  1. Run the latest release data prepper docker image and write a couple of documents to the kafka buffer. Intentionally put delay so that those records are not consumed from the buffer
  2. Run the current build of data prepper to consume the records from the kafka buffer
  3. Verify the data in OpenSearch

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Taylor Gray <tylgry@amazon.com>
@graytaylor0 graytaylor0 force-pushed the KafkabufferImprovements branch from 273b69c to 55b0a31 Compare January 14, 2026 23:32
Copy link
Copy Markdown
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work. This looks like it will work well.

At a high level, we should improve the name:

  • The naming doesn't imply that this is the Kafka buffer. We also support a source and a nascent sink.
  • Do we really need to explicitly use "backward" as we have not forward compatibility implied?

Maybe call the project kafka-buffer-compatibility and name the task kafkaBufferCompatibilityTest.

* by writing data with a released version and reading with the current build.
*/

def RELEASED_VERSION = project.hasProperty('backwardCompatVersion') ?
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's avoid abbreviations like "compat". I think releasedVersion would also be a better name for the property.

-PreleasedVersion=2.13.0

But, we can also use backwardCompatibilityVersion.

buffer:
kafka:
topics:
- name: backward-compat-test-topic
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

backward-compatibility-test-topic

kafka:
topics:
- name: backward-compat-test-topic
group_id: backward-compat-reader-group
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

backward-compatibility-reader-group

username: "admin"
password: "admin"
insecure: true
index: "backward-compat-test-index"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kafka-backward-compatibility-test-index

Since this is an OpenSearch domain, it is probably best to scope with kafka as well.

}

// Clean up tasks - remove any existing containers/networks before starting
tasks.register('cleanupKafkaBackwardCompatTest') {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tasks.register('cleanupKafkaBackwardCompatTest') {
tasks.register('cleanupKafkaBackwardCompatibilityTest') {

3. Both messages appear in OpenSearch with correct content
4. Test completes with ✅ SUCCESS message

## CI/CD Integration
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this section.

*/

def RELEASED_VERSION = project.hasProperty('backwardCompatVersion') ?
project.getProperty('backwardCompatVersion') : '2.10.0'
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not 2.13.0?

Also, we should have 2 as an alias for the last released 2.x Data Prepper. That may be a better default.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to use '2' so it always gets the latest release

private static final int WRITER_HTTP_PORT = 2021;
private static final String TEST_INDEX_NAME = "backward-compat-test-index";
private static final String MESSAGE_KEY = "message";
private static final String TEST_VALUE_1 = "test-record-1-from-released-version";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to generate these values in @BeforeEach instead.

You can make them somewhat random as well:

testValue1 = "test-record-1-from-released-version" + UUID.randomUUID.toString();

import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.SessionProtocol;
import org.junit.Assert;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No JUnit 4. Please replace with JUnit 5.

builder.withUsername("admin");
builder.withPassword("admin");
builder.withInsecure(true);
final AwsCredentialsSupplier awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this at all?

Signed-off-by: Taylor Gray <tylgry@amazon.com>
@graytaylor0 graytaylor0 force-pushed the KafkabufferImprovements branch from cdbdac1 to b0b5fa6 Compare January 15, 2026 22:05
@graytaylor0 graytaylor0 requested a review from dlvenable January 15, 2026 22:22
@graytaylor0 graytaylor0 merged commit eed7755 into opensearch-project:main Jan 15, 2026
78 of 85 checks passed
ashrao94 pushed a commit to ashrao94/data-prepper that referenced this pull request Jan 22, 2026
Signed-off-by: Taylor Gray <tylgry@amazon.com>
san81 pushed a commit to san81/data-prepper that referenced this pull request Jan 27, 2026
simonelbaz pushed a commit to simonelbaz/data-prepper that referenced this pull request Jan 31, 2026
Signed-off-by: Taylor Gray <tylgry@amazon.com>
Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>
simonelbaz pushed a commit to simonelbaz/data-prepper that referenced this pull request Jan 31, 2026
simonelbaz pushed a commit to simonelbaz/data-prepper that referenced this pull request Jan 31, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants