Skip to content

[GSoC 2026] Kafka Streams runner — KStreamsPayload Serde (GroupByKey prerequisite)#39051

Open
junaiddshaukat wants to merge 3 commits into
apache:feat/18479-kafka-streams-runner-skeletonfrom
junaiddshaukat:feat/ks-payload-serde
Open

[GSoC 2026] Kafka Streams runner — KStreamsPayload Serde (GroupByKey prerequisite)#39051
junaiddshaukat wants to merge 3 commits into
apache:feat/18479-kafka-streams-runner-skeletonfrom
junaiddshaukat:feat/ks-payload-serde

Conversation

@junaiddshaukat

@junaiddshaukat junaiddshaukat commented Jun 21, 2026

Copy link
Copy Markdown
Contributor

Summary

A Kafka Serde for KStreamsPayload so the envelope can cross a topic boundary. Until now it only flowed in-JVM via ProcessorContext#forward; GroupByKey introduces the first real topic (the key-based repartition topic, per the plan agreed with @je-ik), which needs the payload serialized. Split out as its own small PR ahead of GBK.

Scope

  • KStreamsPayloadSerde<T>: parameterized by the Coder<WindowedValue<T>> for
    the data variant (different topics carry different element types; the watermark
    variant is coder-independent). Wire format: a one-byte discriminator + body —
    data = [0x00][windowedValueCoder-encoded value]; watermark =
    [0x01][long millis][int sourcePartition][int totalSourcePartitions].
  • Unit tests: round-trip of data, watermark, and terminal MAX watermark, plus an
    unknown-tag failure.

Out of scope

  • Wiring the serde into the GBK repartition / watermark fan-out — that's the
    GroupByKey PR.

Notes

  • Assumes non-null payloads: the topics it's used on (repartition, watermark
    fan-out) are not log-compacted, so no tombstones occur.

Testing

./gradlew :runners:kafka-streams:check green; 4 unit tests.

Closes #39042
Refs #18479
cc @je-ik

KStreamsPayload has so far only flowed in-JVM via ProcessorContext#forward, so
it needed no serialization. GroupByKey introduces the first real topic — the
key-based repartition topic — so the payload now has to be serialized.

KStreamsPayloadSerde<T> is parameterized by the Coder<WindowedValue<T>> for the
data variant, since different topics carry different element types; the
watermark report variant is coder-independent. The wire format is a one-byte
discriminator followed by the variant body: data is the windowed-value-coder
encoding; watermark is the millis + sourcePartition + totalSourcePartitions
report. The serde assumes non-null payloads, since the topics it is used on
(repartition and watermark fan-out) are not log-compacted.

Refs apache#18479
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a serialization mechanism for KStreamsPayload to support Kafka topic boundaries, which is a prerequisite for implementing GroupByKey in the Kafka Streams runner. By providing a custom Serde, the runner can now safely transmit both data elements and watermark reports across topics, whereas previously these payloads were restricted to in-JVM memory.

Highlights

  • New Kafka Serde Implementation: Introduced KStreamsPayloadSerde to enable serialization of KStreamsPayload objects for cross-topic communication.
  • Wire Format Definition: Implemented a byte-based discriminator format to distinguish between data payloads (using a configurable Coder) and watermark payloads.
  • Unit Testing: Added comprehensive tests for data and watermark round-trips, including handling of terminal watermarks and error cases for unknown tags.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces KStreamsPayloadSerde and its corresponding unit tests to support the serialization and deserialization of KStreamsPayload (both data and watermark variants) across Kafka topic boundaries. The review feedback suggests handling null inputs gracefully in both the serializer and deserializer to prevent potential NullPointerExceptions, which aligns with standard Kafka serialization practices.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +77 to +79
