Skip to content

Commit d553b78

Browse files
authored
[Dataflow Streaming] Add Operation::finishKey() and move processTimers into it (#38430)
* [Dataflow] Add Operation::finishKey() and move timers into it Moving the processTimers call from finish() to finishKey(). In upcoming changes there'll be multiple streaming work items in a single beam bundle. With multiple work items, we've to process elements and timers of each work item before moving to the next work items. finishKey() will be called by the NativeIterator classes after iterating through all elements from a work item. Batch processes timers in BatchModeUngroupingParDoFn and does not rely on the processTimers() in ParDoOperation::finish(). So removing the processTimers() call from ParDoOperation::finish() is safe. Batch also does not use the new finishKey() method.
1 parent eb29f86 commit d553b78

23 files changed

Lines changed: 195 additions & 48 deletions

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -117,20 +117,12 @@ public NativeReader<?> create(
117117

118118
@Override
119119
public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
120-
return new PubsubReaderIterator(context.getWorkItem());
120+
return new PubsubReaderIterator();
121121
}
122122

123123
class PubsubReaderIterator extends WindmillReaderIteratorBase<T> {
124-
protected PubsubReaderIterator(Windmill.WorkItem work) {
125-
super(work, skipUndecodableElements);
126-
}
127-
128-
@Override
129-
public boolean advance() throws IOException {
130-
if (context.workIsFailed()) {
131-
return false;
132-
}
133-
return super.advance();
124+
protected PubsubReaderIterator() {
125+
super(context, skipUndecodableElements);
134126
}
135127

136128
@Override

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.dataflow.worker;
1919

20+
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2021
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2122
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2223

@@ -56,6 +57,7 @@
5657
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput;
5758
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState;
5859
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
60+
import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor;
5961
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
6062
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataId;
6163
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
@@ -157,6 +159,9 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
157159
*/
158160
private @Nullable UnboundedReader<?> activeReader;
159161

162+
private @Nullable WorkExecutor workExecutor;
163+
private boolean finishKeyCalled = false;
164+
160165
public StreamingModeExecutionContext(
161166
CounterFactory counterFactory,
162167
String computationId,
@@ -240,9 +245,12 @@ public void start(
240245
Work work,
241246
WindmillStateReader stateReader,
242247
SideInputStateFetcher sideInputStateFetcher,
243-
Windmill.WorkItemCommitRequest.Builder outputBuilder) {
248+
Windmill.WorkItemCommitRequest.Builder outputBuilder,
249+
WorkExecutor workExecutor) {
244250
this.key = key;
245251
this.work = work;
252+
this.workExecutor = workExecutor;
253+
this.finishKeyCalled = false;
246254
this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey());
247255
this.sideInputStateFetcher = sideInputStateFetcher;
248256
StreamingGlobalConfig config = globalConfigHandle.getConfig();
@@ -271,6 +279,17 @@ public void start(
271279
}
272280
}
273281

282+
public void finishKey() {
283+
checkState(!finishKeyCalled, "finishKey was already called");
284+
checkStateNotNull(workExecutor, "workExecutor must be set before calling finishKey()");
285+
try {
286+
workExecutor.finishKey();
287+
} catch (Exception e) {
288+
throw new RuntimeException(e);
289+
}
290+
this.finishKeyCalled = true;
291+
}
292+
274293
/**
275294
* Ensure that the processing time is greater than any fired processing time timers. Otherwise, a
276295
* trigger could ignore the timer and orphan the window.
@@ -452,6 +471,7 @@ public void invalidateCache() {
452471
}
453472

454473
public Map<Long, Pair<Instant, Runnable>> flushState() {
474+
checkState(finishKeyCalled, "finishKey must be called before flushState");
455475
Map<Long, Pair<Instant, Runnable>> callbacks = new HashMap<>();
456476

457477
for (StepContext stepContext : getAllStepContexts()) {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -110,20 +110,12 @@ public NativeReader<?> create(
110110

111111
@Override
112112
public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
113-
return new UngroupedWindmillReaderIterator(context.getWorkItem());
113+
return new UngroupedWindmillReaderIterator();
114114
}
115115

116116
class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase<T> {
117-
UngroupedWindmillReaderIterator(Windmill.WorkItem work) {
118-
super(work, skipUndecodableElements);
119-
}
120-
121-
@Override
122-
public boolean advance() throws IOException {
123-
if (context.workIsFailed()) {
124-
return false;
125-
}
126-
return super.advance();
117+
UngroupedWindmillReaderIterator() {
118+
super(context, skipUndecodableElements);
127119
}
128120

129121
@Override

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
*/
3535
public abstract class WindmillReaderIteratorBase<T>
3636
extends NativeReader.NativeReaderIterator<WindowedValue<T>> {
37+
private final StreamingModeExecutionContext context;
3738
private final Windmill.WorkItem work;
3839
private int bundleIndex = 0;
3940
private int messageIndex = -1;
@@ -42,9 +43,10 @@ public abstract class WindmillReaderIteratorBase<T>
4243
private static final Logger LOG = LoggerFactory.getLogger(WindmillReaderIteratorBase.class);
4344

4445
protected WindmillReaderIteratorBase(
45-
Windmill.WorkItem work, ValueProvider<Boolean> skipUndecodableElements) {
46+
StreamingModeExecutionContext context, ValueProvider<Boolean> skipUndecodableElements) {
47+
this.context = context;
4648
this.skipUndecodableElements = skipUndecodableElements;
47-
this.work = work;
49+
this.work = context.getWorkItem();
4850
}
4951

5052
@Override
@@ -54,9 +56,14 @@ public boolean start() throws IOException {
5456

5557
@Override
5658
public boolean advance() throws IOException {
59+
if (context.workIsFailed()) {
60+
throw new WorkItemCancelledException(context.getWorkItem().getShardingKey());
61+
}
62+
5763
while (true) {
5864
if (bundleIndex >= work.getMessageBundlesCount()) {
5965
current = null;
66+
context.finishKey();
6067
return false;
6168
}
6269
Windmill.InputMessageBundle bundle = work.getMessageBundles(bundleIndex);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() throw
156156
return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
157157
@Override
158158
public boolean start() throws IOException {
159+
context.finishKey();
159160
return false;
160161
}
161162

@@ -182,6 +183,7 @@ public boolean start() throws IOException {
182183
@Override
183184
public boolean advance() throws IOException {
184185
current = null;
186+
context.finishKey();
185187
return false;
186188
}
187189

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ public void execute() throws Exception {
8989
LOG.debug("Source operation execution complete");
9090
}
9191

92+
@Override
93+
public void finishKey() throws Exception {}
94+
9295
@Override
9396
public SourceOperationResponse getResponse() {
9497
return response;

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,7 @@ public boolean start() throws IOException {
824824
}
825825
try {
826826
if (!reader.start()) {
827+
context.finishKey();
827828
return false;
828829
}
829830
} catch (Exception e) {
@@ -841,10 +842,13 @@ public boolean advance() throws IOException {
841842
// that there are regular checkpoints and that state does not become too large.
842843
BackOff backoff = backoffFactory.backoff();
843844
while (true) {
845+
if (context.workIsFailed()) {
846+
throw new WorkItemCancelledException(context.getWorkItem().getShardingKey());
847+
}
844848
if (elemsRead >= maxElems
845849
|| Instant.now().isAfter(endTime)
846-
|| context.isSinkFullHintSet()
847-
|| context.workIsFailed()) {
850+
|| context.isSinkFullHintSet()) {
851+
context.finishKey();
848852
return false;
849853
}
850854
try {
@@ -857,6 +861,7 @@ public boolean advance() throws IOException {
857861
}
858862
long nextBackoff = backoff.nextBackOffMillis();
859863
if (nextBackoff == BackOff.STOP) {
864+
context.finishKey();
860865
return false;
861866
}
862867
Uninterruptibles.sleepUninterruptibly(nextBackoff, TimeUnit.MILLISECONDS);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public final void executeWork(
7474
SideInputStateFetcher sideInputStateFetcher,
7575
Windmill.WorkItemCommitRequest.Builder outputBuilder)
7676
throws Exception {
77-
context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder);
77+
context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder, workExecutor());
7878
workExecutor().execute();
7979
}
8080

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ public void process(Object elem) throws Exception {
4343
}
4444
}
4545

46+
@Override
47+
public void finishKey() throws Exception {}
48+
4649
@Override
4750
public boolean supportsRestart() {
4851
return true;

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ public void execute() throws Exception {
112112
// TODO: support for success / failure ports?
113113
}
114114

115+
@Override
116+
public void finishKey() throws Exception {
117+
for (Operation op : operations) {
118+
op.finishKey();
119+
}
120+
}
121+
115122
@Override
116123
public NativeReader.Progress getWorkerProgress() throws Exception {
117124
return getReadOperation().getProgress();

0 commit comments

Comments
 (0)