Skip to content

Commit 4ba599c

Browse files
committed
[Drain] Expose drain to dofn processElement and onTimer - add missing implementation for SDF and WindowExpiration processContext
1 parent a1f260b commit 4ba599c

File tree

2 files changed

+11
-0
lines changed

2 files changed

+11
-0
lines changed

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2422,6 +2422,11 @@ public BoundedWindow window() {
24222422
return currentWindow;
24232423
}
24242424

2425+
@Override
2426+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
2427+
return currentTimer.causedByDrain();
2428+
}
2429+
24252430
@Override
24262431
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
24272432
return currentTimer.getHoldTimestamp();

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.beam.sdk.util.UserCodeException;
4242
import org.apache.beam.sdk.util.construction.PTransformTranslation;
4343
import org.apache.beam.sdk.util.construction.ParDoTranslation;
44+
import org.apache.beam.sdk.values.CausedByDrain;
4445
import org.apache.beam.sdk.values.KV;
4546
import org.apache.beam.sdk.values.PCollectionView;
4647
import org.apache.beam.sdk.values.TupleTag;
@@ -278,6 +279,11 @@ public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
278279
return getCurrentElementOrFail().getPaneInfo();
279280
}
280281

282+
@Override
283+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
284+
return getCurrentElementOrFail().causedByDrain();
285+
}
286+
281287
@Override
282288
public Object schemaElement(int index) {
283289
SerializableFunction<InputT, Object> converter =

0 commit comments

Comments
 (0)