@@ -624,15 +624,18 @@ private void startBundle() {
624624
625625 private void processElementForParDo (WindowedValue <InputT > elem ) {
626626 currentElement = elem ;
627+ causedByDrain = currentElement .causedByDrain ();
627628 try {
628629 doFnInvoker .invokeProcessElement (processContext );
629630 } finally {
630631 currentElement = null ;
632+ causedByDrain = null ;
631633 }
632634 }
633635
634636 private void processElementForWindowObservingParDo (WindowedValue <InputT > elem ) {
635637 currentElement = elem ;
638+ causedByDrain = currentElement .causedByDrain ();
636639 try {
637640 Iterator <BoundedWindow > windowIterator =
638641 (Iterator <BoundedWindow >) elem .getWindows ().iterator ();
@@ -643,12 +646,14 @@ private void processElementForWindowObservingParDo(WindowedValue<InputT> elem) {
643646 } finally {
644647 currentElement = null ;
645648 currentWindow = null ;
649+ causedByDrain = null ;
646650 }
647651 }
648652
649653 private void processElementForWindowObservingSizedElementAndRestriction (
650654 WindowedValue <KV <KV <InputT , KV <RestrictionT , WatermarkEstimatorStateT >>, Double >> elem ) {
651655 currentElement = elem .withValue (elem .getValue ().getKey ().getKey ());
656+ causedByDrain = elem .causedByDrain ();
652657 windowCurrentIndex = -1 ;
653658 windowStopIndex = currentElement .getWindows ().size ();
654659 currentWindows = ImmutableList .copyOf (currentElement .getWindows ());
@@ -660,6 +665,7 @@ private void processElementForWindowObservingSizedElementAndRestriction(
660665 windowCurrentIndex = -1 ;
661666 windowStopIndex = 0 ;
662667 currentElement = null ;
668+ causedByDrain = null ;
663669 currentWindows = null ;
664670 currentRestriction = null ;
665671 currentWatermarkEstimatorState = null ;
@@ -1202,7 +1208,8 @@ private <K> void processTimer(
12021208 checkNotNull (timerBundleTracker );
12031209 try {
12041210 currentKey = timer .getUserKey ();
1205- causedByDrain = timer .causedByDrain ();
1211+ causedByDrain = Preconditions .checkNotNull (timer .causedByDrain ());
1212+
12061213 // add drain
12071214 Iterator <BoundedWindow > windowIterator =
12081215 (Iterator <BoundedWindow >) timer .getWindows ().iterator ();
0 commit comments