Skip to content

Commit 21fc1d5

Browse files
committed
remove more references to the global concurrency limit
1 parent: 5233bcd · commit: 21fc1d5

7 files changed

Lines changed: 103 additions & 767 deletions

File tree

src/zarr/core/common.py

Lines changed: 1 addition & 138 deletions
Original file line number | Diff line number | Diff line change
@@ -1,15 +1,11 @@
11
from __future__ import annotations
22

3-
import asyncio
43
import functools
54
import math
65
import operator
7-
import threading
86
import warnings
9-
import weakref
107
from collections.abc import Iterable, Mapping, Sequence
118
from enum import Enum
12-
from itertools import starmap
139
from typing import (
1410
TYPE_CHECKING,
1511
Any,
@@ -29,7 +25,7 @@
2925
from zarr.errors import ZarrRuntimeWarning
3026

3127
if TYPE_CHECKING:
32-
from collections.abc import Awaitable, Callable, Iterator
28+
from collections.abc import Iterator
3329

3430

3531
ZARR_JSON = "zarr.json"
@@ -96,139 +92,6 @@ def ceildiv(a: float, b: float) -> int:
9692
return math.ceil(a / b)
9793

9894

99-
T = TypeVar("T", bound=tuple[Any, ...])
100-
V = TypeVar("V")
101-
102-
103-
# Global semaphore management for per-process concurrency limiting
104-
# Use WeakKeyDictionary to automatically clean up semaphores when event loops are garbage collected
105-
_global_semaphores: weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, asyncio.Semaphore] = (
106-
weakref.WeakKeyDictionary()
107-
)
108-
# Use threading.Lock instead of asyncio.Lock to coordinate across event loops
109-
_global_semaphore_lock = threading.Lock()
110-
111-
112-
def get_global_semaphore() -> asyncio.Semaphore:
113-
"""
114-
Get the global semaphore for the current event loop.
115-
116-
This ensures that all concurrent operations across the process share the same
117-
concurrency limit, preventing excessive concurrent task creation when multiple
118-
arrays or operations are running simultaneously.
119-
120-
The semaphore is lazily created per event loop and uses the configured
121-
`async.concurrency` value from zarr config. The semaphore is cached per event
122-
loop, so subsequent calls return the same semaphore instance.
123-
124-
Note: Config changes after the first call will not affect the semaphore limit.
125-
To apply new config values, use :func:`reset_global_semaphores` to clear the cache.
126-
127-
Returns
128-
-------
129-
asyncio.Semaphore
130-
The global semaphore for this event loop.
131-
132-
Raises
133-
------
134-
RuntimeError
135-
If called outside of an async context (no running event loop).
136-
137-
See Also
138-
--------
139-
reset_global_semaphores : Clear the global semaphore cache
140-
"""
141-
loop = asyncio.get_running_loop()
142-
143-
# Acquire lock FIRST to prevent TOCTOU race condition
144-
with _global_semaphore_lock:
145-
if loop not in _global_semaphores:
146-
limit = zarr_config.get("async.concurrency")
147-
_global_semaphores[loop] = asyncio.Semaphore(limit)
148-
return _global_semaphores[loop]
149-
150-
151-
def reset_global_semaphores() -> None:
152-
"""
153-
Clear all cached global semaphores.
154-
155-
This is useful when you want config changes to take effect, or for testing.
156-
The next call to :func:`get_global_semaphore` will create a new semaphore
157-
using the current configuration.
158-
159-
Warning: This should only be called when no async operations are in progress,
160-
as it will invalidate all existing semaphore references.
161-
162-
Examples
163-
--------
164-
>>> import zarr
165-
>>> zarr.config.set({"async.concurrency": 50})
166-
>>> reset_global_semaphores() # Apply new config
167-
"""
168-
with _global_semaphore_lock:
169-
_global_semaphores.clear()
170-
171-
172-
async def concurrent_map(
173-
items: Iterable[T],
174-
func: Callable[..., Awaitable[V]],
175-
limit: int | None = None,
176-
*,
177-
use_global_semaphore: bool = True,
178-
) -> list[V]:
179-
"""
180-
Execute an async function concurrently over multiple items with concurrency limiting.
181-
182-
Parameters
183-
----------
184-
items : Iterable[T]
185-
Items to process, where each item is a tuple of arguments to pass to func.
186-
func : Callable[..., Awaitable[V]]
187-
Async function to execute for each item.
188-
limit : int | None, optional
189-
If provided and use_global_semaphore is False, creates a local semaphore
190-
with this limit. If None, no concurrency limiting is applied.
191-
use_global_semaphore : bool, default True
192-
If True, uses the global per-process semaphore for concurrency limiting,
193-
ensuring all concurrent operations share the same limit. If False, uses
194-
the `limit` parameter for local limiting (legacy behavior).
195-
196-
Returns
197-
-------
198-
list[V]
199-
Results from executing func on all items.
200-
"""
201-
if use_global_semaphore:
202-
if limit is not None:
203-
raise ValueError(
204-
"Cannot specify both use_global_semaphore=True and a limit value. "
205-
"Either use the global semaphore (use_global_semaphore=True, limit=None) "
206-
"or specify a local limit (use_global_semaphore=False, limit=<int>)."
207-
)
208-
# Use the global semaphore for process-wide concurrency limiting
209-
sem = get_global_semaphore()
210-
211-
async def run(item: tuple[Any]) -> V:
212-
async with sem:
213-
return await func(*item)
214-
215-
return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
216-
217-
elif limit is None:
218-
# No concurrency limiting
219-
return await asyncio.gather(*list(starmap(func, items)))
220-
221-
else:
222-
# Legacy mode: create local semaphore with specified limit
223-
sem = asyncio.Semaphore(limit)
224-
225-
async def run(item: tuple[Any]) -> V:
226-
async with sem:
227-
return await func(*item)
228-
229-
return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
230-
231-
23295
E = TypeVar("E", bound=Enum)
23396

