Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions runners/kafka-streams/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies {
permitUnusedDeclared project(":sdks:java:build-tools")

implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":runners:kafka-streams:proto", configuration: "shadow")
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
implementation project(":runners:core-java")
Expand Down
34 changes: 34 additions & 0 deletions runners/kafka-streams/proto/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins { id 'org.apache.beam.module' }

// Build script for the Kafka Streams runner's proto module: it only configures how the .proto
// sources in this module are compiled and packaged; there is no hand-written code here.
//
// Portability nature compiles the .proto against the vendored gRPC/protobuf (relocated to
// org.apache.beam.vendor.grpc...), so consumers use the vendored protobuf runtime — no raw
// com.google.protobuf on their classpath. Same pattern as the dataflow windmill proto module.
applyPortabilityNature(
// NOTE(review): presumably keeps this module out of the published artifacts — confirm.
publish: false,
// Path-glob form of the generated classes' Java package.
// NOTE(review): presumably exempts those classes from the shadow-jar content validation — confirm.
shadowJarValidationExcludes: ["org/apache/beam/runners/kafka/streams/v1/**"],
archivesBaseName: 'beam-runners-kafka-streams-proto',
// Regex form of the same Java package (org.apache.beam.runners.kafka.streams.v1), i.e. the
// package the generated protobuf classes land in.
generatedClassPatterns: [
/^org\.apache\.beam\.runners\.kafka\.streams\.v1.*/
]
)

description = "Apache Beam :: Runners :: Kafka Streams :: Proto"
ext.summary = "Kafka Streams runner control-message protos"
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

syntax = "proto3";

package org.apache.beam.runners.kafka.streams.v1;

// Generated Java classes are nested inside the KafkaStreamsPayloadProtos outer class in the
// org.apache.beam.runners.kafka.streams.v1 Java package.
option java_package = "org.apache.beam.runners.kafka.streams.v1";
option java_outer_classname = "KafkaStreamsPayloadProtos";

