@@ -624,15 +624,18 @@ private void startBundle() {
624624
625625 private void processElementForParDo (WindowedValue <InputT > elem ) {
626626 currentElement = elem ;
627+ causedByDrain = currentElement .causedByDrain ();
627628 try {
628629 doFnInvoker .invokeProcessElement (processContext );
629630 } finally {
630631 currentElement = null ;
632+ causedByDrain = null ;
631633 }
632634 }
633635
634636 private void processElementForWindowObservingParDo (WindowedValue <InputT > elem ) {
635637 currentElement = elem ;
638+ causedByDrain = currentElement .causedByDrain ();
636639 try {
637640 Iterator <BoundedWindow > windowIterator =
638641 (Iterator <BoundedWindow >) elem .getWindows ().iterator ();
@@ -643,12 +646,14 @@ private void processElementForWindowObservingParDo(WindowedValue<InputT> elem) {
643646 } finally {
644647 currentElement = null ;
645648 currentWindow = null ;
649+ causedByDrain = null ;
646650 }
647651 }
648652
649653 private void processElementForWindowObservingSizedElementAndRestriction (
650654 WindowedValue <KV <KV <InputT , KV <RestrictionT , WatermarkEstimatorStateT >>, Double >> elem ) {
651655 currentElement = elem .withValue (elem .getValue ().getKey ().getKey ());
656+ causedByDrain = elem .causedByDrain ();
652657 windowCurrentIndex = -1 ;
653658 windowStopIndex = currentElement .getWindows ().size ();
654659 currentWindows = ImmutableList .copyOf (currentElement .getWindows ());
@@ -660,6 +665,7 @@ private void processElementForWindowObservingSizedElementAndRestriction(
660665 windowCurrentIndex = -1 ;
661666 windowStopIndex = 0 ;
662667 currentElement = null ;
668+ causedByDrain = null ;
663669 currentWindows = null ;
664670 currentRestriction = null ;
665671 currentWatermarkEstimatorState = null ;
@@ -1202,7 +1208,8 @@ private <K> void processTimer(
12021208 checkNotNull (timerBundleTracker );
12031209 try {
12041210 currentKey = timer .getUserKey ();
1205- causedByDrain = timer .causedByDrain ();
1211+ causedByDrain = Preconditions .checkNotNull (timer .causedByDrain ());
1212+
12061213 // add drain
12071214 Iterator <BoundedWindow > windowIterator =
12081215 (Iterator <BoundedWindow >) timer .getWindows ().iterator ();
@@ -1286,6 +1293,7 @@ private <K> void processOnWindowExpiration(Timer<K> timer) {
12861293 try {
12871294 currentKey = timer .getUserKey ();
12881295 currentTimer = timer ;
1296+ causedByDrain = timer .causedByDrain ();
12891297 Iterator <BoundedWindow > windowIterator =
12901298 (Iterator <BoundedWindow >) timer .getWindows ().iterator ();
12911299 while (windowIterator .hasNext ()) {
@@ -1296,6 +1304,7 @@ private <K> void processOnWindowExpiration(Timer<K> timer) {
12961304 currentKey = null ;
12971305 currentTimer = null ;
12981306 currentWindow = null ;
1307+ causedByDrain = null ;
12991308 }
13001309 }
13011310
@@ -2315,7 +2324,7 @@ public OutputBuilder<OutputT> builder(OutputT value) {
23152324 .setWindow (currentWindow )
23162325 .setTimestamp (currentTimer .getHoldTimestamp ())
23172326 .setPaneInfo (currentTimer .getPaneInfo ())
2318- .setCausedByDrain (causedByDrain )
2327+ .setCausedByDrain (currentTimer . causedByDrain () )
23192328 .setReceiver (
23202329 windowedValue -> {
23212330 checkOnWindowExpirationTimestamp (windowedValue .getTimestamp ());
@@ -2336,7 +2345,7 @@ public <T> void output(TupleTag<T> tag, T output) {
23362345 output ,
23372346 currentTimer .getHoldTimestamp (),
23382347 currentWindow ,
2339- currentTimer .getPaneInfo ()));
2348+ currentTimer .getPaneInfo (), null , null , currentTimer . causedByDrain () ));
23402349 }
23412350
23422351 @ Override
@@ -2786,7 +2795,7 @@ public OutputBuilder<T> builder(T value) {
27862795 .setValue (value )
27872796 .setTimestamp (currentTimer .getHoldTimestamp ())
27882797 .setWindow (currentWindow )
2789- .setCausedByDrain (causedByDrain )
2798+ .setCausedByDrain (currentTimer . causedByDrain () )
27902799 .setPaneInfo (currentTimer .getPaneInfo ())
27912800 .setReceiver (windowedValue -> context .outputWindowedValue (windowedValue ));
27922801 }
@@ -2819,7 +2828,7 @@ public OutputBuilder<Row> builder(Row value) {
28192828 .setTimestamp (currentTimer .getHoldTimestamp ())
28202829 .setWindow (currentWindow )
28212830 .setPaneInfo (currentTimer .getPaneInfo ())
2822- .setCausedByDrain (causedByDrain )
2831+ .setCausedByDrain (currentTimer . causedByDrain () )
28232832 .setReceiver (
28242833 windowedValue ->
28252834 context .outputWindowedValue (
0 commit comments