Skip to content

Commit aa88b60

Browse files
damccormjrmccluskey
authored andcommitted
Consider windows with bands outside of [Min, Max] equivalent (#35460)
1 parent a3808ea commit aa88b60

1 file changed

Lines changed: 18 additions & 5 deletions

File tree

sdks/python/apache_beam/utils/windowed_value.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -432,13 +432,26 @@ def end(self):
432432
return self._end_object
433433

434434
def __hash__(self):
435-
return hash((self._start_micros, self._end_micros))
435+
# Cut off window at start/end timestamps for comparison purposes since some
436+
# portable runners do this already, and timestamps outside of the bands of
437+
# Min/Max timestamps are functionally equal to Min/Max.
438+
start = max(self._start_micros, MIN_TIMESTAMP.micros)
439+
end = min(self._end_micros, MAX_TIMESTAMP.micros)
440+
return hash((start, end))
436441

437442
def __eq__(self, other):
438-
return (
439-
type(self) == type(other) and
440-
self._start_micros == other._start_micros and
441-
self._end_micros == other._end_micros)
443+
if type(self) != type(other):
444+
return False
445+
446+
# Cut off window at start/end timestamps for comparison purposes since some
447+
# portable runners do this already, and timestamps outside of the bands of
448+
# Min/Max timestamps are functionally equal to Min/Max.
449+
self_start = max(self._start_micros, MIN_TIMESTAMP.micros)
450+
self_end = min(self._end_micros, MAX_TIMESTAMP.micros)
451+
other_start = max(other._start_micros, MIN_TIMESTAMP.micros)
452+
other_end = min(other._end_micros, MAX_TIMESTAMP.micros)
453+
454+
return (self_start == other_start and self_end == other_end)
442455

443456
def __repr__(self):
444457
return '[%s, %s)' % (float(self.start), float(self.end))

0 commit comments

Comments
 (0)