Skip to content

Commit 5e65d47

Browse files
[GSoC 2026] Kafka Streams runner #39141: Add GroupByKey (GlobalWindow, fire at watermark)
1 parent 3e963a6 commit 5e65d47

7 files changed

Lines changed: 667 additions & 8 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.kafka.streams.translation;
19+
20+
import java.util.Collections;
21+
import java.util.HashSet;
22+
import java.util.Optional;
23+
import java.util.Set;
24+
import org.apache.kafka.common.utils.Utils;
25+
import org.apache.kafka.streams.processor.StreamPartitioner;
26+
27+
/**
28+
* Partitions records on the GroupByKey repartition topic.
29+
*
30+
* <ul>
31+
* <li><b>data</b> records go to the single partition selected by hashing the (already encoded
32+
* Beam key) Kafka record key — the same scheme Kafka's default partitioner uses — so every
33+
* value of a key lands together;
34+
* <li><b>watermark</b> reports are broadcast to <i>every</i> partition, so each downstream
35+
* GroupByKey task observes the terminal watermark and fires its keys.
36+
* </ul>
37+
*
38+
* @param <T> the data element type carried by data payloads
39+
*/
40+
class GroupByKeyBroadcastPartitioner<T> implements StreamPartitioner<byte[], KStreamsPayload<T>> {
41+
42+
@Override
43+
public Integer partition(String topic, byte[] key, KStreamsPayload<T> value, int numPartitions) {
44+
// Required by the interface but unused: Kafka Streams calls partitions() (overridden below)
45+
// when it is present. Kept consistent with the data-hash path for safety.
46+
return key == null ? 0 : Utils.toPositive(Utils.murmur2(key)) % numPartitions;
47+
}
48+
49+
@Override
50+
public Optional<Set<Integer>> partitions(
51+
String topic, byte[] key, KStreamsPayload<T> value, int numPartitions) {
52+
if (value.isWatermark()) {
53+
Set<Integer> all = new HashSet<>();
54+
for (int partition = 0; partition < numPartitions; partition++) {
55+
all.add(partition);
56+
}
57+
return Optional.of(all);
58+
}
59+
int partition = Utils.toPositive(Utils.murmur2(key)) % numPartitions;
60+
return Optional.of(Collections.singleton(partition));
61+
}
62+
}
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.kafka.streams.translation;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import org.apache.beam.sdk.coders.Coder;
23+
import org.apache.beam.sdk.coders.CoderException;
24+
import org.apache.beam.sdk.coders.IterableCoder;
25+
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
26+
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
27+
import org.apache.beam.sdk.util.CoderUtils;
28+
import org.apache.beam.sdk.values.KV;
29+
import org.apache.beam.sdk.values.WindowedValue;
30+
import org.apache.beam.sdk.values.WindowedValues;
31+
import org.apache.kafka.streams.processor.api.Processor;
32+
import org.apache.kafka.streams.processor.api.ProcessorContext;
33+
import org.apache.kafka.streams.processor.api.Record;
34+
import org.apache.kafka.streams.state.KeyValueIterator;
35+
import org.apache.kafka.streams.state.KeyValueStore;
36+
import org.checkerframework.checker.nullness.qual.Nullable;
37+
import org.joda.time.Instant;
38+
39+
/**
40+
* Executes a {@code GroupByKey} (GlobalWindow, default trigger, no allowed lateness).
41+
*
42+
* <p>Records arrive on the repartition topic keyed by the encoded Beam key, so every value of a key
43+
* is co-located here. Each value is appended to a per-key buffer in a Kafka Streams state store.
44+
* Watermark reports are fed to a {@link WatermarkManager}; when the input watermark reaches {@link
45+
* BoundedWindow#TIMESTAMP_MAX_VALUE} (the end of the global window) every buffered key is emitted
46+
* once as {@code KV<K, Iterable<V>>} and the buffer cleared, then the watermark is forwarded
47+
* downstream.
48+
*
49+
* <p>Buffering whole value lists and re-encoding on each append is O(n^2) per key; fine for this
50+
* first GroupByKey, and replaced when this moves to runner-core {@code GroupAlsoByWindow}.
51+
*/
52+
class GroupByKeyProcessor
53+
implements Processor<byte[], KStreamsPayload<?>, byte[], KStreamsPayload<?>> {
54+
55+
private final String stateStoreName;
56+
private final Coder<Object> keyCoder;
57+
private final IterableCoder<@Nullable Object> bufferCoder;
58+
59+
private final WatermarkManager watermarkManager = new WatermarkManager();
60+
private Instant lastForwardedWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
61+
// The global window fires exactly once, when the watermark first reaches its end. Later watermark
62+
// reports (e.g. the same terminal watermark broadcast across repartition partitions) must not
63+
// re-fire. This flag is in-memory only; restart correctness comes from the state store plus
64+
// exactly-once-v2: the buffered values and consumer offsets are committed atomically, and the
65+
// store is empty once a key has fired, so a restart cannot double-emit. Persisting watermark
66+
// holds is part of the separate WatermarkManager persistence work, not this initial GroupByKey.
67+
private boolean fired = false;
68+
69+
private @Nullable ProcessorContext<byte[], KStreamsPayload<?>> context;
70+
private @Nullable KeyValueStore<byte[], byte[]> store;
71+
72+
GroupByKeyProcessor(
73+
String stateStoreName, Coder<Object> keyCoder, Coder<@Nullable Object> valueCoder) {
74+
this.stateStoreName = stateStoreName;
75+
this.keyCoder = keyCoder;
76+
this.bufferCoder = IterableCoder.of(valueCoder);
77+
}
78+
79+
@Override
80+
public void init(ProcessorContext<byte[], KStreamsPayload<?>> context) {
81+
this.context = context;
82+
this.store = context.getStateStore(stateStoreName);
83+
}
84+
85+
@Override
86+
public void process(Record<byte[], KStreamsPayload<?>> record) {
87+
KStreamsPayload<?> payload = record.value();
88+
if (payload.isData()) {
89+
byte[] encodedKey = record.key();
90+
Object element = payload.getData().getValue();
91+
if (encodedKey == null || element == null) {
92+
throw new IllegalStateException("GroupByKey data record is missing its key or value");
93+
}
94+
appendValue(encodedKey, element);
95+
return;
96+
}
97+
WatermarkPayload report = payload.asWatermark();
98+
watermarkManager.observe(
99+
report.getSourcePartition(),
100+
new Instant(report.getWatermarkMillis()),
101+
report.getTotalSourcePartitions());
102+
Instant advanced = watermarkManager.advance();
103+
if (!fired && !advanced.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
104+
fireAll(record);
105+
fired = true;
106+
}
107+
if (advanced.isAfter(lastForwardedWatermark)) {
108+
lastForwardedWatermark = advanced;
109+
forwardWatermark(record, advanced.getMillis());
110+
}
111+
}
112+
113+
private void appendValue(byte[] encodedKey, Object kvObject) {
114+
KV<?, ?> kv = (KV<?, ?>) kvObject;
115+
KeyValueStore<byte[], byte[]> kvStore = checkInitialized(store);
116+
byte[] existing = kvStore.get(encodedKey);
117+
List<@Nullable Object> values = existing == null ? new ArrayList<>() : decodeBuffer(existing);
118+
values.add(kv.getValue());
119+
kvStore.put(encodedKey, encodeBuffer(values));
120+
}
121+
122+
private void fireAll(Record<byte[], KStreamsPayload<?>> trigger) {
123+
// NOTE: this emits every buffered key in a single watermark turn. For a very large key space
124+
// that risks memory pressure and exceeding the poll / transaction timeout. Acceptable for this
125+
// initial GlobalWindow GroupByKey (fire once at end of input); incremental, timer-driven output
126+
// via runner-core GroupAlsoByWindow lands with the windowing/timers work.
127+
ProcessorContext<byte[], KStreamsPayload<?>> ctx = checkInitialized(context);
128+
KeyValueStore<byte[], byte[]> kvStore = checkInitialized(store);
129+
List<byte[]> firedKeys = new ArrayList<>();
130+
try (KeyValueIterator<byte[], byte[]> it = kvStore.all()) {
131+
while (it.hasNext()) {
132+
org.apache.kafka.streams.KeyValue<byte[], byte[]> entry = it.next();
133+
Object key = decodeKey(entry.key);
134+
List<@Nullable Object> values = decodeBuffer(entry.value);
135+
// The pane fires at the end of the global window, so the grouped element carries the
136+
// window's max timestamp (END_OF_GLOBAL_WINDOW). Emitting at TIMESTAMP_MIN_VALUE (the
137+
// default of valueInGlobalWindow) would make the output appear arbitrarily late and be
138+
// dropped downstream once the watermark has advanced.
139+
WindowedValue<KV<Object, Iterable<@Nullable Object>>> output =
140+
WindowedValues.timestampedValueInGlobalWindow(
141+
KV.of(key, (Iterable<@Nullable Object>) values),
142+
GlobalWindow.INSTANCE.maxTimestamp());
143+
ctx.forward(
144+
new Record<byte[], KStreamsPayload<?>>(
145+
entry.key, KStreamsPayload.data(output), trigger.timestamp()));
146+
firedKeys.add(entry.key);
147+
}
148+
}
149+
for (byte[] key : firedKeys) {
150+
kvStore.delete(key);
151+
}
152+
}
153+
154+
private void forwardWatermark(Record<byte[], KStreamsPayload<?>> trigger, long watermarkMillis) {
155+
ProcessorContext<byte[], KStreamsPayload<?>> ctx = checkInitialized(context);
156+
// GroupByKey is a single logical source for the next stage; report it as partition 0 of 1.
157+
ctx.forward(
158+
new Record<byte[], KStreamsPayload<?>>(
159+
trigger.key(), KStreamsPayload.watermark(watermarkMillis, 0, 1), trigger.timestamp()));
160+
}
161+
162+
private byte[] encodeBuffer(List<@Nullable Object> values) {
163+
try {
164+
return CoderUtils.encodeToByteArray(bufferCoder, values);
165+
} catch (CoderException e) {
166+
throw new RuntimeException("Failed to encode GroupByKey value buffer", e);
167+
}
168+
}
169+
170+
private List<@Nullable Object> decodeBuffer(byte[] bytes) {
171+
try {
172+
List<@Nullable Object> values = new ArrayList<>();
173+
for (@Nullable Object value : CoderUtils.decodeFromByteArray(bufferCoder, bytes)) {
174+
values.add(value);
175+
}
176+
return values;
177+
} catch (CoderException e) {
178+
throw new RuntimeException("Failed to decode GroupByKey value buffer", e);
179+
}
180+
}
181+
182+
private Object decodeKey(byte[] bytes) {
183+
try {
184+
return CoderUtils.decodeFromByteArray(keyCoder, bytes);
185+
} catch (CoderException e) {
186+
throw new RuntimeException("Failed to decode GroupByKey key", e);
187+
}
188+
}
189+
190+
private static <T> T checkInitialized(@Nullable T value) {
191+
if (value == null) {
192+
throw new IllegalStateException("GroupByKeyProcessor used before init()");
193+
}
194+
return value;
195+
}
196+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.kafka.streams.translation;
19+
20+
import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.instantiateCoder;
21+
22+
import org.apache.beam.model.pipeline.v1.RunnerApi;
23+
import org.apache.beam.sdk.coders.Coder;
24+
import org.apache.beam.sdk.coders.KvCoder;
25+
import org.apache.beam.sdk.values.KV;
26+
import org.apache.beam.sdk.values.WindowedValues;
27+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
28+
import org.apache.kafka.common.serialization.Serdes;
29+
import org.apache.kafka.streams.Topology;
30+
import org.apache.kafka.streams.state.Stores;
31+
import org.checkerframework.checker.nullness.qual.Nullable;
32+
33+
/**
34+
* Translates the {@code beam:transform:group_by_key:v1} URN — the runner's first stateful,
35+
* shuffle-bearing transform.
36+
*
37+
* <p>This is the simplest GroupByKey: GlobalWindow, default trigger, no allowed lateness (per the
38+
* plan agreed with the mentor). Each key's values are buffered in a Kafka Streams state store and
39+
* emitted once as {@code KV<K, Iterable<V>>} when the watermark reaches {@link
40+
* org.apache.beam.sdk.transforms.windowing.BoundedWindow#TIMESTAMP_MAX_VALUE}.
41+
*
42+
* <p>Topology added (the Beam key becomes the Kafka record key so Kafka Streams shuffles by it):
43+
*
44+
* <ul>
45+
* <li>a {@link ShuffleByKeyProcessor} wired to the input's producer, which sets the Kafka record
46+
* key to the encoded Beam key for data records and passes watermark reports through;
47+
* <li>a {@link Topology#addSink sink} to an internal repartition topic, with the payload encoded
48+
* via {@link KStreamsPayloadSerde} and a {@link GroupByKeyBroadcastPartitioner} that hashes
49+
* data by key and fans watermark reports out to every partition;
50+
* <li>a {@link Topology#addSource source} reading the repartition topic back;
51+
* <li>the {@link GroupByKeyProcessor} plus a persistent state store, wired to the source.
52+
* </ul>
53+
*
54+
* <p>The repartition topic is expected to exist on the broker before the job starts (same
55+
* pre-create assumption as the Impulse bootstrap topic); auto-creation lands with the AdminClient
56+
* wiring in a follow-up.
57+
*/
58+
class GroupByKeyTranslator implements PTransformTranslator {
59+
60+
static final String SHUFFLE_SUFFIX = "-shuffle-by-key";
61+
static final String SINK_SUFFIX = "-repartition-sink";
62+
static final String SOURCE_SUFFIX = "-repartition-source";
63+
static final String STATE_STORE_SUFFIX = "-state";
64+
static final String REPARTITION_TOPIC_PREFIX = "__beam_gbk_";
65+
66+
@Override
67+
public void translate(
68+
String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) {
69+
RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId);
70+
String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
71+
String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values());
72+
73+
@SuppressWarnings({"unchecked", "rawtypes"})
74+
WindowedValues.WindowedValueCoder<KV<Object, Object>> inputCoder =
75+
(WindowedValues.WindowedValueCoder)
76+
instantiateCoder(inputPCollectionId, pipeline.getComponents());
77+
KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) inputCoder.getValueCoder();
78+
Coder<Object> keyCoder = kvCoder.getKeyCoder();
79+
// User values may be null; the checker tracks that through to the buffered iterables.
80+
@SuppressWarnings("unchecked")
81+
Coder<@Nullable Object> valueCoder =
82+
(Coder<@Nullable Object>) (Coder<?>) kvCoder.getValueCoder();
83+
84+
String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
85+
86+
String shuffleName = transformId + SHUFFLE_SUFFIX;
87+
String sinkName = transformId + SINK_SUFFIX;
88+
String sourceName = transformId + SOURCE_SUFFIX;
89+
String stateStoreName = transformId + STATE_STORE_SUFFIX;
90+
String repartitionTopic = repartitionTopic(transformId);
91+
92+
KStreamsPayloadSerde<KV<Object, Object>> payloadSerde = new KStreamsPayloadSerde<>(inputCoder);
93+
94+
Topology topology = context.getTopology();
95+
96+
// Re-key data records by the encoded Beam key; pass watermark reports through.
97+
topology.addProcessor(shuffleName, () -> new ShuffleByKeyProcessor(keyCoder), parentProcessor);
98+
99+
// Shuffle through the repartition topic: data partitioned by key, watermark broadcast.
100+
topology.addSink(
101+
sinkName,
102+
repartitionTopic,
103+
Serdes.ByteArray().serializer(),
104+
payloadSerde.serializer(),
105+
new GroupByKeyBroadcastPartitioner<>(),
106+
shuffleName);
107+
topology.addSource(
108+
sourceName,
109+
Serdes.ByteArray().deserializer(),
110+
payloadSerde.deserializer(),
111+
repartitionTopic);
112+
113+
// Buffer values per key and fire KV<K, Iterable<V>> at the terminal watermark.
114+
topology.addProcessor(
115+
transformId,
116+
() -> new GroupByKeyProcessor(stateStoreName, keyCoder, valueCoder),
117+
sourceName);
118+
topology.addStateStore(
119+
Stores.keyValueStoreBuilder(
120+
Stores.persistentKeyValueStore(stateStoreName), Serdes.ByteArray(), Serdes.ByteArray()),
121+
transformId);
122+
123+
context.registerPCollectionProducer(outputPCollectionId, transformId);
124+
}
125+
126+
/** The internal repartition topic name for a GroupByKey transform. */
127+
static String repartitionTopic(String transformId) {
128+
return REPARTITION_TOPIC_PREFIX + transformId.replaceAll("[^a-zA-Z0-9._-]", "_");
129+
}
130+
}

runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public KafkaStreamsPipelineTranslator() {
5151
ImmutableMap.<String, PTransformTranslator>builder()
5252
.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new ImpulseTranslator())
5353
.put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, new RedistributeTranslator())
54+
.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslator())
5455
.put(ExecutableStage.URN, new ExecutableStageTranslator())
5556
.build());
5657
}

0 commit comments

Comments
 (0)