Skip to content

Commit 3e963a6

Browse files
[GSoC 2026] Kafka Streams runner #39051: Add KStreamsPayload Serde for crossing topic boundaries
1 parent 1058e94 commit 3e963a6

6 files changed

Lines changed: 305 additions & 0 deletions

File tree

runners/kafka-streams/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ dependencies {
4343
permitUnusedDeclared project(":sdks:java:build-tools")
4444

4545
implementation project(path: ":sdks:java:core", configuration: "shadow")
46+
implementation project(path: ":runners:kafka-streams:proto", configuration: "shadow")
4647
implementation project(path: ":model:pipeline", configuration: "shadow")
4748
implementation project(path: ":model:job-management", configuration: "shadow")
4849
implementation project(":runners:core-java")
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
plugins { id 'org.apache.beam.module' }
20+
21+
// Portability nature compiles the .proto against the vendored gRPC/protobuf (relocated to
22+
// org.apache.beam.vendor.grpc...), so consumers use the vendored protobuf runtime — no raw
23+
// com.google.protobuf on their classpath. Same pattern as the dataflow windmill proto module.
24+
applyPortabilityNature(
25+
publish: false,
26+
shadowJarValidationExcludes: ["org/apache/beam/runners/kafka/streams/v1/**"],
27+
archivesBaseName: 'beam-runners-kafka-streams-proto',
28+
generatedClassPatterns: [
29+
/^org\.apache\.beam\.runners\.kafka\.streams\.v1.*/
30+
]
31+
)
32+
33+
description = "Apache Beam :: Runners :: Kafka Streams :: Proto"
34+
ext.summary = "Kafka Streams runner control-message protos"
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
syntax = "proto3";
20+
21+
package org.apache.beam.runners.kafka.streams.v1;
22+
23+
option java_package = "org.apache.beam.runners.kafka.streams.v1";
24+
option java_outer_classname = "KafkaStreamsPayloadProtos";
25+
26+
// On-wire form of the in-JVM KStreamsPayload envelope, used when the payload must cross a Kafka
27+
// topic boundary (e.g. the GroupByKey repartition topic and the watermark fan-out). Protobuf is
28+
// used for compatible schema evolution and compact varint encoding.
29+
message KafkaStreamsPayload {
30+
// A watermark report: the watermark plus the in-band coordination fields the downstream
31+
// WatermarkManager needs.
32+
message WatermarkPayload {
33+
// Event-time watermark in milliseconds. Signed (sint64, zigzag-encoded) because Beam event
34+
// times can be negative, e.g. BoundedWindow.TIMESTAMP_MIN_VALUE.
35+
sint64 millis = 1;
36+
// The source partition this report is for.
37+
uint32 source_partition = 2;
38+
// The total number of source partitions feeding the downstream stage.
39+
uint32 total_partitions = 3;
40+
}
41+
42+
// A data element: the Beam WindowedValue encoded with the PCollection's windowed-value coder.
43+
message DataPayload {
44+
bytes value = 1;
45+
}
46+
47+
// Exactly one variant is set; the oneof case discriminates data vs watermark.
48+
oneof payload {
49+
WatermarkPayload watermark = 1;
50+
DataPayload data = 2;
51+
}
52+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.kafka.streams.translation;
19+
20+
import java.io.ByteArrayOutputStream;
21+
import java.io.IOException;
22+
import org.apache.beam.runners.kafka.streams.v1.KafkaStreamsPayloadProtos.KafkaStreamsPayload;
23+
import org.apache.beam.sdk.coders.Coder;
24+
import org.apache.beam.sdk.values.WindowedValue;
25+
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
26+
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
27+
import org.apache.kafka.common.errors.SerializationException;
28+
import org.apache.kafka.common.serialization.Deserializer;
29+
import org.apache.kafka.common.serialization.Serde;
30+
import org.apache.kafka.common.serialization.Serializer;
31+
32+
/**
33+
* Kafka {@link Serde} for {@link KStreamsPayload}, enabling the envelope to cross topic boundaries
34+
* (e.g. the repartition topic a {@code GroupByKey} introduces). Until now {@link KStreamsPayload}
35+
* only flowed in-JVM via {@code ProcessorContext#forward}, so no serialization was needed.
36+
*
37+
* <p>The wire form is the {@link KafkaStreamsPayload} protobuf message — protobuf gives compatible
38+
* schema evolution and compact varint encoding. The data variant carries the {@link WindowedValue}
39+
* encoded with the {@link Coder} supplied for the topic's PCollection; the watermark variant
40+
* carries the coder-independent watermark report. A {@link KStreamsPayloadSerde} is therefore
41+
* parameterized by the data {@link Coder} (different topics carry different element types).
42+
*
43+
* <p>The serde assumes non-null payloads: the topics it is used on (repartition and watermark
44+
* fan-out) are not log-compacted, so no tombstone (null-valued) records occur.
45+
*
46+
* @param <T> the data element type carried by data payloads on this topic
47+
*/
48+
public final class KStreamsPayloadSerde<T> implements Serde<KStreamsPayload<T>> {
49+
50+
private final Coder<WindowedValue<T>> dataCoder;
51+
52+
public KStreamsPayloadSerde(Coder<WindowedValue<T>> dataCoder) {
53+
this.dataCoder = dataCoder;
54+
}
55+
56+
@Override
57+
public Serializer<KStreamsPayload<T>> serializer() {
58+
return new PayloadSerializer();
59+
}
60+
61+
@Override
62+
public Deserializer<KStreamsPayload<T>> deserializer() {
63+
return new PayloadDeserializer();
64+
}
65+
66+
private final class PayloadSerializer implements Serializer<KStreamsPayload<T>> {
67+
@Override
68+
public byte[] serialize(String topic, KStreamsPayload<T> payload) {
69+
KafkaStreamsPayload.Builder proto = KafkaStreamsPayload.newBuilder();
70+
if (payload.isData()) {
71+
ByteArrayOutputStream encoded = new ByteArrayOutputStream();
72+
try {
73+
dataCoder.encode(payload.getData(), encoded);
74+
} catch (IOException e) {
75+
throw new SerializationException("Failed to encode KStreamsPayload data element", e);
76+
}
77+
proto.setData(
78+
KafkaStreamsPayload.DataPayload.newBuilder()
79+
.setValue(ByteString.copyFrom(encoded.toByteArray())));
80+
} else {
81+
WatermarkPayload watermark = payload.asWatermark();
82+
proto.setWatermark(
83+
KafkaStreamsPayload.WatermarkPayload.newBuilder()
84+
.setMillis(watermark.getWatermarkMillis())
85+
.setSourcePartition(watermark.getSourcePartition())
86+
.setTotalPartitions(watermark.getTotalSourcePartitions()));
87+
}
88+
return proto.build().toByteArray();
89+
}
90+
}
91+
92+
private final class PayloadDeserializer implements Deserializer<KStreamsPayload<T>> {
93+
@Override
94+
public KStreamsPayload<T> deserialize(String topic, byte[] bytes) {
95+
KafkaStreamsPayload proto;
96+
try {
97+
proto = KafkaStreamsPayload.parseFrom(bytes);
98+
} catch (InvalidProtocolBufferException e) {
99+
throw new SerializationException("Failed to parse KStreamsPayload", e);
100+
}
101+
switch (proto.getPayloadCase()) {
102+
case DATA:
103+
try {
104+
return KStreamsPayload.data(dataCoder.decode(proto.getData().getValue().newInput()));
105+
} catch (IOException e) {
106+
throw new SerializationException("Failed to decode KStreamsPayload data element", e);
107+
}
108+
case WATERMARK:
109+
KafkaStreamsPayload.WatermarkPayload watermark = proto.getWatermark();
110+
return KStreamsPayload.watermark(
111+
watermark.getMillis(),
112+
watermark.getSourcePartition(),
113+
watermark.getTotalPartitions());
114+
case PAYLOAD_NOT_SET:
115+
default:
116+
throw new SerializationException(
117+
"KStreamsPayload has no payload variant set: " + proto.getPayloadCase());
118+
}
119+
}
120+
}
121+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.kafka.streams.translation;
19+
20+
import static org.hamcrest.CoreMatchers.is;
21+
import static org.hamcrest.MatcherAssert.assertThat;
22+
import static org.junit.Assert.assertThrows;
23+
24+
import org.apache.beam.sdk.coders.Coder;
25+
import org.apache.beam.sdk.coders.VarIntCoder;
26+
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
27+
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
28+
import org.apache.beam.sdk.values.WindowedValue;
29+
import org.apache.beam.sdk.values.WindowedValues;
30+
import org.apache.kafka.common.errors.SerializationException;
31+
import org.apache.kafka.common.serialization.Deserializer;
32+
import org.apache.kafka.common.serialization.Serializer;
33+
import org.junit.Test;
34+
35+
/** Tests for {@link KStreamsPayloadSerde}. */
36+
public class KStreamsPayloadSerdeTest {
37+
38+
private static final String TOPIC = "ks-payload-serde-test";
39+
40+
private final Coder<WindowedValue<Integer>> dataCoder =
41+
WindowedValues.getFullCoder(VarIntCoder.of(), GlobalWindow.Coder.INSTANCE);
42+
private final KStreamsPayloadSerde<Integer> serde = new KStreamsPayloadSerde<>(dataCoder);
43+
44+
private KStreamsPayload<Integer> roundTrip(KStreamsPayload<Integer> payload) {
45+
Serializer<KStreamsPayload<Integer>> serializer = serde.serializer();
46+
Deserializer<KStreamsPayload<Integer>> deserializer = serde.deserializer();
47+
return deserializer.deserialize(TOPIC, serializer.serialize(TOPIC, payload));
48+
}
49+
50+
@Test
51+
public void roundTripsDataPayload() {
52+
KStreamsPayload<Integer> payload = KStreamsPayload.data(WindowedValues.valueInGlobalWindow(42));
53+
KStreamsPayload<Integer> out = roundTrip(payload);
54+
assertThat(out.isData(), is(true));
55+
assertThat(out.getData().getValue(), is(42));
56+
assertThat(out, is(payload));
57+
}
58+
59+
@Test
60+
public void roundTripsWatermarkPayload() {
61+
KStreamsPayload<Integer> payload = KStreamsPayload.watermark(12345L, 2, 4);
62+
KStreamsPayload<Integer> out = roundTrip(payload);
63+
assertThat(out.isWatermark(), is(true));
64+
assertThat(out.asWatermark().getWatermarkMillis(), is(12345L));
65+
assertThat(out.asWatermark().getSourcePartition(), is(2));
66+
assertThat(out.asWatermark().getTotalSourcePartitions(), is(4));
67+
assertThat(out, is(payload));
68+
}
69+
70+
@Test
71+
public void roundTripsTerminalMaxWatermark() {
72+
KStreamsPayload<Integer> payload =
73+
KStreamsPayload.watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(), 0, 1);
74+
assertThat(
75+
roundTrip(payload).asWatermark().getWatermarkMillis(),
76+
is(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
77+
}
78+
79+
@Test
80+
public void roundTripsNegativeWatermark() {
81+
// Beam event times can be negative; sint64 must round-trip them losslessly.
82+
KStreamsPayload<Integer> payload =
83+
KStreamsPayload.watermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(), 0, 1);
84+
assertThat(
85+
roundTrip(payload).asWatermark().getWatermarkMillis(),
86+
is(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()));
87+
}
88+
89+
@Test
90+
public void malformedBytesThrow() {
91+
// 0x7f encodes field 15 with the invalid wire type 7, so protobuf parsing fails.
92+
byte[] bogus = new byte[] {(byte) 0x7f};
93+
assertThrows(
94+
SerializationException.class, () -> serde.deserializer().deserialize(TOPIC, bogus));
95+
}
96+
}

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ include(":runners:java-fn-execution")
144144
include(":runners:java-job-service")
145145
include(":runners:jet")
146146
include(":runners:kafka-streams")
147+
include(":runners:kafka-streams:proto")
147148
include(":runners:local-java")
148149
include(":runners:portability:java")
149150
include(":runners:prism")

0 commit comments

Comments
 (0)