Skip to content

Commit 78fdc95

Browse files
authored
[Dataflow Streaming] [Multi Key] Prepare StreamingModeExecutionContext and StreamingWorkScheduler for multi-key execution. (#38814)
1 parent e34cc96 commit 78fdc95

14 files changed

Lines changed: 1266 additions & 373 deletions
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run!",
3-
"modification": 1,
3+
"modification": 2,
44
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java

Lines changed: 301 additions & 69 deletions
Large diffs are not rendered by default.

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@
3535
public abstract class WindmillReaderIteratorBase<T>
3636
extends NativeReader.NativeReaderIterator<WindowedValue<T>> {
3737
private final StreamingModeExecutionContext context;
38-
private final Windmill.WorkItem work;
39-
private int bundleIndex = 0;
40-
private int messageIndex = -1;
38+
private Windmill.WorkItem work;
39+
private int bundleIndex;
40+
private int messageIndex;
4141
private @Nullable WindowedValue<T> current = null;
4242
private final ValueProvider<Boolean> skipUndecodableElements;
4343
private static final Logger LOG = LoggerFactory.getLogger(WindmillReaderIteratorBase.class);
@@ -47,6 +47,8 @@ protected WindmillReaderIteratorBase(
4747
this.context = context;
4848
this.skipUndecodableElements = skipUndecodableElements;
4949
this.work = context.getWorkItem();
50+
this.bundleIndex = 0;
51+
this.messageIndex = -1;
5052
}
5153

5254
@Override
@@ -57,22 +59,33 @@ public boolean start() throws IOException {
5759
@Override
5860
public boolean advance() throws IOException {
5961
if (context.workIsFailed()) {
60-
throw new WorkItemCancelledException(context.getWorkItem().getShardingKey());
62+
throw new WorkItemCancelledException(checkNotNull(context.getWorkItem()).getShardingKey());
6163
}
6264

6365
while (true) {
6466
if (bundleIndex >= work.getMessageBundlesCount()) {
65-
current = null;
67+
// If elements are exhausted, try advancing the execution context to the next key in the
68+
// group
6669
context.finishKey();
70+
if (context.advance()) {
71+
// Transition succeeded! Update iterator references to the new work item
72+
resetWorkFromContext();
73+
continue;
74+
}
75+
76+
// All work items are exhausted.
77+
current = null;
6778
return false;
6879
}
80+
6981
Windmill.InputMessageBundle bundle = work.getMessageBundles(bundleIndex);
7082
++messageIndex;
7183
if (messageIndex >= bundle.getMessagesCount()) {
7284
messageIndex = -1;
7385
++bundleIndex;
7486
continue;
7587
}
88+
7689
try {
7790
current = checkNotNull(decodeMessage(bundle.getMessages(messageIndex)));
7891
return true;
@@ -91,6 +104,12 @@ public boolean advance() throws IOException {
91104
}
92105
}
93106

107+
private void resetWorkFromContext() {
108+
this.work = context.getWorkItem();
109+
this.bundleIndex = 0;
110+
this.messageIndex = -1;
111+
}
112+
94113
protected abstract WindowedValue<T> decodeMessage(Windmill.Message message) throws IOException;
95114

96115
@Override

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java

Lines changed: 60 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.beam.runners.dataflow.util.CloudObject;
3131
import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
3232
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
33-
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;
3433
import org.apache.beam.sdk.annotations.Internal;
3534
import org.apache.beam.sdk.coders.Coder;
3635
import org.apache.beam.sdk.options.PipelineOptions;
@@ -49,7 +48,6 @@
4948
@Internal
5049
class WindowingWindmillReader<K, T> extends NativeReader<WindowedValue<KeyedWorkItem<K, T>>> {
5150

52-
private final Coder<K> keyCoder;
5351
private final Coder<T> valueCoder;
5452
private final Coder<? extends BoundedWindow> windowCoder;
5553
private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
@@ -66,7 +64,6 @@ class WindowingWindmillReader<K, T> extends NativeReader<WindowedValue<KeyedWork
6664
this.windowCoder = inputCoder.getWindowCoder();
6765
WindmillKeyedWorkItem.FakeKeyedWorkItemCoder<K, T> keyedWorkItemCoder =
6866
(WindmillKeyedWorkItem.FakeKeyedWorkItemCoder<K, T>) inputCoder.getValueCoder();
69-
this.keyCoder = keyedWorkItemCoder.getKeyCoder();
7067
this.valueCoder = keyedWorkItemCoder.getElementCoder();
7168
this.context = context;
7269
this.skipUndecodableElements = skipUndecodableElements;
@@ -129,73 +126,77 @@ public static <K, T> WindowingWindmillReader<K, T> create(
129126
return new WindowingWindmillReader<>(coder, context, skipUndecodableElements);
130127
}
131128

129+
private KeyedWorkItem<K, T> createKeyedWorkItem() {
130+
@SuppressWarnings("unchecked")
131+
@Nullable
132+
K key = (K) context.getKey();
133+
return new WindmillKeyedWorkItem<>(
134+
key,
135+
context.getWorkItem(),
136+
windowCoder,
137+
windowsCoder,
138+
valueCoder,
139+
context.getWindmillTagEncoding(),
140+
context.getDrainMode(),
141+
skipUndecodableElements.isAccessible()
142+
&& Boolean.TRUE.equals(skipUndecodableElements.get()));
143+
}
144+
145+
private boolean isEmpty(KeyedWorkItem<K, T> keyedWorkItem) {
146+
return Iterables.isEmpty(keyedWorkItem.timersIterable())
147+
&& Iterables.isEmpty(keyedWorkItem.elementsIterable());
148+
}
149+
132150
@Override
133151
public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() throws IOException {
134-
final K key =
135-
keyCoder.decode(
136-
checkStateNotNull(context.getSerializedKey()).newInput(), Coder.Context.OUTER);
137-
final WorkItem workItem = context.getWorkItem();
138-
KeyedWorkItem<K, T> keyedWorkItem =
139-
new WindmillKeyedWorkItem<>(
140-
key,
141-
workItem,
142-
windowCoder,
143-
windowsCoder,
144-
valueCoder,
145-
context.getWindmillTagEncoding(),
146-
context.getDrainMode(),
147-
skipUndecodableElements.isAccessible()
148-
&& Boolean.TRUE.equals(skipUndecodableElements.get()));
149-
final boolean isEmptyWorkItem =
150-
(Iterables.isEmpty(keyedWorkItem.timersIterable())
151-
&& Iterables.isEmpty(keyedWorkItem.elementsIterable()));
152-
final WindowedValue<KeyedWorkItem<K, T>> value = new ValueInEmptyWindows<>(keyedWorkItem);
153-
154-
// Return a noop iterator when current workitem is an empty workitem.
155-
if (isEmptyWorkItem) {
156-
return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
157-
@Override
158-
public boolean start() throws IOException {
159-
context.finishKey();
160-
return false;
161-
}
162-
163-
@Override
164-
public boolean advance() throws IOException {
165-
return false;
152+
return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
153+
private @Nullable WindowedValue<KeyedWorkItem<K, T>> current = null;
154+
155+
@Override
156+
public boolean start() throws IOException {
157+
if (context.workIsFailed()) {
158+
throw new WorkItemCancelledException(
159+
checkStateNotNull(context.getWorkItem()).getShardingKey());
166160
}
167-
168-
@Override
169-
public WindowedValue<KeyedWorkItem<K, T>> getCurrent() {
170-
throw new NoSuchElementException();
161+
KeyedWorkItem<K, T> firstKeyedWorkItem = createKeyedWorkItem();
162+
if (isEmpty(firstKeyedWorkItem)) {
163+
return advance(); // Try to transition immediately if the first key is empty!
171164
}
172-
};
173-
} else {
174-
return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
175-
private @Nullable WindowedValue<KeyedWorkItem<K, T>> current = null;
176-
177-
@Override
178-
public boolean start() throws IOException {
179-
current = value;
180-
return true;
165+
current = new ValueInEmptyWindows<>(firstKeyedWorkItem);
166+
return true;
167+
}
168+
169+
@Override
170+
public boolean advance() throws IOException {
171+
if (context.workIsFailed()) {
172+
throw new WorkItemCancelledException(
173+
checkStateNotNull(context.getWorkItem()).getShardingKey());
181174
}
182175

183-
@Override
184-
public boolean advance() throws IOException {
185-
current = null;
176+
while (true) {
186177
context.finishKey();
178+
if (context.advance()) {
179+
KeyedWorkItem<K, T> newKeyedWorkItem = createKeyedWorkItem();
180+
if (isEmpty(newKeyedWorkItem)) {
181+
continue;
182+
}
183+
current = new ValueInEmptyWindows<>(newKeyedWorkItem);
184+
return true;
185+
}
186+
187+
current = null;
187188
return false;
188189
}
190+
}
189191

190-
@Override
191-
public WindowedValue<KeyedWorkItem<K, T>> getCurrent() {
192-
if (current == null) {
193-
throw new NoSuchElementException();
194-
}
195-
return value;
192+
@Override
193+
public WindowedValue<KeyedWorkItem<K, T>> getCurrent() {
194+
if (current == null) {
195+
throw new NoSuchElementException();
196196
}
197-
};
198-
}
197+
return current;
198+
}
199+
};
199200
}
200201

201202
@Override

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,16 @@
1818
package org.apache.beam.runners.dataflow.worker.streaming;
1919

2020
import com.google.auto.value.AutoValue;
21-
import java.util.HashMap;
2221
import java.util.Optional;
2322
import javax.annotation.concurrent.NotThreadSafe;
2423
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
25-
import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
2624
import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
2725
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
28-
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
29-
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
30-
import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
31-
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
26+
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.KeyTransitionListener;
27+
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
3228
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
3329
import org.apache.beam.sdk.annotations.Internal;
3430
import org.apache.beam.sdk.coders.Coder;
35-
import org.checkerframework.checker.nullness.qual.Nullable;
3631
import org.slf4j.Logger;
3732
import org.slf4j.LoggerFactory;
3833

@@ -58,7 +53,7 @@ public static ComputationWorkExecutor.Builder builder() {
5853

5954
public abstract DataflowWorkExecutor workExecutor();
6055

61-
public abstract StreamingModeExecutionContext context();
56+
abstract StreamingModeExecutionContext context();
6257

6358
public abstract Optional<Coder<?>> keyCoder();
6459

@@ -67,15 +62,24 @@ public static ComputationWorkExecutor.Builder builder() {
6762
/**
6863
* Executes DoFns for the Work. Blocks the calling thread until DoFn(s) have completed execution.
6964
*/
70-
public final void executeWork(
71-
@Nullable Object key,
65+
public final StreamingModeExecutionContext executeWork(
7266
Work work,
7367
WindmillStateReader stateReader,
74-
SideInputStateFetcher sideInputStateFetcher,
75-
Windmill.WorkItemCommitRequest.Builder outputBuilder)
68+
BoundedQueueExecutor workQueueExecutor,
69+
BoundedQueueExecutorWorkHandle budgetHandle,
70+
KeyTransitionListener keyTransitionListener)
7671
throws Exception {
77-
context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder, workExecutor());
72+
context()
73+
.start(
74+
work,
75+
stateReader,
76+
workExecutor(),
77+
workQueueExecutor,
78+
budgetHandle,
79+
keyCoder().orElse(null),
80+
keyTransitionListener);
7881
workExecutor().execute();
82+
return context();
7983
}
8084

8185
/**
@@ -84,25 +88,14 @@ public final void executeWork(
8488
*/
8589
public final void invalidate() {
8690
context().invalidateCache();
91+
context().reset();
8792
try {
8893
workExecutor().close();
8994
} catch (Exception e) {
9095
LOG.warn("Failed to close map task executor: ", e);
9196
}
9297
}
9398

94-
public final long computeSourceBytesProcessed(String sourceBytesCounterName) {
95-
HashMap<String, ElementCounter> counters =
96-
((DataflowMapTaskExecutor) workExecutor())
97-
.getReadOperation()
98-
.receivers[0]
99-
.getOutputCounters();
100-
101-
return Optional.ofNullable(counters.get(sourceBytesCounterName))
102-
.map(counter -> ((OutputObjectAndByteCounter) counter).getByteCount().getAndReset())
103-
.orElse(0L);
104-
}
105-
10699
@AutoValue.Builder
107100
public abstract static class Builder {
108101
public abstract Builder setWorkExecutor(DataflowWorkExecutor workExecutor);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import java.util.Objects;
2929
import java.util.Optional;
3030
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicBoolean;
32+
import java.util.concurrent.atomic.AtomicReference;
3133
import java.util.function.Consumer;
3234
import java.util.function.Supplier;
3335
import javax.annotation.concurrent.NotThreadSafe;
@@ -82,6 +84,8 @@ public final class Work implements RefreshableWork {
8284
private volatile TimedState currentState;
8385
private volatile boolean isFailed;
8486
private volatile String processingThreadName = "";
87+
private final AtomicReference<@Nullable AtomicBoolean> onFailureListener =
88+
new AtomicReference<>(null);
8589
private final boolean drainMode;
8690

8791
private Work(
@@ -191,6 +195,10 @@ public long getSerializedWorkItemSize() {
191195
return serializedWorkItemSize;
192196
}
193197

198+
public String getComputationId() {
199+
return processingContext.computationId();
200+
}
201+
194202
@Override
195203
public ShardedKey getShardedKey() {
196204
return shardedKey;
@@ -244,6 +252,19 @@ public void setProcessingThreadName(String processingThreadName) {
244252
@Override
245253
public void setFailed() {
246254
this.isFailed = true;
255+
AtomicBoolean listener = onFailureListener.get();
256+
if (listener != null) {
257+
listener.set(true);
258+
}
259+
}
260+
261+
// Sets the passed-in boolean to true when the work fails (immediately, if it has already failed).
262+
// Supports only one registered boolean at a time; a new registration replaces the previous one.
263+
public void setOnFailureListener(@Nullable AtomicBoolean listener) {
264+
onFailureListener.set(listener);
265+
if (isFailed && listener != null) {
266+
listener.set(true);
267+
}
247268
}
248269

249270
public boolean isCommitPending() {
@@ -268,6 +289,10 @@ public void queueCommit(WorkItemCommitRequest commitRequest, ComputationState co
268289
processingContext.workCommitter().accept(Commit.create(commitRequest, computationState, this));
269290
}
270291

292+
public Consumer<Commit> workCommitter() {
293+
return processingContext.workCommitter();
294+
}
295+
271296
public WindmillStateReader createWindmillStateReader() {
272297
return WindmillStateReader.forWork(this);
273298
}
@@ -390,10 +415,6 @@ private boolean isCommitPending() {
390415
abstract Instant startTime();
391416
}
392417

393-
public String getComputationId() {
394-
return processingContext.computationId();
395-
}
396-
397418
public KeyGroup getKeyGroup() {
398419
return keyGroup;
399420
}

0 commit comments

Comments
 (0)