@@ -264,6 +264,10 @@ def _approx_quantile_generator(size, num_of_quantiles, absoluteError):
264264 quantiles .append (size - 1 )
265265 return quantiles
266266
267+ @staticmethod
268+ def _sum_and_second (x ):
269+ return (sum (x ), x [1 ])
270+
267271 def test_quantiles_globaly (self ):
268272 with TestPipeline () as p :
269273 pc = p | Create (list (range (101 )))
@@ -490,22 +494,32 @@ def test_batched_quantiles(self):
490494 3 , input_batched = True ))
491495 with_key = (
492496 pc | 'Globally with key' >> beam .ApproximateQuantiles .Globally (
493- 3 , key = sum , input_batched = True ))
497+ 3 ,
498+ key = ApproximateQuantilesTest ._sum_and_second ,
499+ input_batched = True ))
494500 key_with_reversed = (
495501 pc | 'Globally with key and reversed' >>
496502 beam .ApproximateQuantiles .Globally (
497- 3 , key = sum , reverse = True , input_batched = True ))
503+ 3 ,
504+ key = ApproximateQuantilesTest ._sum_and_second ,
505+ reverse = True ,
506+ input_batched = True ))
498507 assert_that (
499508 globally ,
500509 equal_to ([[(0.0 , 500 ), (49.9 , 1 ), (99.9 , 499 )]]),
501510 label = 'checkGlobally' )
511+ # When key is present, both (72.5, 225) and (22.5, 275) produce the exact same
512+ # sum (297.5). If we just use key=sum, tie-breaking is sensitive to bundle merging
513+ # order and shared class-level jitter state, leading to flaky test failures.
514+ # With the secondary key (defined in _sum_and_second), we can break ties
515+ # deterministically.
502516 assert_that (
503517 with_key ,
504518 equal_to ([[(50.0 , 0 ), (72.5 , 225 ), (99.9 , 499 )]]),
505519 label = 'checkGloballyWithKey' )
506520 assert_that (
507521 key_with_reversed ,
508- equal_to ([[(99.9 , 499 ), (72 .5 , 225 ), (50.0 , 0 )]]),
522+ equal_to ([[(99.9 , 499 ), (22 .5 , 275 ), (50.0 , 0 )]]),
509523 label = 'checkGloballyWithKeyAndReversed' )
510524
511525 def test_batched_weighted_quantiles (self ):
0 commit comments