23497

src/zarr/core/group.py

Lines changed: 10 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
NodeType,
4545
ShapeLike,
4646
ZarrFormat,
47-
get_global_semaphore,
4847
parse_shapelike,
4948
)
5049
from zarr.core.config import config
@@ -1441,13 +1440,10 @@ async def _members(
14411440
)
14421441

14431442
raise ValueError(msg)
1444-
# Use global semaphore for process-wide concurrency limiting
1445-
semaphore = get_global_semaphore()
14461443
async for member in _iter_members_deep(
14471444
self,
14481445
max_depth=max_depth,
14491446
skip_keys=skip_keys,
1450-
semaphore=semaphore,
14511447
use_consolidated_for_children=use_consolidated_for_children,
14521448
):
14531449
yield member
@@ -3324,13 +3320,11 @@ async def create_nodes(
33243320
The created nodes in the order they are created.
33253321
"""
33263322

3327-
# Use global semaphore for process-wide concurrency limiting
3328-
semaphore = get_global_semaphore()
33293323
create_tasks: list[Coroutine[None, None, str]] = []
33303324

33313325
for key, value in nodes.items():
33323326
# make the key absolute
3333-
create_tasks.extend(_persist_metadata(store, key, value, semaphore=semaphore))
3327+
create_tasks.extend(_persist_metadata(store, key, value))
33343328

33353329
created_object_keys = []
33363330

@@ -3476,28 +3470,16 @@ def _ensure_consistent_zarr_format(
34763470
)
34773471

34783472

3479-
async def _getitem_semaphore(
3480-
node: AsyncGroup, key: str, semaphore: asyncio.Semaphore | None
3481-
) -> AnyAsyncArray | AsyncGroup:
3473+
async def _getitem(node: AsyncGroup, key: str) -> AnyAsyncArray | AsyncGroup:
34823474
"""
3483-
Wrap Group.getitem with an optional semaphore.
3484-
3485-
If the semaphore parameter is an
3486-
asyncio.Semaphore instance, then the getitem operation is performed inside an async context
3487-
manager provided by that semaphore. If the semaphore parameter is None, then getitem is invoked
3488-
without a context manager.
3475+
Fetch a child node from a group by key.
34893476
"""
3490-
if semaphore is not None:
3491-
async with semaphore:
3492-
return await node.getitem(key)
3493-
else:
3494-
return await node.getitem(key)
3477+
return await node.getitem(key)
34953478

34963479

34973480
async def _iter_members(
34983481
node: AsyncGroup,
34993482
skip_keys: tuple[str, ...],
3500-
semaphore: asyncio.Semaphore | None,
35013483
) -> AsyncGenerator[tuple[str, AnyAsyncArray | AsyncGroup], None]:
35023484
"""
35033485
Iterate over the arrays and groups contained in a group.
@@ -3508,8 +3490,6 @@ async def _iter_members(
35083490
The group to traverse.
35093491
skip_keys : tuple[str, ...]
35103492
A tuple of keys to skip when iterating over the possible members of the group.
3511-
semaphore : asyncio.Semaphore | None
3512-
An optional semaphore to use for concurrency control.
35133493
35143494
Yields
35153495
------
@@ -3520,10 +3500,7 @@ async def _iter_members(
35203500
keys = [key async for key in node.store.list_dir(node.path)]
35213501
keys_filtered = tuple(filter(lambda v: v not in skip_keys, keys))
35223502

3523-
node_tasks = tuple(
3524-
asyncio.create_task(_getitem_semaphore(node, key, semaphore), name=key)
3525-
for key in keys_filtered
3526-
)
3503+
node_tasks = tuple(asyncio.create_task(_getitem(node, key), name=key) for key in keys_filtered)
35273504

35283505
for fetched_node_coro in asyncio.as_completed(node_tasks):
35293506
try:
@@ -3550,7 +3527,6 @@ async def _iter_members_deep(
35503527
*,
35513528
max_depth: int | None,
35523529
skip_keys: tuple[str, ...],
3553-
semaphore: asyncio.Semaphore | None = None,
35543530
use_consolidated_for_children: bool = True,
35553531
) -> AsyncGenerator[tuple[str, AnyAsyncArray | AsyncGroup], None]:
35563532
"""
@@ -3565,8 +3541,6 @@ async def _iter_members_deep(
35653541
The maximum depth of recursion.
35663542
skip_keys : tuple[str, ...]
35673543
A tuple of keys to skip when iterating over the possible members of the group.
3568-
semaphore : asyncio.Semaphore | None
3569-
An optional semaphore to use for concurrency control.
35703544
use_consolidated_for_children : bool, default True
35713545
Whether to use the consolidated metadata of child groups loaded
35723546
from the store. Note that this only affects groups loaded from the
@@ -3585,7 +3559,7 @@ async def _iter_members_deep(
35853559
new_depth = None
35863560
else:
35873561
new_depth = max_depth - 1
3588-
async for name, node in _iter_members(group, skip_keys=skip_keys, semaphore=semaphore):
3562+
async for name, node in _iter_members(group, skip_keys=skip_keys):
35893563
is_group = isinstance(node, AsyncGroup)
35903564
if (
35913565
is_group
@@ -3599,9 +3573,7 @@ async def _iter_members_deep(
35993573
yield name, node
36003574
if is_group and do_recursion:
36013575
node = cast("AsyncGroup", node)
3602-
to_recurse[name] = _iter_members_deep(
3603-
node, max_depth=new_depth, skip_keys=skip_keys, semaphore=semaphore
3604-
)
3576+
to_recurse[name] = _iter_members_deep(node, max_depth=new_depth, skip_keys=skip_keys)
36053577

36063578
for prefix, subgroup_iter in to_recurse.items():
36073579
async for name, node in subgroup_iter:
@@ -3811,9 +3783,7 @@ async def get_node(store: Store, path: str, zarr_format: ZarrFormat) -> AnyAsync
38113783
raise ValueError(f"Unexpected zarr format: {zarr_format}") # pragma: no cover
38123784

38133785

3814-
async def _set_return_key(
3815-
*, store: Store, key: str, value: Buffer, semaphore: asyncio.Semaphore | None = None
3816-
) -> str:
3786+
async def _set_return_key(*, store: Store, key: str, value: Buffer) -> str:
38173787
"""
38183788
Write a value to storage at the given key. The key is returned.
38193789
Useful when saving values via routines that return results in execution order,
@@ -3828,31 +3798,23 @@ async def _set_return_key(
38283798
The key to save the value to.
38293799
value : Buffer
38303800
The value to save.
3831-
semaphore : asyncio.Semaphore | None
3832-
An optional semaphore to use to limit the number of concurrent writes.
38333801
"""
3834-
3835-
if semaphore is not None:
3836-
async with semaphore:
3837-
await store.set(key, value)
3838-
else:
3839-
await store.set(key, value)
3802+
await store.set(key, value)
38403803
return key
38413804

38423805

38433806
def _persist_metadata(
38443807
store: Store,
38453808
path: str,
38463809
metadata: ArrayV2Metadata | ArrayV3Metadata | GroupMetadata,
3847-
semaphore: asyncio.Semaphore | None = None,
38483810
) -> tuple[Coroutine[None, None, str], ...]:
38493811
"""
38503812
Prepare to save a metadata document to storage, returning a tuple of coroutines that must be awaited.
38513813
"""
38523814

38533815
to_save = metadata.to_buffer_dict(default_buffer_prototype())
38543816
return tuple(
3855-
_set_return_key(store=store, key=_join_paths([path, key]), value=value, semaphore=semaphore)
3817+
_set_return_key(store=store, key=_join_paths([path, key]), value=value)
38563818
for key, value in to_save.items()
38573819
)
38583820

0 commit comments

Comments
 (0)