Commit f3d92f6

update docs, warn when config is used to set concurrency limits

1 parent 17c7226 commit f3d92f6

5 files changed: 77 additions & 25 deletions
changes/3547.misc.md

Lines changed: 2 additions & 1 deletion

@@ -1 +1,2 @@
-Moved concurrency limits to a global per-event loop setting instead of per-array call.
+Moved concurrency-limiting functionality to store classes. The global configuration object no longer
+controls concurrency limits. Concurrency limits, if applicable, must now be specified when constructing a store.

docs/user-guide/config.md

Lines changed: 1 addition & 1 deletion

@@ -30,7 +30,7 @@ Configuration options include the following:
 - Default Zarr format `default_zarr_version`
 - Default array order in memory `array.order`
 - Whether empty chunks are written to storage `array.write_empty_chunks`
-- Async and threading options, e.g. `async.concurrency` and `threading.max_workers`
+- Threading options, e.g. `threading.max_workers`
 - Selections of implementations of codecs, codec pipelines and buffers
 - Enabling GPU support with `zarr.config.enable_gpu()`. See GPU support for more.

docs/user-guide/performance.md

Lines changed: 21 additions & 19 deletions

@@ -191,20 +191,18 @@ scenarios.
 ### Concurrent I/O operations
 
 Zarr uses asynchronous I/O internally to enable concurrent reads and writes across multiple chunks.
-The level of concurrency is controlled by the `async.concurrency` configuration setting, which
-determines the maximum number of concurrent I/O operations.
-
-The default value is 10, which is a conservative value. You may get improved performance by tuning
-the concurrency limit. You can adjust this value based on your specific needs:
+Concurrency is controlled at the **store level** — each store instance can have its own concurrency
+limit, set via the `concurrency_limit` parameter when creating the store.
 
 ```python
 import zarr
 
-# Set concurrency for the current session
-zarr.config.set({'async.concurrency': 128})
+# Local filesystem store with custom concurrency limit
+store = zarr.storage.LocalStore("data/my_array.zarr", concurrency_limit=64)
 
-# Or use environment variable
-# export ZARR_ASYNC_CONCURRENCY=128
+# Remote store with higher concurrency for network I/O
+from obstore.store import S3Store
+store = zarr.storage.ObjectStore(S3Store.from_url("s3://bucket/path"), concurrency_limit=128)
 ```
 
 Higher concurrency values can improve throughput when:
@@ -217,32 +215,36 @@ Lower concurrency values may be beneficial when:
 - Memory is constrained (each concurrent operation requires buffer space)
 - Using Zarr within a parallel computing framework (see below)
 
+Set `concurrency_limit=None` to disable the concurrency limit entirely.
+
 ### Using Zarr with Dask
 
-[Dask](https://www.dask.org/) is a popular parallel computing library that works well with Zarr for processing large arrays. When using Zarr with Dask, it's important to consider the interaction between Dask's thread pool and Zarr's concurrency settings.
+[Dask](https://www.dask.org/) is a popular parallel computing library that works well with Zarr for processing large arrays. When using Zarr with Dask, it's important to consider the interaction between Dask's thread pool and the store's concurrency limit.
 
-**Important**: When using many Dask threads, you may need to reduce both Zarr's `async.concurrency` and `threading.max_workers` settings to avoid creating too many concurrent operations. The total number of concurrent I/O operations can be roughly estimated as:
+**Important**: When using many Dask threads, you may need to reduce the store's `concurrency_limit` and Zarr's `threading.max_workers` setting to avoid creating too many concurrent operations. The total number of concurrent I/O operations can be roughly estimated as:
 
 ```
-total_concurrency ≈ dask_threads × zarr_async_concurrency
+total_concurrency ≈ dask_threads × store_concurrency_limit
 ```
 
-For example, if you're running Dask with 10 threads and Zarr's default concurrency of 64, you could potentially have up to 640 concurrent operations, which may overwhelm your storage system or cause memory issues.
+For example, if you're running Dask with 10 threads and a store concurrency limit of 64, you could potentially have up to 640 concurrent operations, which may overwhelm your storage system or cause memory issues.
 
-**Recommendation**: When using Dask with many threads, configure Zarr's concurrency settings:
+**Recommendation**: When using Dask with many threads, configure concurrency settings:
 
 ```python
 import zarr
 import dask.array as da
 
-# If using Dask with many threads (e.g., 8-16), reduce Zarr's concurrency settings
+# Create store with reduced concurrency limit for Dask workloads
+store = zarr.storage.LocalStore("data/large_array.zarr", concurrency_limit=4)
+
+# Also limit Zarr's internal thread pool
 zarr.config.set({
-    'async.concurrency': 4,  # Limit concurrent async operations
     'threading.max_workers': 4,  # Limit Zarr's internal thread pool
 })
 
 # Open Zarr array
-z = zarr.open_array('data/large_array.zarr', mode='r')
+z = zarr.open_array(store=store, mode='r')
 
 # Create Dask array from Zarr array
 arr = da.from_array(z, chunks=z.chunks)
@@ -253,8 +255,8 @@ result = arr.mean(axis=0).compute()
 
 **Configuration guidelines for Dask workloads**:
 
-- `async.concurrency`: Controls the maximum number of concurrent async I/O operations. Start with a lower value (e.g., 4-8) when using many Dask threads.
-- `threading.max_workers`: Controls Zarr's internal thread pool size for blocking operations (defaults to CPU count). Reduce this to avoid thread contention with Dask's scheduler.
+- `concurrency_limit` (per-store): Controls the maximum number of concurrent async I/O operations for a given store. Start with a lower value (e.g., 4-8) when using many Dask threads.
+- `threading.max_workers` (global config): Controls Zarr's internal thread pool size for blocking operations (defaults to CPU count). Reduce this to avoid thread contention with Dask's scheduler.
 
 You may need to experiment with different values to find the optimal balance for your workload. Monitor your system's resource usage and adjust these settings based on whether your storage system or CPU is the bottleneck.
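The back-of-envelope estimate in the Dask section of the updated docs can be checked directly, using the documentation's own example numbers (10 Dask threads, a store concurrency limit of 64):

```python
# Worked example of the total_concurrency estimate from the docs above
dask_threads = 10
store_concurrency_limit = 64

total_concurrency = dask_threads * store_concurrency_limit
print(total_concurrency)  # 640 potential concurrent I/O operations
```

With the recommended Dask-friendly limit of 4, the same product drops to 40 concurrent operations, which is far gentler on the storage backend.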
src/zarr/core/config.py

Lines changed: 39 additions & 1 deletion

@@ -29,13 +29,32 @@
 
 from __future__ import annotations
 
+import os
+import warnings
 from typing import TYPE_CHECKING, Any, Literal, cast
 
 from donfig import Config as DConfig
 
 if TYPE_CHECKING:
+    from collections.abc import Mapping
+
     from donfig.config_obj import ConfigSet
 
+# Config keys that have been moved from global config to per-store parameters.
+# Maps old config key to a warning message.
+_warn_on_set: dict[str, str] = {
+    "async.concurrency": (
+        "The 'async.concurrency' configuration key has no effect. "
+        "Concurrency limits are now set per-store via the 'concurrency_limit' "
+        "parameter. For example: zarr.storage.LocalStore(..., concurrency_limit=10)."
+    ),
+}
+
+# Environment variable forms of the keys above (ZARR_ASYNC__CONCURRENCY -> async.concurrency)
+_warn_on_set_env: dict[str, str] = {
+    "ZARR_ASYNC__CONCURRENCY": _warn_on_set["async.concurrency"],
+}
+
 
 class BadConfigError(ValueError):
     _msg = "bad Config: %r"
@@ -55,6 +74,25 @@ class Config(DConfig):  # type: ignore[misc]
 
     """
 
+    def set(self, arg: Mapping[str, Any] | None = None, **kwargs: Any) -> ConfigSet:
+        # Check for keys that now belong to per-store config
+        if arg is not None:
+            for key in arg:
+                if key in _warn_on_set:
+                    warnings.warn(_warn_on_set[key], UserWarning, stacklevel=2)
+        for key in kwargs:
+            normalized = key.replace("__", ".")
+            if normalized in _warn_on_set:
+                warnings.warn(_warn_on_set[normalized], UserWarning, stacklevel=2)
+        return super().set(arg, **kwargs)
+
+    def refresh(self, **kwargs: Any) -> None:
+        # Warn if env vars are being used for removed config keys
+        for env_key, message in _warn_on_set_env.items():
+            if env_key in os.environ:
+                warnings.warn(message, UserWarning, stacklevel=2)
+        super().refresh(**kwargs)
+
     def reset(self) -> None:
         self.clear()
         self.refresh()
@@ -98,7 +136,7 @@ def enable_gpu(self) -> ConfigSet:
             "write_empty_chunks": False,
             "target_shard_size_bytes": None,
         },
-        "async": {"concurrency": 10, "timeout": None},
+        "async": {"timeout": None},
         "threading": {"max_workers": None},
         "json_indent": 2,
         "codec_pipeline": {
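The warn-on-set pattern added above can be illustrated with a stdlib-only sketch. `DemoConfig` below is a hypothetical stand-in for the donfig-backed `Config` class (the real one delegates to `super().set(...)`), but the key normalization (`__` to `.`) and the warning logic mirror the diff:

```python
import os
import warnings

# Deprecated-key table, mirroring _warn_on_set in the diff above
_WARN_ON_SET = {
    "async.concurrency": (
        "'async.concurrency' has no effect; pass concurrency_limit when constructing a store."
    ),
}
_WARN_ON_SET_ENV = {"ZARR_ASYNC__CONCURRENCY": _WARN_ON_SET["async.concurrency"]}

class DemoConfig(dict):
    """Hypothetical stand-in for the donfig-backed Config class."""

    def set(self, arg=None, **kwargs):
        # Normalize the kwarg form (async__concurrency -> async.concurrency)
        merged = dict(arg or {}) | {k.replace("__", "."): v for k, v in kwargs.items()}
        for key in merged:
            if key in _WARN_ON_SET:
                warnings.warn(_WARN_ON_SET[key], UserWarning, stacklevel=2)
        self.update(merged)

    def refresh(self):
        # Env-var form of the same keys warns on refresh
        for env_key, message in _WARN_ON_SET_ENV.items():
            if env_key in os.environ:
                warnings.warn(message, UserWarning, stacklevel=2)

config = DemoConfig()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    config.set({"async.concurrency": 20})  # dict form warns
    config.set(async__concurrency=20)      # kwarg form warns too
print(len(caught))  # 2
```

Keeping the messages in a single table means new per-store migrations only need one more dict entry (plus its env-var alias), with no changes to `set` or `refresh`.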

tests/test_config.py

Lines changed: 14 additions & 3 deletions

@@ -55,7 +55,7 @@ def test_config_defaults_set() -> None:
             "write_empty_chunks": False,
             "target_shard_size_bytes": None,
         },
-        "async": {"concurrency": 10, "timeout": None},
+        "async": {"timeout": None},
         "threading": {"max_workers": None},
         "json_indent": 2,
         "codec_pipeline": {
@@ -101,15 +101,14 @@ def test_config_defaults_set() -> None:
         ]
     )
     assert config.get("array.order") == "C"
-    assert config.get("async.concurrency") == 10
     assert config.get("async.timeout") is None
     assert config.get("codec_pipeline.batch_size") == 1
     assert config.get("json_indent") == 2
 
 
 @pytest.mark.parametrize(
     ("key", "old_val", "new_val"),
-    [("array.order", "C", "F"), ("async.concurrency", 10, 128), ("json_indent", 2, 0)],
+    [("array.order", "C", "F"), ("json_indent", 2, 0)],
 )
 def test_config_defaults_can_be_overridden(key: str, old_val: Any, new_val: Any) -> None:
     assert config.get(key) == old_val
@@ -347,3 +346,15 @@ def test_deprecated_config(key: str) -> None:
     with pytest.raises(ValueError):
         with zarr.config.set({key: "foo"}):
             pass
+
+
+def test_async_concurrency_config_warns() -> None:
+    """Test that setting async.concurrency emits a warning directing users to per-store config."""
+    with pytest.warns(UserWarning, match="async.concurrency.*no effect"):
+        with zarr.config.set({"async.concurrency": 20}):
+            pass
+
+    # Also test the kwarg form
+    with pytest.warns(UserWarning, match="async.concurrency.*no effect"):
+        with zarr.config.set(async__concurrency=20):
+            pass
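One detail worth noting about the new test: `pytest.warns(..., match=...)` applies `re.search` to the warning message, so the dots in `"async.concurrency.*no effect"` are regex wildcards that also happen to match the literal dots in the key name. A quick stdlib check that this pattern matches the message defined in `src/zarr/core/config.py`:

```python
import re

# Warning message from the diff in src/zarr/core/config.py
message = (
    "The 'async.concurrency' configuration key has no effect. "
    "Concurrency limits are now set per-store via the 'concurrency_limit' "
    "parameter. For example: zarr.storage.LocalStore(..., concurrency_limit=10)."
)
# Pattern from the new test; pytest.warns applies re.search to the message
pattern = "async.concurrency.*no effect"
print(bool(re.search(pattern, message)))  # True
```

Because `re.search` matches anywhere in the string, the pattern stays valid even if the message later gains a prefix or suffix.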
