Skip to content

Support isolation.level config for Kafka consumer#5894

Merged
dlvenable merged 1 commit into
opensearch-project:mainfrom
lhooss:feat/support-isolation-level
Aug 1, 2025
Merged

Support isolation.level config for Kafka consumer#5894
dlvenable merged 1 commit into
opensearch-project:mainfrom
lhooss:feat/support-isolation-level

Conversation

@lhooss
Copy link
Copy Markdown
Contributor

@lhooss lhooss commented Jul 23, 2025

Description

This pull request adds support for the isolation.level Kafka consumer configuration in Data Prepper.

The following changes were made:

  • Introduced a new method getIsolationLevel() in the TopicConsumerConfig interface.

  • Implemented the new property isolation_level with a default value of "read_uncommitted" in both:

    • SourceTopicConfig (Kafka source plugin)

    • BufferTopicConfig (Kafka buffer plugin)

  • Ensured backward compatibility by setting sensible defaults and integrating with existing configuration parsing logic.

  • This allows users to configure Data Prepper to read only committed messages by setting isolation_level: read_committed in their pipeline YAML file.

This is especially useful when working with transactional Kafka producers and ensures message consistency in distributed systems.

Issues Resolved

Resolves #5896

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Copy link
Copy Markdown
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhooss , Thank you for this contribution! I left a couple of comments, but we should be able to get this in once those are done. Thanks!

@JsonProperty("fetch_min_bytes")
private ByteCount fetchMinBytes = DEFAULT_FETCH_MIN_BYTES;

@JsonProperty("isolation_level")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add this to CommonTopicConfig instead of here and SourceTopicConfig. It will make it common between the two.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the buffer would need to support transactions. Let's just remove it from BufferTopicConfig and make this a source property only.

I understand from #5896 that you are using Kafka as a source.

@lhooss lhooss force-pushed the feat/support-isolation-level branch from 0282215 to 3002461 Compare July 30, 2025 12:17
@lhooss
Copy link
Copy Markdown
Contributor Author

lhooss commented Jul 30, 2025

@lhooss , Thank you for this contribution! I left a couple of comments, but we should be able to get this in once those are done. Thanks!

@dlvenable Thank you for your feedback and for reviewing the PR!
I’ve addressed your comments, please let me know if anything else is needed.

@lhooss lhooss force-pushed the feat/support-isolation-level branch 3 times, most recently from 72566e5 to 2f1060a Compare July 30, 2025 23:49
@lhooss lhooss requested a review from dlvenable July 31, 2025 00:12
Signed-off-by: Mohamed Houssam LAHRACH <lahrach.houssam@gmail.com>
@lhooss lhooss force-pushed the feat/support-isolation-level branch from 2f1060a to 39c2ee7 Compare August 1, 2025 11:14
@lhooss lhooss requested a review from dlvenable August 1, 2025 11:16
Copy link
Copy Markdown
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @lhooss for the contribution!

@dlvenable dlvenable merged commit 439fde2 into opensearch-project:main Aug 1, 2025
45 of 50 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support for Kafka consumer config: isolation.level

4 participants