Skip to content

Commit f91b5af

Browse files
committed
[Dataflow] Add Operation::finishKey() and move timers into it
Move the processTimers() call from finish() to finishKey(). In upcoming changes there will be multiple streaming work items in a single Beam bundle. With multiple work items, we have to process the elements and timers of each work item before moving on to the next work item. finishKey() will be called by the NativeReaderIterator classes after iterating through all elements of a work item. Batch processes timers in BatchModeUngroupingParDoFn and does not rely on the processTimers() call in ParDoOperation::finish(), so removing the processTimers() call from ParDoOperation::finish() is safe. Batch also does not use the new finishKey() method.
1 parent 22c43bf commit f91b5af

24 files changed

Lines changed: 170 additions & 39 deletions
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run!",
3-
"modification": 1,
3+
"modification": 2,
44
}
55

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run!",
3-
"modification": 1,
3+
"modification": 2,
44
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -117,20 +117,12 @@ public NativeReader<?> create(
117117

118118
@Override
119119
public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
120-
return new PubsubReaderIterator(context.getWorkItem());
120+
return new PubsubReaderIterator();
121121
}
122122

123123
class PubsubReaderIterator extends WindmillReaderIteratorBase<T> {
124-
protected PubsubReaderIterator(Windmill.WorkItem work) {
125-
super(work, skipUndecodableElements);
126-
}
127-
128-
@Override
129-
public boolean advance() throws IOException {
130-
if (context.workIsFailed()) {
131-
return false;
132-
}
133-
return super.advance();
124+
protected PubsubReaderIterator() {
125+
super(context, skipUndecodableElements);
134126
}
135127

136128
@Override

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput;
5757
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState;
5858
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
59+
import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor;
5960
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
6061
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataId;
6162
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
@@ -157,6 +158,8 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
157158
*/
158159
private @Nullable UnboundedReader<?> activeReader;
159160

161+
private @Nullable WorkExecutor workExecutor;
162+
160163
public StreamingModeExecutionContext(
161164
CounterFactory counterFactory,
162165
String computationId,
@@ -240,9 +243,11 @@ public void start(
240243
Work work,
241244
WindmillStateReader stateReader,
242245
SideInputStateFetcher sideInputStateFetcher,
243-
Windmill.WorkItemCommitRequest.Builder outputBuilder) {
246+
Windmill.WorkItemCommitRequest.Builder outputBuilder,
247+
WorkExecutor workExecutor) {
244248
this.key = key;
245249
this.work = work;
250+
this.workExecutor = workExecutor;
246251
this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey());
247252
this.sideInputStateFetcher = sideInputStateFetcher;
248253
StreamingGlobalConfig config = globalConfigHandle.getConfig();
@@ -270,6 +275,11 @@ public void start(
270275
}
271276
}
272277

278+
public void finishKey() throws Exception {
279+
checkNotNull(workExecutor, "workExecutor must be set before calling finishKey()");
280+
workExecutor.finishKey();
281+
}
282+
273283
/**
274284
* Ensure that the processing time is greater than any fired processing time timers. Otherwise, a
275285
* trigger could ignore the timer and orphan the window.

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -109,20 +109,12 @@ public NativeReader<?> create(
109109

110110
@Override
111111
public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
112-
return new UngroupedWindmillReaderIterator(context.getWorkItem());
112+
return new UngroupedWindmillReaderIterator();
113113
}
114114

115115
class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase<T> {
116-
UngroupedWindmillReaderIterator(Windmill.WorkItem work) {
117-
super(work, skipUndecodableElements);
118-
}
119-
120-
@Override
121-
public boolean advance() throws IOException {
122-
if (context.workIsFailed()) {
123-
return false;
124-
}
125-
return super.advance();
116+
UngroupedWindmillReaderIterator() {
117+
super(context, skipUndecodableElements);
126118
}
127119

128120
@Override

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
*/
3535
public abstract class WindmillReaderIteratorBase<T>
3636
extends NativeReader.NativeReaderIterator<WindowedValue<T>> {
37+
private final StreamingModeExecutionContext context;
3738
private final Windmill.WorkItem work;
3839
private int bundleIndex = 0;
3940
private int messageIndex = -1;
@@ -42,9 +43,10 @@ public abstract class WindmillReaderIteratorBase<T>
4243
private static final Logger LOG = LoggerFactory.getLogger(WindmillReaderIteratorBase.class);
4344

4445
protected WindmillReaderIteratorBase(
45-
Windmill.WorkItem work, ValueProvider<Boolean> skipUndecodableElements) {
46+
StreamingModeExecutionContext context, ValueProvider<Boolean> skipUndecodableElements) {
47+
this.context = context;
4648
this.skipUndecodableElements = skipUndecodableElements;
47-
this.work = work;
49+
this.work = context.getWorkItem();
4850
}
4951

5052
@Override
@@ -54,9 +56,18 @@ public boolean start() throws IOException {
5456

5557
@Override
5658
public boolean advance() throws IOException {
59+
if (context.workIsFailed()) {
60+
throw new WorkItemCancelledException(context.getWorkItem().getShardingKey());
61+
}
62+
5763
while (true) {
5864
if (bundleIndex >= work.getMessageBundlesCount()) {
5965
current = null;
66+
try {
67+
context.finishKey();
68+
} catch (Exception e) {
69+
throw new RuntimeException(e);
70+
}
6071
return false;
6172
}
6273
Windmill.InputMessageBundle bundle = work.getMessageBundles(bundleIndex);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,11 @@ public boolean start() throws IOException {
182182
@Override
183183
public boolean advance() throws IOException {
184184
current = null;
185+
try {
186+
context.finishKey();
187+
} catch (Exception e) {
188+
throw new RuntimeException(e);
189+
}
185190
return false;
186191
}
187192

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ public void execute() throws Exception {
8989
LOG.debug("Source operation execution complete");
9090
}
9191

92+
@Override
93+
public void finishKey() throws Exception {}
94+
9295
@Override
9396
public SourceOperationResponse getResponse() {
9497
return response;

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public final void executeWork(
7474
SideInputStateFetcher sideInputStateFetcher,
7575
Windmill.WorkItemCommitRequest.Builder outputBuilder)
7676
throws Exception {
77-
context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder);
77+
context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder, workExecutor());
7878
workExecutor().execute();
7979
}
8080

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ public void process(Object elem) throws Exception {
4343
}
4444
}
4545

46+
@Override
47+
public void finishKey() throws Exception {}
48+
4649
@Override
4750
public boolean supportsRestart() {
4851
return true;

0 commit comments

Comments
 (0)