Skip to content

Commit 0beb565

Browse files
committed
[Drain] Expose drain to dofn processElement and onTimer - add missing implementation for WindowObservingProcessBundleContextBase and SDF
1 parent a1f260b commit 0beb565

2 files changed

Lines changed: 16 additions & 0 deletions

File tree

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1718,6 +1718,11 @@ public BoundedWindow window() {
17181718
return currentWindow;
17191719
}
17201720

1721+
@Override
1722+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
1723+
return currentElement.causedByDrain();
1724+
}
1725+
17211726
@Override
17221727
public Object sideInput(String tagId) {
17231728
return sideInput(sideInputMapping.get(tagId));
@@ -2422,6 +2427,11 @@ public BoundedWindow window() {
24222427
return currentWindow;
24232428
}
24242429

2430+
@Override
2431+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn){
2432+
return currentTimer.causedByDrain();
2433+
}
2434+
24252435
@Override
24262436
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
24272437
return currentTimer.getHoldTimestamp();

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.fn.harness;
1919

2020
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
21+
import org.apache.beam.sdk.values.CausedByDrain;
2122
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
2223

2324
import com.google.auto.service.AutoService;
@@ -278,6 +279,11 @@ public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
278279
return getCurrentElementOrFail().getPaneInfo();
279280
}
280281

282+
@Override
283+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
284+
return getCurrentElementOrFail().causedByDrain();
285+
}
286+
281287
@Override
282288
public Object schemaElement(int index) {
283289
SerializableFunction<InputT, Object> converter =

0 commit comments

Comments
 (0)