Skip to content

KAFKA-10844: KStream mark as partitioned#22221

Open
zheguang wants to merge 11 commits into
apache:trunkfrom
zheguang:zheguang-KAFKA-10844
Open

KAFKA-10844: KStream mark as partitioned#22221
zheguang wants to merge 11 commits into
apache:trunkfrom
zheguang:zheguang-KAFKA-10844

Conversation

@zheguang

@zheguang zheguang commented May 6, 2026

Copy link
Copy Markdown
Contributor

This patch implements KIP-759: Unneeded repartition cancelling. The "unneeded" part is entirely by the user's own judgement :)

A stream with key-changing operation can be marked as partitioned as intended to avoid repartition inserted to its topology. The main use case is for aggregation by a "composite key" -- a stream partitioned by key A can readily aggregate by composite key (A, B) without repartition. The reason being given two records, if their (A, B) are equal, then their A is equal.

Spec/Test:

  • markAsPartitioned is no-op when no key changing
  • markAsPartitioned should not repartition in resulting topology
  • markAsPartitioned on a branch doesn't affect another branch
  • Join should work when join for the original key intended by markAsPartitioned
  • IQ should work when query for the original key intended by markAsPartitioned

@github-actions github-actions Bot added triage PRs from the community streams labels May 6, 2026
*
* @return a new, mutated {@code KStream} that will not repartition in subsequent operations.
*/
KStream<K, V> markAsPartitioned();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if KStream.scala should also be updated with this ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Saw that KStream.scala is deprecated by 4.3... so maybe not worth updating?

Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java Outdated
final String topologyNotMarked = builder.build().describe().toString();
notMarked.markAsPartitioned();
final String topologyMarked = builder.build().describe().toString();
assertEquals(topologyNotMarked, topologyMarked);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be also assert for notMarked and notMarked.markAsPartitioned() to be same as this is returned in the impl

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm... leaning towards keeping as is for the spec: "markAsPartitioned is no-op when no key changing". Testing implementation details might be an overkill.

@muralibasani muralibasani left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zheguang I have a few minor comments.

@github-actions github-actions Bot removed the triage PRs from the community label May 10, 2026

@muralibasani muralibasani left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks.

@UladzislauBlok

Copy link
Copy Markdown
Contributor

LGTM
Thanks for PR and KIP

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants