From d307601a50d657833d7835463805d95a0dbf50e5 Mon Sep 17 00:00:00 2001 From: JAGANNATHANJP Date: Sat, 23 May 2026 19:59:16 +0530 Subject: [PATCH 1/6] Prevent threaded Dask workers in DistRDF backend --- .../distrdf/python/DistRDF/Backends/Dask/Backend.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py b/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py index 84a9a0bbd4abc..3ff54999a06c4 100644 --- a/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py +++ b/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py @@ -104,6 +104,17 @@ def __init__(self, daskclient: Optional[Client] = None): # N is the number of cores on the local machine. self.client = (daskclient if daskclient is not None else Client(LocalCluster(n_workers=os.cpu_count(), threads_per_worker=1, processes=True))) + + workers = self.client.scheduler_info()["workers"] + + for worker in workers.values(): + threads = worker.get("nthreads", 1) + + if threads > 1: + raise RuntimeError( + "DistRDF with Dask does not support threaded workers. " + "Please use processes=True and threads_per_worker=1." + ) def optimize_npartitions(self) -> int: """ From ec5e56a24a48c075ed2717437f660de0902ee20e Mon Sep 17 00:00:00 2001 From: JAGANNATHANJP Date: Sat, 23 May 2026 20:14:39 +0530 Subject: [PATCH 2/6] Prevent threaded Dask workers in DistRDF backend --- bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py b/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py index 3ff54999a06c4..1274d13ac3d13 100644 --- a/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py +++ b/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py @@ -111,10 +111,10 @@ def __init__(self, daskclient: Optional[Client] = None): threads = worker.get("nthreads", 1) if threads > 1: - raise RuntimeError( - "DistRDF with Dask does not support threaded workers. " - "Please use processes=True and threads_per_worker=1." - ) + raise RuntimeError( + "DistRDF with Dask does not support threaded workers. " + "Please use processes=True and threads_per_worker=1." + ) def optimize_npartitions(self) -> int: """ From 0c91ac57a1af0488baaf0cc7b9db066bf78444b3 Mon Sep 17 00:00:00 2001 From: JAGANNATHANJP Date: Sat, 23 May 2026 20:19:12 +0530 Subject: [PATCH 3/6] Prevent threaded Dask workers in DistRDF backend --- bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py b/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py index 1274d13ac3d13..5e3da6ad5a836 100644 --- a/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py +++ b/bindings/distrdf/python/DistRDF/Backends/Dask/Backend.py @@ -105,7 +105,10 @@ def __init__(self, daskclient: Optional[Client] = None): self.client = (daskclient if daskclient is not None else Client(LocalCluster(n_workers=os.cpu_count(), threads_per_worker=1, processes=True))) - workers = self.client.scheduler_info()["workers"] + workers = self.client.scheduler_info().get("workers", None) + + if workers is None: + return for worker in workers.values(): threads = worker.get("nthreads", 1) @@ -114,7 +117,7 @@ def __init__(self, daskclient: Optional[Client] = None): raise RuntimeError( "DistRDF with Dask does not support threaded workers. " "Please use processes=True and threads_per_worker=1." - ) + ) def optimize_npartitions(self) -> int: """ From 5bdb97d9e337f968144d16b4d463ac9bcd43b4fb Mon Sep 17 00:00:00 2001 From: JAGANNATHANJP Date: Mon, 1 Jun 2026 22:12:17 +0530 Subject: [PATCH 4/6] Add test for missing Dask worker information --- .../python/distrdf/backends/check_backend.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/roottest/python/distrdf/backends/check_backend.py b/roottest/python/distrdf/backends/check_backend.py index b7ae271ca26ea..34e3fc8b737fd 100644 --- a/roottest/python/distrdf/backends/check_backend.py +++ b/roottest/python/distrdf/backends/check_backend.py @@ -59,6 +59,27 @@ def test_optimize_npartitions(self, payload): backend = Backend.SparkBackend(sparkcontext=connection) assert backend.optimize_npartitions() == 2 + def test_dask_backend_handles_missing_workers(self, payload): + """ + Check that DaskBackend initialization succeeds when scheduler_info + does not provide worker information. + """ + connection, backend = payload + + if backend != "dask": + return + + from ROOT._distrdf.Backends.Dask import Backend + + original_scheduler_info = connection.scheduler_info + + try: + connection.scheduler_info = lambda: {} + backend = Backend.DaskBackend(daskclient=connection) + assert backend.client is connection + finally: + connection.scheduler_info = original_scheduler_info + class TestInitialization: """Check initialization method in the Dask backend""" From 37cac3f3d5cb77dd5eb83e690fa7bb8cb0ce750a Mon Sep 17 00:00:00 2001 From: JAGANNATHANJP Date: Fri, 12 Jun 2026 21:29:18 +0530 Subject: [PATCH 5/6] Add tests for Dask worker validation --- .../python/distrdf/backends/check_backend.py | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/roottest/python/distrdf/backends/check_backend.py b/roottest/python/distrdf/backends/check_backend.py index 34e3fc8b737fd..d91ea82e47ec9 100644 --- a/roottest/python/distrdf/backends/check_backend.py +++ b/roottest/python/distrdf/backends/check_backend.py @@ -4,7 +4,6 @@ import ROOT import ROOT._distrdf - class TestBackendInit: """ Tests to ensure that the instance variables of `DaskBackend` class have the @@ -75,11 +74,42 @@ def test_dask_backend_handles_missing_workers(self, payload): try: connection.scheduler_info = lambda: {} + backend = Backend.DaskBackend(daskclient=connection) assert backend.client is connection + + df = ROOT.RDataFrame(10, executor=connection) + assert df.Count().GetValue() == 10 + finally: connection.scheduler_info = original_scheduler_info + def test_dask_backend_rejects_threaded_workers(self): + """ + Check that DaskBackend rejects threaded workers. + """ + from dask.distributed import Client + from dask.distributed import LocalCluster + from ROOT._distrdf.Backends.Dask import Backend + + cluster = LocalCluster( + n_workers=1, + threads_per_worker=2, + processes=False + ) + + client = Client(cluster) + + try: + with pytest.raises( + RuntimeError, + match="DistRDF with Dask does not support threaded workers" + ): + Backend.DaskBackend(daskclient=client) + finally: + client.close() + cluster.close() + class TestInitialization: """Check initialization method in the Dask backend""" From ed91ab08a314ae432685a6da9bf6a1b3aaeda589 Mon Sep 17 00:00:00 2001 From: JAGANNATHANJP Date: Wed, 17 Jun 2026 23:14:21 +0530 Subject: [PATCH 6/6] Fix ruff formatting in Dask backend tests --- .../python/distrdf/backends/check_backend.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/roottest/python/distrdf/backends/check_backend.py b/roottest/python/distrdf/backends/check_backend.py index d91ea82e47ec9..f8d15ee313dff 100644 --- a/roottest/python/distrdf/backends/check_backend.py +++ b/roottest/python/distrdf/backends/check_backend.py @@ -4,6 +4,7 @@ import ROOT import ROOT._distrdf + class TestBackendInit: """ Tests to ensure that the instance variables of `DaskBackend` class have the @@ -88,29 +89,21 @@ def test_dask_backend_rejects_threaded_workers(self): """ Check that DaskBackend rejects threaded workers. """ - from dask.distributed import Client - from dask.distributed import LocalCluster + from dask.distributed import Client, LocalCluster from ROOT._distrdf.Backends.Dask import Backend - cluster = LocalCluster( - n_workers=1, - threads_per_worker=2, - processes=False - ) - + cluster = LocalCluster(n_workers=1, threads_per_worker=2, processes=False) client = Client(cluster) try: - with pytest.raises( - RuntimeError, - match="DistRDF with Dask does not support threaded workers" - ): + with pytest.raises(RuntimeError, match="DistRDF with Dask does not support threaded workers"): Backend.DaskBackend(daskclient=client) finally: client.close() cluster.close() + class TestInitialization: """Check initialization method in the Dask backend"""