Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5154679
[Dataflow Streaming][Multikey] Support MultiKey commits in windmill c…
arunpandianp Jun 2, 2026
73faa68
[Dataflow Streaming] [Multi Key] StreamingModeExecutionContext refact…
arunpandianp Jun 4, 2026
53bc9a6
trigger postsubmit tests
arunpandianp Jun 4, 2026
7720cf8
fix tests
arunpandianp Jun 4, 2026
72740d2
fix tests
arunpandianp Jun 4, 2026
0f96e72
improve work synchronization
arunpandianp Jun 4, 2026
51b9257
cleanup logic
arunpandianp Jun 5, 2026
9a4e7be
cleanup logic
arunpandianp Jun 5, 2026
3f36afd
address comments
arunpandianp Jun 8, 2026
f3cc628
improve WindowingWindmillReader
arunpandianp Jun 8, 2026
58e0ef9
spotless fix
arunpandianp Jun 8, 2026
3dceab0
[Dataflow Streaming] Fix nullness supression in StreamingModeExecutio…
arunpandianp Jun 8, 2026
e199438
make windmillTagEncoding final
arunpandianp Jun 8, 2026
700dfbc
address comments
arunpandianp Jun 8, 2026
bc5bee2
Move SideInputStateFetcherFactory from start to constructor
arunpandianp Jun 8, 2026
6d7f28e
Merge branch 'contextnullness' into multikey_context_review
arunpandianp Jun 8, 2026
c4a52c1
Merge remote-tracking branch 'beam/master' into multikey_context_review
arunpandianp Jun 8, 2026
1107a60
Merge beam/master into multikey_context_review
arunpandianp Jun 8, 2026
47eb7d6
Address comment
arunpandianp Jun 8, 2026
24505dd
Address comment
arunpandianp Jun 8, 2026
4e0d174
Fix UnderInitialization
arunpandianp Jun 9, 2026
5f9fef5
Merge branch 'multikey_commit' into tmp
arunpandianp Jun 11, 2026
0e8157f
Merge branch 'multikey_context_review' into tmp
arunpandianp Jun 11, 2026
93de23a
Multikey commit failure handling and integration
arunpandianp Jun 11, 2026
9de3d84
Fix tests and add sink byte limit for batching
arunpandianp Jun 11, 2026
0df9f54
spotless
arunpandianp Jun 11, 2026
18ae844
Merge remote-tracking branch 'beam/master' into multikey_commit
arunpandianp Jun 29, 2026
ad177c0
Merge remote-tracking branch 'beam/master' into multikey_failure
arunpandianp Jun 30, 2026
557f95e
Merge branch 'multikey_commit' into multikey_failure
arunpandianp Jun 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ boolean isSinkFullHintSet() {
// the state size might grow unbounded.
}

/** Returns the current value of the {@code bytesSinked} field. */
protected final long getBytesSinked() {
  return this.bytesSinked;
}

