
Commit 75dcb56

refactor: just remove the concurrency limits entirely
1 parent e96e7ce commit 75dcb56

File tree: 9 files changed (+23, -487 lines)

docs/user-guide/performance.md

Lines changed: 5 additions & 45 deletions
@@ -190,29 +190,7 @@ scenarios.
 
 ### Concurrent I/O operations
 
-For latency-sensitve storage backends like HTTP and cloud object storage, Zarr uses asynchronous I/O internally to enable concurrent reads and writes across multiple chunks.
-Concurrency is controlled at the **store level**. Many of the stores defined in `zarr-python` accept a concurrency limit on construction via the `concurrency_limit` parameter.
-
-```python
-import zarr
-
-# Local filesystem store with custom concurrency limit
-store = zarr.storage.LocalStore("data/my_array.zarr", concurrency_limit=64)
-
-# Remote store with higher concurrency for network I/O
-from obstore.store import S3Store
-store = zarr.storage.ObjectStore(S3Store.from_url("s3://bucket/path"), concurrency_limit=128)
-```
-
-Higher concurrency values can improve throughput when:
-- Working with remote storage (e.g., S3, GCS) where network latency is high
-- Reading/writing many small chunks in parallel
-- The storage backend can handle many concurrent requests
-
-Lower concurrency values may be beneficial when:
-- Working with local storage with limited I/O bandwidth
-- Memory is constrained (each concurrent operation requires buffer space)
-- Using Zarr within a parallel computing framework (see below)
+For latency-sensitive storage backends like HTTP and cloud object storage, Zarr uses asynchronous I/O internally to enable concurrent reads and writes across multiple chunks. Zarr does not impose its own concurrency limits — storage backends are expected to manage their own concurrency constraints (e.g., connection pool sizes, rate limits). If you need to limit concurrency for a particular backend, configure it at the storage layer (e.g., via fsspec or obstore options).
 
 ### Thread pool size (`threading.max_workers`)
 
@@ -238,32 +216,21 @@ concurrently.
 
 ### Using Zarr with Dask
 
-[Dask](https://www.dask.org/) is a popular parallel computing library that works well with Zarr for processing large arrays. When using Zarr with Dask, it's important to consider the interaction between Dask's thread pool and the store's concurrency limit.
+[Dask](https://www.dask.org/) is a popular parallel computing library that works well with Zarr for processing large arrays. When using Zarr with Dask, it's important to consider the interaction between Dask's thread pool and Zarr's internal thread pool.
 
-**Important**: When using many Dask threads, you may need to reduce the store's `concurrency_limit` and Zarr's `threading.max_workers` setting to avoid creating too many concurrent operations. The total number of concurrent I/O operations can be roughly estimated as:
-
-```
-total_concurrency ≈ dask_threads × store_concurrency_limit
-```
-
-For example, if you're running Dask with 10 threads and a store concurrency limit of 64, you could potentially have up to 640 concurrent operations, which may overwhelm your storage system or cause memory issues.
-
-**Recommendation**: When using Dask with many threads, configure concurrency settings:
+**Recommendation**: When using Dask with many threads, reduce Zarr's internal thread pool to avoid thread contention:
 
 ```python
 import zarr
 import dask.array as da
 
-# Create store with reduced concurrency limit for Dask workloads
-store = zarr.storage.LocalStore("data/large_array.zarr", concurrency_limit=4)
-
-# Also limit Zarr's internal thread pool
+# Limit Zarr's internal thread pool
 zarr.config.set({
     'threading.max_workers': 4,  # Limit Zarr's internal thread pool
 })
 
 # Open Zarr array
-z = zarr.open_array(store=store, mode='r')
+z = zarr.open_array("data/large_array.zarr", mode='r')
 
 # Create Dask array from Zarr array
 arr = da.from_array(z, chunks=z.chunks)
@@ -272,13 +239,6 @@ arr = da.from_array(z, chunks=z.chunks)
 result = arr.mean(axis=0).compute()
 ```
 
-**Configuration guidelines for Dask workloads**:
-
-- `concurrency_limit` (per-store): Controls the maximum number of concurrent async I/O operations for a given store. Start with a lower value (e.g., 4-8) when using many Dask threads.
-- `threading.max_workers` (global config): Controls Zarr's internal thread pool size for blocking operations (defaults to CPU count). Reduce this to avoid thread contention with Dask's scheduler.
-
-You may need to experiment with different values to find the optimal balance for your workload. Monitor your system's resource usage and adjust these settings based on whether your storage system or CPU is the bottleneck.
-
 ### Thread safety and process safety
 
 Zarr arrays are designed to be thread-safe for concurrent reads and writes from multiple threads within the same process. However, proper synchronization is required when writing to overlapping regions from multiple threads.
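The replacement paragraph above tells users to cap concurrency at the storage layer instead. Where a backend offers no such knob, the effect of the removed per-store limit can be approximated in user code with a plain `asyncio.Semaphore`. A minimal sketch (the `bounded_gather` helper is hypothetical and not part of the zarr API):

```python
import asyncio

async def bounded_gather(coros, limit):
    # Cap how many coroutines run at once with a semaphore, mimicking
    # the per-store concurrency_limit this commit removes.
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))

async def main():
    in_flight = 0
    peak = 0

    async def fake_io(i):
        # Stand-in for a chunk read/write; tracks concurrent calls.
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.001)
        in_flight -= 1
        return i

    results = await bounded_gather([fake_io(i) for i in range(20)], limit=4)
    return results, peak

results, peak = asyncio.run(main())
assert results == list(range(20))  # gather preserves input order
assert peak <= 4                   # never more than `limit` in flight
```

The same wrapper can sit around any batch of store calls, so the cap lives in application code rather than in zarr itself.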

src/zarr/abc/codec.py

Lines changed: 0 additions & 2 deletions
@@ -246,7 +246,6 @@ async def decode_partial(
         -------
         Iterable[NDBuffer | None]
         """
-        # Store handles concurrency limiting internally
         return await asyncio.gather(*[self._decode_partial_single(*info) for info in batch_info])
 
 
@@ -280,7 +279,6 @@ async def encode_partial(
         The ByteSetter is used to write the necessary bytes and fetch bytes for existing chunk data.
         The chunk spec contains information about the chunk.
         """
-        # Store handles concurrency limiting internally
         await asyncio.gather(*[self._encode_partial_single(*info) for info in batch_info])
 
 
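With the store-level limiter gone, `decode_partial` and `encode_partial` simply fan out one coroutine per chunk and await them all with `asyncio.gather`, with no cap of their own. A toy sketch of that fan-out shape (the `_decode_one` helper is a stand-in for `_decode_partial_single`, not the real codec API):

```python
import asyncio

async def _decode_one(key, byte_range):
    # Toy stand-in for a per-chunk partial decode.
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return (key, byte_range)

async def decode_partial(batch_info):
    # One coroutine per chunk; gather runs them concurrently and
    # returns results in the same order as the input batch.
    return await asyncio.gather(*[_decode_one(*info) for info in batch_info])

out = asyncio.run(decode_partial([("c/0", (0, 100)), ("c/1", (100, 200))]))
assert out == [("c/0", (0, 100)), ("c/1", (100, 200))]
```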
src/zarr/core/config.py

Lines changed: 0 additions & 38 deletions
@@ -29,32 +29,13 @@
 
 from __future__ import annotations
 
-import os
-import warnings
 from typing import TYPE_CHECKING, Any, Literal, cast
 
 from donfig import Config as DConfig
 
 if TYPE_CHECKING:
-    from collections.abc import Mapping
-
     from donfig.config_obj import ConfigSet
 
-# Config keys that have been moved from global config to per-store parameters.
-# Maps old config key to a warning message.
-_warn_on_set: dict[str, str] = {
-    "async.concurrency": (
-        "The 'async.concurrency' configuration key has no effect. "
-        "Concurrency limits are now set per-store via the 'concurrency_limit' "
-        "parameter. For example: zarr.storage.LocalStore(..., concurrency_limit=10)."
-    ),
-}
-
-# Environment variable forms of the keys above (ZARR_ASYNC__CONCURRENCY -> async.concurrency)
-_warn_on_set_env: dict[str, str] = {
-    "ZARR_ASYNC__CONCURRENCY": _warn_on_set["async.concurrency"],
-}
-
 
 class BadConfigError(ValueError):
     _msg = "bad Config: %r"
@@ -74,25 +55,6 @@ class Config(DConfig):  # type: ignore[misc]
 
     """
 
-    def set(self, arg: Mapping[str, Any] | None = None, **kwargs: Any) -> ConfigSet:
-        # Check for keys that now belong to per-store config
-        if arg is not None:
-            for key in arg:
-                if key in _warn_on_set:
-                    warnings.warn(_warn_on_set[key], UserWarning, stacklevel=2)
-        for key in kwargs:
-            normalized = key.replace("__", ".")
-            if normalized in _warn_on_set:
-                warnings.warn(_warn_on_set[normalized], UserWarning, stacklevel=2)
-        return super().set(arg, **kwargs)
-
-    def refresh(self, **kwargs: Any) -> None:
-        # Warn if env vars are being used for removed config keys
-        for env_key, message in _warn_on_set_env.items():
-            if env_key in os.environ:
-                warnings.warn(message, UserWarning, stacklevel=2)
-        super().refresh(**kwargs)
-
     def reset(self) -> None:
         self.clear()
         self.refresh()
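For context, the deleted `Config.set` override implemented a warn-on-set guard for retired config keys. The same pattern, as a self-contained sketch independent of donfig (all names here are illustrative only):

```python
import warnings

# Keys that are accepted but no longer do anything, mapped to a message.
_REMOVED_KEYS = {
    "async.concurrency": "'async.concurrency' has no effect in this sketch.",
}

class ToyConfig:
    def __init__(self):
        self._data = {}

    def set(self, mapping):
        # Warn once per removed key, but still store the value so
        # existing call sites keep working.
        for key, value in mapping.items():
            if key in _REMOVED_KEYS:
                warnings.warn(_REMOVED_KEYS[key], UserWarning, stacklevel=2)
            self._data[key] = value

cfg = ToyConfig()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    cfg.set({"async.concurrency": 10, "threading.max_workers": 4})

assert len(caught) == 1                  # only the removed key warns
assert cfg._data["threading.max_workers"] == 4
```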

src/zarr/storage/__init__.py

Lines changed: 0 additions & 2 deletions
@@ -10,12 +10,10 @@
 from zarr.storage._logging import LoggingStore
 from zarr.storage._memory import GpuMemoryStore, MemoryStore
 from zarr.storage._obstore import ObjectStore
-from zarr.storage._utils import ConcurrencyLimiter
 from zarr.storage._wrapper import WrapperStore
 from zarr.storage._zip import ZipStore
 
 __all__ = [
-    "ConcurrencyLimiter",
     "FsspecStore",
     "GpuMemoryStore",
     "LocalStore",

src/zarr/storage/_fsspec.py

Lines changed: 2 additions & 30 deletions
@@ -1,6 +1,5 @@
 from __future__ import annotations
 
-import asyncio
 import json
 import warnings
 from contextlib import suppress
@@ -18,7 +17,6 @@
 from zarr.core.buffer import Buffer
 from zarr.errors import ZarrUserWarning
 from zarr.storage._common import _dereference_path
-from zarr.storage._utils import ConcurrencyLimiter, with_concurrency_limit
 
 if TYPE_CHECKING:
     from collections.abc import AsyncIterator, Iterable
@@ -69,7 +67,7 @@ def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem:
     return AsyncFileSystemWrapper(fs, asynchronous=True)
 
 
-class FsspecStore(Store, ConcurrencyLimiter):
+class FsspecStore(Store):
     """
     Store for remote data based on FSSpec.
 
@@ -84,9 +82,6 @@ class FsspecStore(Store, ConcurrencyLimiter):
         filesystem scheme.
     allowed_exceptions : tuple[type[Exception], ...]
         When fetching data, these cases will be deemed to correspond to missing keys.
-    concurrency_limit : int, optional
-        Maximum number of concurrent I/O operations. Default is 50.
-        Set to None for unlimited concurrency.
 
     Attributes
     ----------
@@ -130,10 +125,8 @@ def __init__(
         read_only: bool = False,
         path: str = "/",
         allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
-        concurrency_limit: int | None = 50,
     ) -> None:
-        Store.__init__(self, read_only=read_only)
-        ConcurrencyLimiter.__init__(self, concurrency_limit)
+        super().__init__(read_only=read_only)
         self.fs = fs
         self.path = path
         self.allowed_exceptions = allowed_exceptions
@@ -259,7 +252,6 @@ def with_read_only(self, read_only: bool = False) -> FsspecStore:
             path=self.path,
             allowed_exceptions=self.allowed_exceptions,
             read_only=read_only,
-            concurrency_limit=self.concurrency_limit,
         )
 
     async def clear(self) -> None:
@@ -282,7 +274,6 @@ def __eq__(self, other: object) -> bool:
             and self.fs == other.fs
         )
 
-    @with_concurrency_limit
     async def get(
         self,
         key: str,
@@ -325,7 +316,6 @@ async def get(
         else:
             return value
 
-    @with_concurrency_limit
     async def set(
         self,
         key: str,
@@ -346,24 +336,6 @@ async def set(
             raise NotImplementedError
         await self.fs._pipe_file(path, value.to_bytes())
 
-    async def _set_many(self, values: Iterable[tuple[str, Buffer]]) -> None:
-        # Override to avoid deadlock from calling decorated set() method
-        if not self._is_open:
-            await self._open()
-        self._check_writable()
-
-        async def _set_with_limit(key: str, value: Buffer) -> None:
-            if not isinstance(value, Buffer):
-                raise TypeError(
-                    f"FsspecStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
-                )
-            path = _dereference_path(self.path, key)
-            async with self._limit():
-                await self.fs._pipe_file(path, value.to_bytes())
-
-        await asyncio.gather(*[_set_with_limit(key, value) for key, value in values])
-
-    @with_concurrency_limit
     async def delete(self, key: str) -> None:
         # docstring inherited
         self._check_writable()
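The deleted `_set_many` override existed because `set()` was wrapped by `with_concurrency_limit`, and a semaphore-guarded method that awaits another guarded method on the same instance can deadlock once the semaphore is exhausted. A rough stand-alone reconstruction of that decorator pattern (hypothetical; the real helper lived in `zarr.storage._utils` and does not appear in this diff):

```python
import asyncio
import functools

def with_concurrency_limit(method):
    # Wrap an async method so each call holds the instance's semaphore.
    # If a decorated method awaits another decorated method on the same
    # instance while the semaphore is exhausted, neither can proceed --
    # the deadlock the removed _set_many override worked around.
    @functools.wraps(method)
    async def wrapper(self, *args, **kwargs):
        async with self._semaphore:
            return await method(self, *args, **kwargs)
    return wrapper

class ToyStore:
    def __init__(self, limit):
        self._semaphore = asyncio.Semaphore(limit)
        self._in_flight = 0
        self.peak = 0

    @with_concurrency_limit
    async def set(self, key):
        # Track how many set() calls run concurrently.
        self._in_flight += 1
        self.peak = max(self.peak, self._in_flight)
        await asyncio.sleep(0.001)  # stand-in for a network write
        self._in_flight -= 1
        return key

async def demo():
    store = ToyStore(limit=2)
    keys = await asyncio.gather(*[store.set(f"k{i}") for i in range(8)])
    return store, keys

store, keys = asyncio.run(demo())
assert keys == [f"k{i}" for i in range(8)]
assert store.peak <= 2  # the decorator caps concurrent writes
```

Dropping the limiter lets `set()` and batch writes share one code path again, which is why this commit can delete both the decorator and the override.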