// On-wire form of the in-JVM KStreamsPayload envelope, used when the payload must cross a Kafka
// topic boundary (e.g. the GroupByKey repartition topic and the watermark fan-out). Protobuf is
// used for compatible schema evolution and compact varint encoding.
//
// Each message is either a data element or a watermark report, never both (see the oneof below).
message KafkaStreamsPayload {
// A watermark report: the watermark plus the in-band coordination fields the downstream
// WatermarkManager needs.
message WatermarkPayload {
// Event-time watermark in milliseconds. Signed (sint64, zigzag-encoded) because Beam event
// times can be negative, e.g. BoundedWindow.TIMESTAMP_MIN_VALUE.
sint64 millis = 1;
// The source partition this report is for.
uint32 source_partition = 2;
// The total number of source partitions feeding the downstream stage.
uint32 total_partitions = 3;
}

// A data element: the Beam WindowedValue encoded with the PCollection's windowed-value coder.
message DataPayload {
// Coder output, carried as opaque bytes: protobuf does not interpret the contents, so the
// reader must decode with the same coder the writer used.
bytes value = 1;
}

// Exactly one variant is set; the oneof case discriminates data vs watermark.
// A message with neither variant set carries no payload and is treated as an error by the
// Java deserializer (KStreamsPayloadSerde).
oneof payload {
WatermarkPayload watermark = 1;
DataPayload data = 2;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.kafka.streams.translation;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.runners.kafka.streams.v1.KafkaStreamsPayloadProtos.KafkaStreamsPayload;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

/**
* Kafka {@link Serde} for {@link KStreamsPayload}, enabling the envelope to cross topic boundaries
* (e.g. the repartition topic a {@code GroupByKey} introduces). Until now {@link KStreamsPayload}
* only flowed in-JVM via {@code ProcessorContext#forward}, so no serialization was needed.
*
* <p>The wire form is the {@link KafkaStreamsPayload} protobuf message — protobuf gives compatible
* schema evolution and compact varint encoding. The data variant carries the {@link WindowedValue}
* encoded with the {@link Coder} supplied for the topic's PCollection; the watermark variant
* carries the coder-independent watermark report. A {@link KStreamsPayloadSerde} is therefore
* parameterized by the data {@link Coder} (different topics carry different element types).
*
* <p>The serde assumes non-null payloads: the topics it is used on (repartition and watermark
* fan-out) are not log-compacted, so no tombstone (null-valued) records occur.
*
* @param <T> the data element type carried by data payloads on this topic
*/
public final class KStreamsPayloadSerde<T> implements Serde<KStreamsPayload<T>> {

private final Coder<WindowedValue<T>> dataCoder;

public KStreamsPayloadSerde(Coder<WindowedValue<T>> dataCoder) {
this.dataCoder = dataCoder;
}

@Override
public Serializer<KStreamsPayload<T>> serializer() {
return new PayloadSerializer();
}

@Override
public Deserializer<KStreamsPayload<T>> deserializer() {
return new PayloadDeserializer();
}

private final class PayloadSerializer implements Serializer<KStreamsPayload<T>> {
@Override
public byte[] serialize(String topic, KStreamsPayload<T> payload) {
KafkaStreamsPayload.Builder proto = KafkaStreamsPayload.newBuilder();
if (payload.isData()) {
ByteArrayOutputStream encoded = new ByteArrayOutputStream();
try {
dataCoder.encode(payload.getData(), encoded);
} catch (IOException e) {
throw new SerializationException("Failed to encode KStreamsPayload data element", e);
}
proto.setData(
KafkaStreamsPayload.DataPayload.newBuilder()
.setValue(ByteString.copyFrom(encoded.toByteArray())));
} else {
WatermarkPayload watermark = payload.asWatermark();
proto.setWatermark(
KafkaStreamsPayload.WatermarkPayload.newBuilder()
.setMillis(watermark.getWatermarkMillis())
.setSourcePartition(watermark.getSourcePartition())
.setTotalPartitions(watermark.getTotalSourcePartitions()));
}
return proto.build().toByteArray();
}
}

private final class PayloadDeserializer implements Deserializer<KStreamsPayload<T>> {
@Override
public KStreamsPayload<T> deserialize(String topic, byte[] bytes) {
KafkaStreamsPayload proto;
try {
proto = KafkaStreamsPayload.parseFrom(bytes);
} catch (InvalidProtocolBufferException e) {
throw new SerializationException("Failed to parse KStreamsPayload", e);
}
switch (proto.getPayloadCase()) {
case DATA:
try {
return KStreamsPayload.data(dataCoder.decode(proto.getData().getValue().newInput()));
} catch (IOException e) {
throw new SerializationException("Failed to decode KStreamsPayload data element", e);
}
case WATERMARK:
KafkaStreamsPayload.WatermarkPayload watermark = proto.getWatermark();
return KStreamsPayload.watermark(
watermark.getMillis(),
watermark.getSourcePartition(),
watermark.getTotalPartitions());
case PAYLOAD_NOT_SET:
default:
throw new SerializationException(
"KStreamsPayload has no payload variant set: " + proto.getPayloadCase());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.kafka.streams.translation;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.junit.Test;

/**
 * Tests for {@link KStreamsPayloadSerde}: every payload variant must survive a serialize /
 * deserialize round trip unchanged, and unparseable bytes must be rejected.
 */
public class KStreamsPayloadSerdeTest {

  private static final String TOPIC = "ks-payload-serde-test";

  private final Coder<WindowedValue<Integer>> dataCoder =
      WindowedValues.getFullCoder(VarIntCoder.of(), GlobalWindow.Coder.INSTANCE);
  private final KStreamsPayloadSerde<Integer> serde = new KStreamsPayloadSerde<>(dataCoder);

  /** Serializes {@code payload} and immediately deserializes the resulting bytes. */
  private KStreamsPayload<Integer> roundTrip(KStreamsPayload<Integer> payload) {
    Serializer<KStreamsPayload<Integer>> toBytes = serde.serializer();
    Deserializer<KStreamsPayload<Integer>> fromBytes = serde.deserializer();
    byte[] wire = toBytes.serialize(TOPIC, payload);
    return fromBytes.deserialize(TOPIC, wire);
  }

  /** Round-trips a watermark with the given millis (partition 0 of 1) and returns the millis. */
  private long roundTripWatermarkMillis(long millis) {
    KStreamsPayload<Integer> original = KStreamsPayload.watermark(millis, 0, 1);
    return roundTrip(original).asWatermark().getWatermarkMillis();
  }

  @Test
  public void roundTripsDataPayload() {
    KStreamsPayload<Integer> original =
        KStreamsPayload.data(WindowedValues.valueInGlobalWindow(42));

    KStreamsPayload<Integer> restored = roundTrip(original);

    assertThat(restored.isData(), is(true));
    assertThat(restored.getData().getValue(), is(42));
    assertThat(restored, is(original));
  }

  @Test
  public void roundTripsWatermarkPayload() {
    KStreamsPayload<Integer> original = KStreamsPayload.watermark(12345L, 2, 4);

    KStreamsPayload<Integer> restored = roundTrip(original);

    assertThat(restored.isWatermark(), is(true));
    assertThat(restored.asWatermark().getWatermarkMillis(), is(12345L));
    assertThat(restored.asWatermark().getSourcePartition(), is(2));
    assertThat(restored.asWatermark().getTotalSourcePartitions(), is(4));
    assertThat(restored, is(original));
  }

  @Test
  public void roundTripsTerminalMaxWatermark() {
    long maxMillis = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
    assertThat(roundTripWatermarkMillis(maxMillis), is(maxMillis));
  }

  @Test
  public void roundTripsNegativeWatermark() {
    // Beam event times can be negative; sint64 must round-trip them losslessly.
    long minMillis = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
    assertThat(roundTripWatermarkMillis(minMillis), is(minMillis));
  }

  @Test
  public void malformedBytesThrow() {
    // 0x7f is a lone tag byte: field number 15 with wire type 7, which protobuf does not define,
    // so parsing the envelope fails.
    byte[] garbage = new byte[] {(byte) 0x7f};
    Deserializer<KStreamsPayload<Integer>> fromBytes = serde.deserializer();
    assertThrows(SerializationException.class, () -> fromBytes.deserialize(TOPIC, garbage));
  }
}
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ include(":runners:java-fn-execution")
include(":runners:java-job-service")
include(":runners:jet")
include(":runners:kafka-streams")
include(":runners:kafka-streams:proto")
include(":runners:local-java")
include(":runners:portability:java")
include(":runners:prism")
Expand Down
Loading