/**
* Sets a flag to indicate that a sink has enough data written to it. This hint is read by
* upstream producers to stop producing if they can. Mainly used in streaming.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public final class StreamingDataflowWorker {
"windmill_bounded_queue_executor_use_fair_monitor";
// Don't use. Experiment guarding multi key bundles. The feature is work in progress and
// incomplete.
private static final String UNSTABLE_ENABLE_MULTI_KEY_BUNDLE = "unstable_enable_multi_key_bundle";
public static final String UNSTABLE_ENABLE_MULTI_KEY_BUNDLE = "unstable_enable_multi_key_bundle";

private final WindmillStateCache stateCache;
private AtomicReference<StreamingWorkerStatusPages> statusPages = new AtomicReference<>();
Expand Down Expand Up @@ -257,6 +257,7 @@ private StreamingDataflowWorker(
this.streamingWorkScheduler =
StreamingWorkScheduler.create(
options,
DataflowRunner.hasExperiment(options, UNSTABLE_ENABLE_MULTI_KEY_BUNDLE),
clock,
readerCache,
mapTaskExecutorFactory,
Expand Down Expand Up @@ -1206,9 +1207,14 @@ private void onCompleteCommit(CompleteCommit completeCommit) {
computationStateCache
.getIfPresent(completeCommit.computationId())
.ifPresent(
state ->
state -> {
if (completeCommit.retryableFailure()) {
state.reExecuteActiveWork(completeCommit.shardedKey(), completeCommit.workId());
} else {
state.completeWorkAndScheduleNextWorkForKey(
completeCommit.shardedKey(), completeCommit.workId()));
completeCommit.shardedKey(), completeCommit.workId());
}
});
}

@AutoValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.NotThreadSafe;
Expand All @@ -52,6 +53,7 @@
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.runners.dataflow.worker.streaming.BoundedQueueExecutorWorkHandle;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
Expand Down Expand Up @@ -83,6 +85,8 @@
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
Expand Down Expand Up @@ -121,6 +125,12 @@ public class StreamingModeExecutionContext
extends DataflowExecutionContext<StreamingModeExecutionContext.StepContext> {

private static final Logger LOG = LoggerFactory.getLogger(StreamingModeExecutionContext.class);
// Experiment names read in the constructor via ExperimentalOptions.getExperimentValue to
// configure the batching limits checked by shouldStopBatching().

// Parsed as an int (default 100) into maxKeyGroupBatchSize.
private static final String WINDMILL_MAX_KEY_GROUP_BATCH_SIZE =
"windmill_max_key_group_batch_size";
// Parsed as a long number of milliseconds (default 100) and converted to nanoseconds into
// maxKeyGroupBatchTimeNanos.
private static final String WINDMILL_MAX_KEY_GROUP_BATCH_TIME_MS =
"windmill_max_key_group_batch_time_ms";
// Parsed as a long (default StreamingDataflowWorker.MAX_SINK_BYTES) into
// maxKeyGroupBatchSinkBytes.
private static final String WINDMILL_MAX_KEY_GROUP_BATCH_SINK_BYTES =
"windmill_max_key_group_batch_sink_bytes";

private final String computationId;
private final ImmutableMap<String, String> stateNameMap;
Expand Down Expand Up @@ -181,7 +191,7 @@ public class StreamingModeExecutionContext

// Key switch listener to delegate MDC logging context and thread name updates
public interface KeyTransitionListener {
void onKeyTransition(Work oldWork, Work newWork);
void onKeyTransition(@Nullable Work oldWork, Work newWork);
}

@SuppressWarnings("UnusedVariable")
Expand All @@ -197,6 +207,13 @@ public interface KeyTransitionListener {
private long stateBytesRead = 0;
private final String sourceBytesProcessCounterName;

// Batching stops once workItemsPolled reaches this value (see shouldStopBatching()).
private final int maxKeyGroupBatchSize;
// Batching stops once this many nanoseconds have elapsed since bundleStartTimeNanos.
private final long maxKeyGroupBatchTimeNanos;
// True when the multi-key bundle experiment is set; advance() returns false when this is false.
private final boolean multiKeyBundleEnabled;
// Batching stops once getBytesSinked() reaches this value.
private final long maxKeyGroupBatchSinkBytes;
// Set to 1 in start() and incremented each time advance() polls an additional work item.
private int workItemsPolled = 0;
// System.nanoTime() reading taken in start(); only meaningful relative to later nanoTime() reads.
private long bundleStartTimeNanos = 0;

public StreamingModeExecutionContext(
CounterFactory counterFactory,
String computationId,
Expand All @@ -213,6 +230,7 @@ public StreamingModeExecutionContext(
boolean hotKeyLoggingEnabled,
String stepName,
String sourceBytesProcessCounterName,
PipelineOptions options,
SideInputStateFetcherFactory sideInputStateFetcherFactory) {
super(
counterFactory,
Expand All @@ -232,7 +250,33 @@ public StreamingModeExecutionContext(
this.hotKeyLoggingEnabled = hotKeyLoggingEnabled;
this.stepName = checkNotNull(stepName);
this.sourceBytesProcessCounterName = checkNotNull(sourceBytesProcessCounterName);
this.sideInputStateFetcherFactory = sideInputStateFetcherFactory;
this.sideInputStateFetcherFactory = checkNotNull(sideInputStateFetcherFactory);

// Initialize batch limits from pipeline options
this.maxKeyGroupBatchSize =
tryParseInt(
ExperimentalOptions.getExperimentValue(options, WINDMILL_MAX_KEY_GROUP_BATCH_SIZE),
100,
WINDMILL_MAX_KEY_GROUP_BATCH_SIZE);

long batchTimeMs =
tryParseLong(
ExperimentalOptions.getExperimentValue(options, WINDMILL_MAX_KEY_GROUP_BATCH_TIME_MS),
100,
WINDMILL_MAX_KEY_GROUP_BATCH_TIME_MS);
this.maxKeyGroupBatchTimeNanos = TimeUnit.MILLISECONDS.toNanos(batchTimeMs);

this.multiKeyBundleEnabled =
ExperimentalOptions.hasExperiment(
options, StreamingDataflowWorker.UNSTABLE_ENABLE_MULTI_KEY_BUNDLE);

this.maxKeyGroupBatchSinkBytes =
tryParseLong(
ExperimentalOptions.getExperimentValue(
options, WINDMILL_MAX_KEY_GROUP_BATCH_SINK_BYTES),
StreamingDataflowWorker.MAX_SINK_BYTES,
WINDMILL_MAX_KEY_GROUP_BATCH_SINK_BYTES);

StreamingGlobalConfig config = globalConfigHandle.getConfig();
this.operationalLimits = config.operationalLimits();
this.windmillTagEncoding =
Expand All @@ -241,6 +285,41 @@ public StreamingModeExecutionContext(
: WindmillTagEncodingV1.instance();
}

/**
 * Parses an experiment value as an {@code int}.
 *
 * @param value the raw experiment value, or {@code null} if the experiment is not set
 * @param defaultValue the value returned when {@code value} is {@code null} or not a valid int
 * @param experimentName the experiment name, used only in the warning log message
 * @return the parsed value, or {@code defaultValue} when parsing is not possible
 */
