Skip to content

Commit c9e7aca

Browse files
authored
Add worker name as prefix to ThreadPoolExecutor name (#9120)
1 parent 8e604a0 commit c9e7aca

2 files changed

Lines changed: 34 additions & 3 deletions

File tree

distributed/tests/test_worker.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2148,6 +2148,31 @@ def get_thread_name():
21482148
assert "Dask-Foo-Threads" in gpu_result
21492149

21502150

2151+
@gen_cluster(client=True, nthreads=[])
2152+
async def test_executor_inherit_threadname_from_worker(c, s):
2153+
def get_thread_name():
2154+
return threading.current_thread().name
2155+
2156+
async with Worker(
2157+
s.address,
2158+
nthreads=1,
2159+
name="WorkerName",
2160+
):
2161+
result = await c.gather(c.submit(get_thread_name, pure=False))
2162+
assert "WorkerName-Dask-Default-Threads" in result
2163+
2164+
async with Worker(
2165+
s.address,
2166+
nthreads=1,
2167+
name="ALongWorkerNameThatNoOneWillProbablyEverAssignButThisTestsTheRobustnessOfLogic",
2168+
):
2169+
result = await c.gather(c.submit(get_thread_name, pure=False))
2170+
assert (
2171+
"ALongWorkerNameThatNoOneWillProbablyEverAssignButThisTestsTheRobustnessOfLogic-Dask-Default-Threads"
2172+
in result
2173+
)
2174+
2175+
21512176
@gen_cluster(client=True)
21522177
async def test_bad_executor_annotation(c, s, a, b):
21532178
with dask.annotate(executor="bad"):

distributed/worker.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -647,10 +647,15 @@ def __init__(
647647
if scheduler_sni:
648648
self.connection_args["server_hostname"] = scheduler_sni
649649

650+
self.name = name
651+
652+
executor_pool_prefix = f"{self.name}-" if self.name else ""
650653
# Common executors always available
651654
self.executors = {
652655
"offload": utils._offload_executor,
653-
"actor": ThreadPoolExecutor(1, thread_name_prefix="Dask-Actor-Threads"),
656+
"actor": ThreadPoolExecutor(
657+
1, thread_name_prefix=f"{executor_pool_prefix}Dask-Actor-Threads"
658+
),
654659
}
655660

656661
# Find the default executor
@@ -660,13 +665,14 @@ def __init__(
660665
self.executors.update(executor)
661666
elif executor is not None:
662667
self.executors["default"] = executor
668+
663669
if "default" not in self.executors:
664670
self.executors["default"] = ThreadPoolExecutor(
665-
nthreads, thread_name_prefix="Dask-Default-Threads"
671+
nthreads,
672+
thread_name_prefix=f"{executor_pool_prefix}Dask-Default-Threads",
666673
)
667674

668675
self.batched_stream = BatchedSend(interval="2ms", loop=self.loop)
669-
self.name = name
670676
self.scheduler_delay = 0
671677
self.stream_comms = {}
672678

0 commit comments

Comments
 (0)