Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
73faa68
[Dataflow Streaming] [Multi Key] StreamingModeExecutionContext refact…
arunpandianp Jun 4, 2026
53bc9a6
trigger postsubmit tests
arunpandianp Jun 4, 2026
7720cf8
fix tests
arunpandianp Jun 4, 2026
72740d2
fix tests
arunpandianp Jun 4, 2026
0f96e72
improve work synchronization
arunpandianp Jun 4, 2026
51b9257
cleanup logic
arunpandianp Jun 5, 2026
9a4e7be
cleanup logic
arunpandianp Jun 5, 2026
3f36afd
address comments
arunpandianp Jun 8, 2026
f3cc628
improve WindowingWindmillReader
arunpandianp Jun 8, 2026
58e0ef9
spotless fix
arunpandianp Jun 8, 2026
3dceab0
[Dataflow Streaming] Fix nullness suppression in StreamingModeExecutio…
arunpandianp Jun 8, 2026
e199438
make windmillTagEncoding final
arunpandianp Jun 8, 2026
700dfbc
address comments
arunpandianp Jun 8, 2026
bc5bee2
Move SideInputStateFetcherFactory from start to constructor
arunpandianp Jun 8, 2026
6d7f28e
Merge branch 'contextnullness' into multikey_context_review
arunpandianp Jun 8, 2026
c4a52c1
Merge remote-tracking branch 'beam/master' into multikey_context_review
arunpandianp Jun 8, 2026
1107a60
Merge beam/master into multikey_context_review
arunpandianp Jun 8, 2026
47eb7d6
Address comment
arunpandianp Jun 8, 2026
24505dd
Address comment
arunpandianp Jun 8, 2026
4e0d174
Fix UnderInitialization
arunpandianp Jun 9, 2026
96abcca
Merge remote-tracking branch 'beam/master' into multikey_context_review
arunpandianp Jun 15, 2026
15e8d0a
address comments
arunpandianp Jun 17, 2026
996f561
address comments
arunpandianp Jun 30, 2026
6b3218b
Merge remote-tracking branch 'beam/master' into multikey_context_review
arunpandianp Jun 30, 2026
a51c2f7
fix test
arunpandianp Jun 30, 2026
3036b54
address comments
arunpandianp Jun 30, 2026
5557cbb
Merge remote-tracking branch 'beam/master' into multikey_context_review
arunpandianp Jun 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run!",
"modification": 1,
"modification": 2,
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
public abstract class WindmillReaderIteratorBase<T>
extends NativeReader.NativeReaderIterator<WindowedValue<T>> {
private final StreamingModeExecutionContext context;
private final Windmill.WorkItem work;
private int bundleIndex = 0;
private int messageIndex = -1;
private Windmill.WorkItem work;
private int bundleIndex;
private int messageIndex;
private @Nullable WindowedValue<T> current = null;
private final ValueProvider<Boolean> skipUndecodableElements;
private static final Logger LOG = LoggerFactory.getLogger(WindmillReaderIteratorBase.class);
Expand All @@ -47,6 +47,8 @@ protected WindmillReaderIteratorBase(
this.context = context;
this.skipUndecodableElements = skipUndecodableElements;
this.work = context.getWorkItem();
this.bundleIndex = 0;
this.messageIndex = -1;
}

@Override
Expand All @@ -57,22 +59,33 @@ public boolean start() throws IOException {
@Override
public boolean advance() throws IOException {
if (context.workIsFailed()) {
throw new WorkItemCancelledException(context.getWorkItem().getShardingKey());
throw new WorkItemCancelledException(checkNotNull(context.getWorkItem()).getShardingKey());
}

while (true) {
if (bundleIndex >= work.getMessageBundlesCount()) {
current = null;
// If elements are exhausted, try advancing the execution context to the next key in the
// group
context.finishKey();
if (context.advance()) {
// Transition succeeded! Update iterator references to the new work item
resetWorkFromContext();
continue;
}

// All work items are exhausted.
current = null;
return false;
}

Windmill.InputMessageBundle bundle = work.getMessageBundles(bundleIndex);
++messageIndex;
if (messageIndex >= bundle.getMessagesCount()) {
messageIndex = -1;
++bundleIndex;
continue;
}

try {
current = checkNotNull(decodeMessage(bundle.getMessages(messageIndex)));
return true;
Expand All @@ -91,6 +104,12 @@ public boolean advance() throws IOException {
}
}

/**
 * Re-points this iterator at the work item currently exposed by the execution context and
 * rewinds both cursors to their starting positions.
 *
 * <p>{@code advance()} calls this after {@code context.advance()} reports that it moved on to
 * another key.
 */
private void resetWorkFromContext() {
  work = context.getWorkItem();
  // advance() pre-increments messageIndex before reading, so -1 means "before the first message".
  messageIndex = -1;
  bundleIndex = 0;
}

protected abstract WindowedValue<T> decodeMessage(Windmill.Message message) throws IOException;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
Expand All @@ -49,7 +48,6 @@
@Internal
class WindowingWindmillReader<K, T> extends NativeReader<WindowedValue<KeyedWorkItem<K, T>>> {

private final Coder<K> keyCoder;
private final Coder<T> valueCoder;
private final Coder<? extends BoundedWindow> windowCoder;
private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
Expand All @@ -66,7 +64,6 @@ class WindowingWindmillReader<K, T> extends NativeReader<WindowedValue<KeyedWork
this.windowCoder = inputCoder.getWindowCoder();
WindmillKeyedWorkItem.FakeKeyedWorkItemCoder<K, T> keyedWorkItemCoder =
(WindmillKeyedWorkItem.FakeKeyedWorkItemCoder<K, T>) inputCoder.getValueCoder();
this.keyCoder = keyedWorkItemCoder.getKeyCoder();
this.valueCoder = keyedWorkItemCoder.getElementCoder();
this.context = context;
this.skipUndecodableElements = skipUndecodableElements;
Expand Down Expand Up @@ -129,73 +126,77 @@ public static <K, T> WindowingWindmillReader<K, T> create(
return new WindowingWindmillReader<>(coder, context, skipUndecodableElements);
}

/**
 * Builds a {@link WindmillKeyedWorkItem} from the key and work item currently exposed by the
 * execution context, using this reader's window, windows and value coders.
 *
 * @return a keyed work item wrapping the context's current key and work item
 */
private KeyedWorkItem<K, T> createKeyedWorkItem() {
  @SuppressWarnings("unchecked")
  @Nullable
  K currentKey = (K) context.getKey();
  // The option is only read when the provider is accessible; otherwise it is treated as false.
  boolean skipUndecodable =
      skipUndecodableElements.isAccessible()
          && Boolean.TRUE.equals(skipUndecodableElements.get());
  return new WindmillKeyedWorkItem<>(
      currentKey,
      context.getWorkItem(),
      windowCoder,
      windowsCoder,
      valueCoder,
      context.getWindmillTagEncoding(),
      context.getDrainMode(),
      skipUndecodable);
}

/**
 * Reports whether the given keyed work item carries nothing to process.
 *
 * @param keyedWorkItem the work item to inspect
 * @return {@code true} when both its timers iterable and its elements iterable are empty
 */
private boolean isEmpty(KeyedWorkItem<K, T> keyedWorkItem) {
  // Timers are checked first; elements are only inspected when there are no timers.
  if (!Iterables.isEmpty(keyedWorkItem.timersIterable())) {
    return false;
  }
  return Iterables.isEmpty(keyedWorkItem.elementsIterable());
}

@Override
public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() throws IOException {
final K key =
keyCoder.decode(
checkStateNotNull(context.getSerializedKey()).newInput(), Coder.Context.OUTER);
final WorkItem workItem = context.getWorkItem();
KeyedWorkItem<K, T> keyedWorkItem =
new WindmillKeyedWorkItem<>(
key,
workItem,
windowCoder,
windowsCoder,
valueCoder,
context.getWindmillTagEncoding(),
context.getDrainMode(),
skipUndecodableElements.isAccessible()
&& Boolean.TRUE.equals(skipUndecodableElements.get()));
final boolean isEmptyWorkItem =
(Iterables.isEmpty(keyedWorkItem.timersIterable())
&& Iterables.isEmpty(keyedWorkItem.elementsIterable()));
final WindowedValue<KeyedWorkItem<K, T>> value = new ValueInEmptyWindows<>(keyedWorkItem);

// Return a noop iterator when current workitem is an empty workitem.
if (isEmptyWorkItem) {
return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
@Override
public boolean start() throws IOException {
context.finishKey();
return false;
}

@Override
public boolean advance() throws IOException {
return false;
return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
private @Nullable WindowedValue<KeyedWorkItem<K, T>> current = null;

@Override
public boolean start() throws IOException {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just implement start() and advance() with a shared helper method that takes a bool indicating whether or not to advance initially? That seems safer if more setup/checking is added before processing an item.

if (context.workIsFailed()) {
throw new WorkItemCancelledException(
checkStateNotNull(context.getWorkItem()).getShardingKey());
}

@Override
public WindowedValue<KeyedWorkItem<K, T>> getCurrent() {
throw new NoSuchElementException();
KeyedWorkItem<K, T> firstKeyedWorkItem = createKeyedWorkItem();
if (isEmpty(firstKeyedWorkItem)) {
return advance(); // Try to transition immediately if the first key is empty!
}
};
} else {
return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
private @Nullable WindowedValue<KeyedWorkItem<K, T>> current = null;

@Override
public boolean start() throws IOException {
current = value;
return true;
current = new ValueInEmptyWindows<>(firstKeyedWorkItem);
return true;
}

@Override
public boolean advance() throws IOException {
if (context.workIsFailed()) {
throw new WorkItemCancelledException(
checkStateNotNull(context.getWorkItem()).getShardingKey());
}

@Override
public boolean advance() throws IOException {
current = null;
while (true) {
context.finishKey();
if (context.advance()) {
KeyedWorkItem<K, T> newKeyedWorkItem = createKeyedWorkItem();
if (isEmpty(newKeyedWorkItem)) {
continue;
}
current = new ValueInEmptyWindows<>(newKeyedWorkItem);
return true;
}

current = null;
return false;
}
}

@Override
public WindowedValue<KeyedWorkItem<K, T>> getCurrent() {
if (current == null) {
throw new NoSuchElementException();
}
return value;
@Override
public WindowedValue<KeyedWorkItem<K, T>> getCurrent() {
if (current == null) {
throw new NoSuchElementException();
}
};
}
return current;
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,16 @@
package org.apache.beam.runners.dataflow.worker.streaming;

import com.google.auto.value.AutoValue;
import java.util.HashMap;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.KeyTransitionListener;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -58,7 +53,7 @@ public static ComputationWorkExecutor.Builder builder() {

public abstract DataflowWorkExecutor workExecutor();

public abstract StreamingModeExecutionContext context();
abstract StreamingModeExecutionContext context();

public abstract Optional<Coder<?>> keyCoder();

Expand All @@ -67,15 +62,24 @@ public static ComputationWorkExecutor.Builder builder() {
/**
* Executes DoFns for the Work. Blocks the calling thread until DoFn(s) have completed execution.
*/
public final void executeWork(
@Nullable Object key,
public final StreamingModeExecutionContext executeWork(
Work work,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
Windmill.WorkItemCommitRequest.Builder outputBuilder)
BoundedQueueExecutor workQueueExecutor,
BoundedQueueExecutorWorkHandle budgetHandle,
KeyTransitionListener keyTransitionListener)
throws Exception {
context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder, workExecutor());
context()
.start(
work,
stateReader,
workExecutor(),
workQueueExecutor,
budgetHandle,
keyCoder().orElse(null),
keyTransitionListener);
workExecutor().execute();
return context();
}

/**
Expand All @@ -84,25 +88,14 @@ public final void executeWork(
*/
public final void invalidate() {
// Invalidate the context's cache and reset the context before closing the executor.
context().invalidateCache();
context().reset();
try {
workExecutor().close();
} catch (Exception e) {
// Closing is best-effort here: a failure is logged at WARN and not propagated to the caller.
LOG.warn("Failed to close map task executor: ", e);
}
}

public final long computeSourceBytesProcessed(String sourceBytesCounterName) {
HashMap<String, ElementCounter> counters =
((DataflowMapTaskExecutor) workExecutor())
.getReadOperation()
.receivers[0]
.getOutputCounters();

return Optional.ofNullable(counters.get(sourceBytesCounterName))
.map(counter -> ((OutputObjectAndByteCounter) counter).getByteCount().getAndReset())
.orElse(0L);
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setWorkExecutor(DataflowWorkExecutor workExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.concurrent.NotThreadSafe;
Expand Down Expand Up @@ -82,6 +84,8 @@ public final class Work implements RefreshableWork {
private volatile TimedState currentState;
private volatile boolean isFailed;
private volatile String processingThreadName = "";
private final AtomicReference<@Nullable AtomicBoolean> onFailureListener =
Comment thread
scwhittle marked this conversation as resolved.
new AtomicReference<>(null);
private final boolean drainMode;

private Work(
Expand Down Expand Up @@ -191,6 +195,10 @@ public long getSerializedWorkItemSize() {
return serializedWorkItemSize;
}

/** Returns the computation id reported by this work's processing context. */
public String getComputationId() {
return processingContext.computationId();
}

@Override
public ShardedKey getShardedKey() {
return shardedKey;
Expand Down Expand Up @@ -244,6 +252,19 @@ public void setProcessingThreadName(String processingThreadName) {
@Override
public void setFailed() {
  // Record the failure first, then look for a registered listener. setOnFailureListener does the
  // mirror image (stores its listener, then checks isFailed), so the order of these two steps
  // must not be swapped.
  isFailed = true;
  final AtomicBoolean registered = onFailureListener.get();
  if (registered == null) {
    // Nobody is listening; the volatile flag alone records the failure.
    return;
  }
  registered.set(true);
}

// Sets the passed-in boolean to true if the work fails, or immediately if it has already failed.
// Supports registering only one boolean at a time; a later call replaces the earlier listener.
public void setOnFailureListener(@Nullable AtomicBoolean listener) {
onFailureListener.set(listener);
if (isFailed && listener != null) {
Comment thread
arunpandianp marked this conversation as resolved.
listener.set(true);
}
}

public boolean isCommitPending() {
Expand All @@ -268,6 +289,10 @@ public void queueCommit(WorkItemCommitRequest commitRequest, ComputationState co
processingContext.workCommitter().accept(Commit.create(commitRequest, computationState, this));
}

/**
 * Returns the commit consumer held by the processing context. This is the same consumer that
 * {@code queueCommit} passes its {@code Commit} to.
 */
public Consumer<Commit> workCommitter() {
return processingContext.workCommitter();
}

/** Returns the {@link WindmillStateReader} produced by {@code WindmillStateReader.forWork(this)}. */
public WindmillStateReader createWindmillStateReader() {
return WindmillStateReader.forWork(this);
}
Expand Down Expand Up @@ -390,10 +415,6 @@ private boolean isCommitPending() {
abstract Instant startTime();
}

public String getComputationId() {
return processingContext.computationId();
}

/** Returns the value of this work's {@code keyGroup} field. */
public KeyGroup getKeyGroup() {
return keyGroup;
}
Expand Down
Loading
Loading