File tree Expand file tree Collapse file tree
core/src/main/java/org/apache/beam/sdk
harness/src/main/java/org/apache/beam/fn/harness Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -126,7 +126,7 @@ public static <K> Timer<K> cleared(
126126 */
127127 public abstract @ Nullable PaneInfo getPaneInfo ();
128128
129- public abstract @ Nullable CausedByDrain causedByDrain ();
129+ public abstract CausedByDrain causedByDrain ();
130130
131131 @ Override
132132 public final boolean equals (@ Nullable Object other ) {
Original file line number Diff line number Diff line change @@ -148,6 +148,7 @@ public Builder<T> setRecordOffset(@Nullable Long recordOffset) {
148148
149149 @ Override
150150 public Builder <T > setCausedByDrain (CausedByDrain causedByDrain ) {
151+ checkStateNotNull (causedByDrain , "CausedByDrain is null" );
151152 this .causedByDrain = causedByDrain ;
152153 return this ;
153154 }
@@ -202,6 +203,7 @@ public PaneInfo getPaneInfo() {
202203
203204 @ Override
204205 public CausedByDrain causedByDrain () {
206+ checkStateNotNull (causedByDrain , "CausedByDrain not set" );
205207 return causedByDrain ;
206208 }
207209
Original file line number Diff line number Diff line change @@ -1202,7 +1202,8 @@ private <K> void processTimer(
12021202 checkNotNull (timerBundleTracker );
12031203 try {
12041204 currentKey = timer .getUserKey ();
1205- causedByDrain = timer .causedByDrain ();
1205+ causedByDrain = Preconditions .checkNotNull (timer .causedByDrain ());
1206+
12061207 // add drain
12071208 Iterator <BoundedWindow > windowIterator =
12081209 (Iterator <BoundedWindow >) timer .getWindows ().iterator ();
You can’t perform that action at this time.
0 commit comments