Skip to content

Commit 3d9ae3b

Browse files
committed
[Drain] Expose drain to dofn processElement and onTimer - add missing implementation for WindowObservingProcessBundleContextBase and SDF.
1 parent cf3d6ed commit 3d9ae3b

File tree

2 files changed

+16
-0
lines changed

2 files changed

+16
-0
lines changed

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1709,6 +1709,11 @@ public BoundedWindow window() {
17091709
return currentWindow;
17101710
}
17111711

1712+
@Override
1713+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
1714+
return currentElement.causedByDrain();
1715+
}
1716+
17121717
@Override
17131718
public Object sideInput(String tagId) {
17141719
return sideInput(sideInputMapping.get(tagId));
@@ -2389,6 +2394,11 @@ public BoundedWindow window() {
23892394
return currentWindow;
23902395
}
23912396

2397+
@Override
2398+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn){
2399+
return currentTimer.causedByDrain();
2400+
}
2401+
23922402
@Override
23932403
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
23942404
return currentTimer.getHoldTimestamp();

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.fn.harness;
1919

2020
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
21+
import org.apache.beam.sdk.values.CausedByDrain;
2122
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
2223

2324
import com.google.auto.service.AutoService;
@@ -278,6 +279,11 @@ public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
278279
return getCurrentElementOrFail().getPaneInfo();
279280
}
280281

282+
@Override
283+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
284+
return getCurrentElementOrFail().causedByDrain();
285+
}
286+
281287
@Override
282288
public Object schemaElement(int index) {
283289
SerializableFunction<InputT, Object> converter =

0 commit comments

Comments
 (0)