Skip to content

Enable kafka sink integration tests as a github workflow action#6751

Merged
graytaylor0 merged 4 commits into
opensearch-project:mainfrom
graytaylor0:KafkaSinkInteg
Apr 21, 2026
Merged

Enable kafka sink integration tests as a github workflow action#6751
graytaylor0 merged 4 commits into
opensearch-project:mainfrom
graytaylor0:KafkaSinkInteg

Conversation

@graytaylor0

@graytaylor0 graytaylor0 commented Apr 10, 2026

Copy link
Copy Markdown
Member

Description

Enabled kafka sink integration tests as a github workflow action

Locally ran kafka source and sink integration tests to confirm they are passing. The github actions for this PR still fail due to the old integ test code running for kafka source

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Taylor Gray <tylgry@amazon.com>
Signed-off-by: Taylor Gray <tylgry@amazon.com>
Signed-off-by: Taylor Gray <tylgry@amazon.com>
import static org.mockito.Mockito.when;


/**

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this comment.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this comment to force the tests to run as github action since it only runs when a change is detected. Is there a manual way to run a github action?

if (!records.isEmpty() && records.count() > 0) {
List<JsonNode> consumed = new ArrayList<>();
int maxRetries = 15;
while (consumed.size() < expectedRecords.size() && maxRetries-- > 0) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use Awaitility instead?

topicConfig = mock(TopicProducerConfig.class);
when(topicConfig.getName()).thenReturn(testTopic);
when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.JSON);
when(topicConfig.isCreateTopic()).thenReturn(false);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will need this for the GitHub Actions to pass. Unless there is some code in this test that creates the topic.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The topic is created in the test code separately, so the sink plugin does not create the topic, just the test.

created.set(false);
}
while (created.get() != false) {
while (!created.get()) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Awaitility. Also this has not stopping point during failures.

Signed-off-by: Taylor Gray <tylgry@amazon.com>
@graytaylor0 graytaylor0 requested a review from dlvenable April 14, 2026 15:13

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @graytaylor0 !

@graytaylor0 graytaylor0 merged commit c39d6b0 into opensearch-project:main Apr 21, 2026
71 of 75 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants