
[DRAFT] Queue Semantics support in Kafka Ingestion #19311

Open
Shekharrajak wants to merge 18 commits into apache:master from Shekharrajak:feature/kafka-share-group-ingestion

Conversation

Contributor

@Shekharrajak Shekharrajak commented Apr 14, 2026

Phase 1 of #18439.

Description

  • Fixed the bug ...
  • Renamed the class ...
  • Added a forbidden-apis entry ...

Release note


Key changed/added classes in this PR
  • MyFoo
  • OurBar
  • TheirBaz

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Demo

https://youtu.be/K_O1MH-AaE8

cd apache-druid-31.0.0

# Replace the kafka extension with our build
rm extensions/druid-kafka-indexing-service/*.jar
Contributor Author

We can build from source and use the resulting tar.
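A hedged sketch of the jar swap the demo describes. The directory layout here is simulated under a temp dir and the jar names are stand-ins; in practice the replacement jars come from a local Maven build of the PR branch, not from these exact commands.

```shell
# Simulated layout of an unpacked Druid distribution (paths are illustrative).
set -e
DRUID_HOME="$(mktemp -d)/apache-druid-31.0.0"
EXT_DIR="$DRUID_HOME/extensions/druid-kafka-indexing-service"
mkdir -p "$EXT_DIR"
touch "$EXT_DIR/old-druid-kafka-indexing-service-31.0.0.jar"

# Remove the bundled extension jars and drop in the freshly built one.
rm "$EXT_DIR"/*.jar
touch "$EXT_DIR/druid-kafka-indexing-service-SNAPSHOT.jar"
ls "$EXT_DIR"
```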

agg -> agg.hasSumAtLeast(numRecords)
);

Assertions.assertEquals(
Contributor Author

Making sure we read all the records.
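A minimal sketch of the completeness check this comment refers to. `hasSumAtLeast` here is an assumed helper name mirroring the test condition above, not a real Druid API: the per-consumer counts must sum to at least the number of produced records, since queue semantics permit redelivery but never silent loss.

```java
import java.util.List;

// Hypothetical sketch of a "read everything" check; names are assumptions.
public class ReadAllCheck
{
  // True when the per-consumer counts sum to at least the produced total,
  // i.e. every record was read at least once (redelivery may push it higher).
  static boolean hasSumAtLeast(List<Long> counts, long expected)
  {
    long sum = counts.stream().mapToLong(Long::longValue).sum();
    return sum >= expected;
  }

  public static void main(String[] args)
  {
    System.out.println(hasSumAtLeast(List.of(4L, 3L, 3L), 10L)); // true: 10 >= 10
  }
}
```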


for (ByteEntity entity : record.getData()) {
  try {
    final InputRow row = parseRow(entity, inputFormat, inputRowSchema, toolbox);
Member

[P1] Share-group ingestion drops rows when one Kafka record expands to multiple rows

This runner processes each Kafka message as though it produces exactly one InputRow. parseRow() only returns iterator.next(), so later rows from the same message are silently discarded, and zero-row messages can pass null into driver.add. Because the code still ACKs the Kafka record after publish, those skipped rows are lost permanently rather than being retried.
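A minimal sketch of the shape of the fix this comment suggests; `parseFirstOnly` and `addAll` are hypothetical names for illustration, not Druid APIs. The point is to drain the whole row iterator per Kafka record (a stand-in list plays the role of `driver.add`), and only ACK the record after every row has been added.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ParseRowsSketch
{
  // Buggy shape: one Kafka record -> only its first row survives,
  // and an empty iterator yields null.
  static <T> T parseFirstOnly(Iterator<T> rows)
  {
    return rows.hasNext() ? rows.next() : null; // later rows silently dropped
  }

  // Fixed shape: drain the iterator so every row reaches the sink
  // (standing in for driver.add); ACK the record only after this returns.
  static <T> int addAll(Iterator<T> rows, List<T> sink)
  {
    int added = 0;
    while (rows.hasNext()) {
      sink.add(rows.next());
      added++;
    }
    return added;
  }

  public static void main(String[] args)
  {
    List<String> sink = new ArrayList<>();
    int n = addAll(List.of("row1", "row2", "row3").iterator(), sink);
    System.out.println(n); // all three rows retained, not just the first
  }
}
```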