private static int tryParseInt(@Nullable String value, int defaultValue, String experimentName) {
  if (value != null) {
    try {
      return Integer.parseInt(value);
    } catch (NumberFormatException e) {
      // A malformed experiment value is logged and then ignored in favor of the default.
      LOG.warn(
          "Failed to parse experiment {} value '{}' as integer, falling back to default: {}",
          experimentName,
          value,
          defaultValue,
          e);
    }
  }
  return defaultValue;
}

/**
 * Parses an experiment value as a {@code long}.
 *
 * @param value the raw experiment value, or {@code null} if the experiment is not set
 * @param defaultValue the value returned when {@code value} is {@code null} or not a valid long
 * @param experimentName the experiment name, used only in the warning log message
 * @return the parsed value, or {@code defaultValue} when parsing is not possible
 */
private static long tryParseLong(
    @Nullable String value, long defaultValue, String experimentName) {
  if (value != null) {
    try {
      return Long.parseLong(value);
    } catch (NumberFormatException e) {
      // A malformed experiment value is logged and then ignored in favor of the default.
      LOG.warn(
          "Failed to parse experiment {} value '{}' as long, falling back to default: {}",
          experimentName,
          value,
          defaultValue,
          e);
    }
  }
  return defaultValue;
}

@VisibleForTesting
public final long getBacklogBytes() {
return backlogBytes;
Expand Down Expand Up @@ -337,6 +416,9 @@ public void start(
this.budgetHandle = budgetHandle;
this.keyTransitionListener = keyTransitionListener;

this.workItemsPolled = 1;
this.bundleStartTimeNanos = System.nanoTime();

StreamingGlobalConfig config = globalConfigHandle.getConfig();
// Snapshot the limits for entire bundle processing.
this.operationalLimits = config.operationalLimits();
Expand Down Expand Up @@ -687,10 +769,47 @@ private final long computeSourceBytesProcessed(String sourceBytesCounterName) {
}

/**
 * Tries to extend the current bundle with one more work item from the active work's key group.
 *
 * <p>No work is polled, and {@code false} is returned, when the multi-key bundle experiment is
 * not enabled, when the active work's key group equals {@code Work.KeyGroup.DEFAULT}, or when
 * {@code shouldStopBatching()} returns {@code true}.
 *
 * @return {@code true} if additional work was polled and this context was switched to it via
 *     {@code startForNewKey}; {@code false} otherwise
 * @throws WorkItemCancelledException if {@code workIsFailed()} returns {@code true} for the
 *     current work
 */
public boolean advance() {
if (!multiKeyBundleEnabled) {
return false;
}
// Checked before polling so that no further work is pulled in once the current work has failed.
if (workIsFailed()) {
throw new WorkItemCancelledException(checkStateNotNull(work).getWorkItem().getShardingKey());
}

// These are assigned in start(); checkStateNotNull asserts that start() has already run.
BoundedQueueExecutor executor = checkStateNotNull(workQueueExecutor);
BoundedQueueExecutorWorkHandle handle = checkStateNotNull(budgetHandle);
Work activeWork = checkStateNotNull(work);

if (activeWork.getKeyGroup().equals(Work.KeyGroup.DEFAULT) || shouldStopBatching()) {
return false;
}

// Poll for more work of the same computation and key group, passing this bundle's handle.
@Nullable
ExecutableWork additionalWork =
executor.pollWork(computationId, activeWork.getKeyGroup(), handle);
if (additionalWork != null) {
// State is flushed before switching to the new work item.
// NOTE(review): presumably this persists the outgoing key's pending state — confirm against
// flushStateInternal().
flushStateInternal();
Work newWork = additionalWork.work();
++workItemsPolled;
// The listener is told about the switch before the context is re-targeted at the new work.
checkStateNotNull(keyTransitionListener).onKeyTransition(activeWork, newWork);
startForNewKey(newWork);
return true;
}

return false;
}

/**
 * Returns whether any batching limit has been reached for the current bundle.
 *
 * <p>The limits are evaluated in order and short-circuit: number of work items polled, time
 * elapsed since {@code bundleStartTimeNanos}, then bytes reported by {@code getBytesSinked()}.
 */
private boolean shouldStopBatching() {
  return workItemsPolled >= maxKeyGroupBatchSize
      || System.nanoTime() - bundleStartTimeNanos >= maxKeyGroupBatchTimeNanos
      || getBytesSinked() >= maxKeyGroupBatchSinkBytes;
}

private void startForNewKey(Work newWork, WindmillStateReader reader) throws CoderException {
newWork.setState(Work.State.PROCESSING);
if (keyTransitionListener != null && this.work != null && this.work != newWork) {
Expand Down Expand Up @@ -726,8 +845,8 @@ private void startForNewKey(Work newWork, WindmillStateReader reader) throws Cod
WindmillStateCache.ForKey cacheForKey =
stateCache.forKey(
getComputationKey(), newWork.getWorkItem().getCacheToken(), getWorkToken());
this.activeStateReader = reader;
startStepContexts(reader, processingTime, cacheForKey, newWork.watermarks());
this.activeStateReader = newWork.createWindmillStateReader(this::workIsFailed);
startStepContexts(this.activeStateReader, processingTime, cacheForKey, newWork.watermarks());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,30 @@
*/
package org.apache.beam.runners.dataflow.worker;

import javax.annotation.Nullable;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Indicates that the key token was invalid when data was attempted to be fetched. */
public class KeyTokenInvalidException extends RuntimeException {
public KeyTokenInvalidException(String key) {
super("Unable to fetch data due to token mismatch for key " + key);
/**
* Indicates that the work is no longer valid and should be canceled. It is thrown as a signal for
* upper layers to mark the work as failed.
*/
public class WorkCancelingException extends RuntimeException {

public WorkCancelingException(long sharding_key) {
super("Work canceling exception for key " + sharding_key);
}

public WorkCancelingException(Throwable cause) {
super(cause);
}

/** Returns whether an exception was caused by a {@link KeyTokenInvalidException}. */
public static boolean isKeyTokenInvalidException(@Nullable Throwable t) {
while (t != null) {
if (t instanceof KeyTokenInvalidException) {
/** Returns whether an exception was caused by a {@link WorkCancelingException}. */
public static boolean isWorkCancelingException(Throwable t) {
@Nullable Throwable throwable = t;
while (throwable != null) {
if (throwable instanceof WorkCancelingException) {
return true;
}
t = t.getCause();
throwable = throwable.getCause();
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,10 @@
*/
package org.apache.beam.runners.dataflow.worker;

/** Indicates that the work item was cancelled and should not be retried. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
/** Indicates that the work item was canceled. */
public class WorkItemCancelledException extends RuntimeException {

public WorkItemCancelledException(long sharding_key) {
super("Work item cancelled for key " + sharding_key);
}

public WorkItemCancelledException(String message, Throwable cause) {
super(message, cause);
}

public WorkItemCancelledException(Throwable cause) {
super(cause);
}

/** Returns whether an exception was caused by a {@link WorkItemCancelledException}. */
public static boolean isWorkItemCancelledException(Throwable t) {
while (t != null) {
if (t instanceof WorkItemCancelledException) {
return true;
}
t = t.getCause();
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ static ActiveWorkState create(WindmillStateCache.ForComputation computationState
return new ActiveWorkState(new HashMap<>(), computationStateCache);
}

/**
 * Looks up the active work with the given {@code workId} queued under the sharding key of
 * {@code shardedKey}.
 *
 * @return the matching work, or an empty {@link Optional} if there is no queue for the sharding
 *     key or the queue has no entry for {@code workId}
 */
synchronized Optional<ExecutableWork> getActiveWork(ShardedKey shardedKey, WorkId workId) {
  LinkedHashMap<WorkId, ExecutableWork> queuedForKey = activeWork.get(shardedKey.shardingKey());
  if (queuedForKey == null) {
    return Optional.empty();
  }
  return Optional.ofNullable(queuedForKey.get(workId));
}

@VisibleForTesting
static ActiveWorkState forTesting(
Map<Long, LinkedHashMap<WorkId, ExecutableWork>> activeWork,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
*/
package org.apache.beam.runners.dataflow.worker.streaming;

import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

/**
 * A handle to pass when requesting more work from {@code BoundedQueueExecutor} via {@code
 * BoundedQueueExecutor.pollWork}.
 */
public interface BoundedQueueExecutorWorkHandle {}
public interface BoundedQueueExecutorWorkHandle {
/** Returns all work items tracked by this handle. */
ImmutableList<Work> getWorkBatch();
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ public void completeWorkAndScheduleNextWorkForKey(ShardedKey shardedKey, WorkId
.ifPresent(this::forceExecute);
}

/**
 * Re-executes the work identified by {@code shardedKey} and {@code workId} if it is still
 * tracked as active; does nothing otherwise.
 */
public void reExecuteActiveWork(ShardedKey shardedKey, WorkId workId) {
  activeWorkState
      .getActiveWork(shardedKey, workId)
      .ifPresent(stillActiveWork -> forceExecute(stillActiveWork));
}

public void invalidateStuckCommits(Instant stuckCommitDeadline) {
activeWorkState.invalidateStuckCommits(
stuckCommitDeadline, this::completeWorkAndScheduleNextWorkForKey);
Expand Down
Loading
Loading