Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions sdks/python/apache_beam/transforms/stats_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ def _approx_quantile_generator(size, num_of_quantiles, absoluteError):
quantiles.append(size - 1)
return quantiles

@staticmethod
def _sum_and_second(x):
return (sum(x), x[1])
Comment thread
shunping marked this conversation as resolved.

def test_quantiles_globaly(self):
with TestPipeline() as p:
pc = p | Create(list(range(101)))
Expand Down Expand Up @@ -490,22 +494,32 @@ def test_batched_quantiles(self):
3, input_batched=True))
with_key = (
pc | 'Globally with key' >> beam.ApproximateQuantiles.Globally(
3, key=sum, input_batched=True))
3,
key=ApproximateQuantilesTest._sum_and_second,
input_batched=True))
key_with_reversed = (
pc | 'Globally with key and reversed' >>
beam.ApproximateQuantiles.Globally(
3, key=sum, reverse=True, input_batched=True))
3,
key=ApproximateQuantilesTest._sum_and_second,
reverse=True,
input_batched=True))
assert_that(
globally,
equal_to([[(0.0, 500), (49.9, 1), (99.9, 499)]]),
label='checkGlobally')
# When key is present, both (72.5, 225) and (22.5, 275) produce the exact same
# sum (297.5). If we just use key=sum, tie-breaking is sensitive to bundle merging
# order and shared class-level jitter state, leading to flaky test failures.
# With the secondary key (defined in _sum_and_second), we can break ties
# deterministically.
assert_that(
with_key,
equal_to([[(50.0, 0), (72.5, 225), (99.9, 499)]]),
label='checkGloballyWithKey')
assert_that(
key_with_reversed,
equal_to([[(99.9, 499), (72.5, 225), (50.0, 0)]]),
equal_to([[(99.9, 499), (22.5, 275), (50.0, 0)]]),
label='checkGloballyWithKeyAndReversed')

def test_batched_weighted_quantiles(self):
Expand Down
Loading