public byte[] serialize(String topic, KStreamsPayload<T> payload) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In Kafka, serializers should handle null payloads gracefully by returning null (representing a tombstone or empty payload). Currently, passing a null payload will cause a NullPointerException when calling payload.isData().

    @Override
    public byte[] serialize(String topic, KStreamsPayload<T> payload) {
      if (payload == null) {
        return null;
      }
      ByteArrayOutputStream out = new ByteArrayOutputStream();

Comment on lines +100 to +102
@Override
public KStreamsPayload<T> deserialize(String topic, byte[] bytes) {
ByteArrayInputStream in = new ByteArrayInputStream(bytes);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In Kafka, deserializers should handle null bytes gracefully by returning null. Currently, passing null bytes to ByteArrayInputStream will cause a NullPointerException.

    @Override
    public KStreamsPayload<T> deserialize(String topic, byte[] bytes) {
      if (bytes == null) {
        return null;
      }
      ByteArrayInputStream in = new ByteArrayInputStream(bytes);

@github-actions

Copy link
Copy Markdown
Contributor

Assigning reviewers:

R: @Abacn added as fallback since no labels match configuration

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@je-ik je-ik left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be in favor of moving to protobuf serialization. Something like
message KafkaStreamsPayload {
enum Type {
UNKNOWN = 0;
WATERMARK = 1;
DATA = 2;
}
message WatermarkPaylaod {
uint64 millis = 1;
uint32 sourcePartition = 2;
uint32 totalPartitions = 3;
}
message DataPayload {
bytes payload = 1;
}
Type type = 1;
oneof payload {
WatermarkPayload watermark = 1;
DataPayload data = 2;
}
}

DataOutputStream dataOut = new DataOutputStream(out);
dataOut.writeByte(WATERMARK_TAG);
dataOut.writeLong(watermark.getWatermarkMillis());
dataOut.writeInt(watermark.getSourcePartition());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be probably better to use protocol buffers for the serialization. It naturally supports schema evolution in a compatible way and also optimized payload sizes through varint/varlong encoders.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done switched to a protobuf KafkaStreamsPayload message, wired via applyGrpcNature (you were right, it's additive over applyJavaNature so we keep spotbugs + nullness). Two small tweaks to the schema: dropped the Type enum since the oneof case already discriminates and it collided with the oneof on field 1, and used sint64 for millis instead of uint64 since Beam event times can be negative (TIMESTAMP_MIN_VALUE). Let me know if you'd rather keep Type explicit.

Per @je-ik review: replace the hand-rolled byte format with a protobuf
KafkaStreamsPayload message, for compatible schema evolution and compact
varint encoding. Proto codegen is wired via applyGrpcNature (additive over
applyJavaNature, so spotbugs and nullness stay on); the generated message is
excluded from the nullness checker via generatedClassPatterns, and the
unvendored com.google.protobuf import is allowed for the serde via the
checkstyle suppressions file.

Two notes on the schema vs the sketch: the redundant Type enum is dropped
(the oneof case already discriminates data vs watermark, and it collided with
the oneof on field number 1), and the watermark millis field is sint64 rather
than uint64 because Beam event times can be negative (TIMESTAMP_MIN_VALUE).

Refs apache#18479
<!-- gRPC/protobuf exceptions -->
<!-- Non-vendored gRPC/protobuf imports are allowed for files that depend on libraries that expose gRPC/protobuf in its public API -->
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*protobuf.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*runners.*kafka-streams.*KStreamsPayloadSerde\.java" />

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking if this is actually correct. Is there a chance we could use trhe vendored version? If anything, we probbaly might relocate the final classes. It might not be critical, because - because we are using the FnAPI the classpath will be physically separated between user code (SDK) and the runner. But it would feel 'safer' if we could use the vendored version.

Per @je-ik review: use the vendored protobuf rather than raw com.google.protobuf
(which dropped the checkstyle suppression). protoc emits raw com.google.protobuf
and the relocation to the vendored package only happens at shade time, so the
vendored classes have to come from a separately-shaded module. Move the proto
into a small portability-nature sub-module (:runners:kafka-streams:proto, same
pattern as the dataflow windmill proto module); the runner depends on its shaded
output and the serde imports the vendored
org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString. This keeps
the main runner on applyJavaNature (spotbugs stays on) and removes the
suppressions.xml entry.

Refs apache#18479
@junaiddshaukat

Copy link
Copy Markdown
Contributor Author

I switched to the vendored protobuf. Moved the proto into a small portability-nature sub-module (:runners:kafka-streams:proto, same pattern as the dataflow windmill proto module), so its shaded output uses the vendored protobuf; the serde now imports org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString and I dropped the suppressions.xml entry. The main runner stays on applyJavaNature so spotbugs is still on.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants