From cb063ef534e45d7c8093d23d2e1e4700c5b25ddb Mon Sep 17 00:00:00 2001 From: rishav23 Date: Fri, 22 May 2026 15:16:26 +0530 Subject: [PATCH 1/2] [SPARK-50520][PySpark] Respect timeout semantics in approximate RDD actions --- python/pyspark/core/rdd.py | 3 ++- python/pyspark/tests/test_rdd.py | 12 ++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/python/pyspark/core/rdd.py b/python/pyspark/core/rdd.py index 803bb6d5b882b..573cc7ae165c5 100644 --- a/python/pyspark/core/rdd.py +++ b/python/pyspark/core/rdd.py @@ -4843,7 +4843,8 @@ def sumApprox( jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_java_object_rdd() assert self.ctx._jvm is not None jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd()) - r = jdrdd.sumApprox(timeout, confidence).getFinalValue() + partial = jdrdd.sumApprox(timeout, confidence) + r = partial.initialValue() return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high()) def meanApprox( diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py index 4c43d6181cc36..507d13c74d815 100644 --- a/python/pyspark/tests/test_rdd.py +++ b/python/pyspark/tests/test_rdd.py @@ -638,6 +638,18 @@ def test_distinct(self): self.assertEqual(result.getNumPartitions(), 5) self.assertEqual(result.count(), 3) + def test_count_approx_respects_timeout(self): + rdd = self.sc.range(1000000, numSlices=8) + start = time.time() + result = rdd.countApprox(timeout=100) + elapsed = time.time() - start + self.assertLess(elapsed, 10) + self.assertIsNotNone(result) + + def test_count_approx_returns_exact_when_completed(self): + rdd = self.sc.parallelize(range(1000), 8) + self.assertEqual(rdd.countApprox(timeout=5000), 1000) + def test_external_group_by_key(self): self.sc._conf.set("spark.python.worker.memory", "1m") N = 2000001 From 0a46c2bc524a5719bef7f3b5a3bee45fb6598cb1 Mon Sep 17 00:00:00 2001 From: rishav23 Date: Fri, 22 May 2026 15:46:43 +0530 Subject: [PATCH 2/2] [SPARK-50520][PySpark] Respect timeout in meanApprox and tighten approximate-action timeout tests --- 
python/pyspark/core/rdd.py | 3 ++- python/pyspark/tests/test_rdd.py | 26 ++++++++++++++++++++++---- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/python/pyspark/core/rdd.py b/python/pyspark/core/rdd.py index 573cc7ae165c5..190857bf3cecc 100644 --- a/python/pyspark/core/rdd.py +++ b/python/pyspark/core/rdd.py @@ -4882,7 +4882,8 @@ def meanApprox( jrdd = self.map(float)._to_java_object_rdd() assert self.ctx._jvm is not None jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd()) - r = jdrdd.meanApprox(timeout, confidence).getFinalValue() + partial = jdrdd.meanApprox(timeout, confidence) + r = partial.initialValue() return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high()) def countApproxDistinct(self: "RDD[T]", relativeSD: float = 0.05) -> int: diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py index 507d13c74d815..354830666ef10 100644 --- a/python/pyspark/tests/test_rdd.py +++ b/python/pyspark/tests/test_rdd.py @@ -639,17 +639,35 @@ def test_distinct(self): self.assertEqual(result.count(), 3) def test_count_approx_respects_timeout(self): - rdd = self.sc.range(1000000, numSlices=8) + def slow(x): + time.sleep(1) + return x + + rdd = self.sc.parallelize(range(20), 20).map(slow) start = time.time() - result = rdd.countApprox(timeout=100) + rdd.countApprox(timeout=100) elapsed = time.time() - start - self.assertLess(elapsed, 10) - self.assertIsNotNone(result) + self.assertLess(elapsed, 2) + # Cancel the background approximate job before subsequent tests run. 
+ self.sc.cancelAllJobs() def test_count_approx_returns_exact_when_completed(self): rdd = self.sc.parallelize(range(1000), 8) self.assertEqual(rdd.countApprox(timeout=5000), 1000) + def test_mean_approx_respects_timeout(self): + def slow(x): + time.sleep(1) + return float(x) + + rdd = self.sc.parallelize(range(20), 20).map(slow) + start = time.time() + rdd.meanApprox(timeout=100) + elapsed = time.time() - start + self.assertLess(elapsed, 2) + # Cancel the background approximate job before subsequent tests run. + self.sc.cancelAllJobs() + def test_external_group_by_key(self): self.sc._conf.set("spark.python.worker.memory", "1m") N = 2000001