Skip to content

Commit bf8c07d

Browse files
committed
remove unused or unneeded routines
1 parent e86cedd commit bf8c07d

6 files changed

Lines changed: 27 additions & 150 deletions

File tree

src/zarr/core/sync.py

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from zarr.core.config import config
1414

1515
if TYPE_CHECKING:
16-
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine
16+
from collections.abc import AsyncIterator, Coroutine
1717
from typing import Any
1818

1919
logger = logging.getLogger(__name__)
@@ -211,17 +211,3 @@ async def iter_to_list() -> list[T]:
211211
return [item async for item in async_iterator]
212212

213213
return self._sync(iter_to_list())
214-
215-
216-
async def _with_semaphore(
217-
func: Callable[[], Awaitable[T]], semaphore: asyncio.Semaphore | None = None
218-
) -> T:
219-
"""
220-
Await the result of invoking the no-argument-callable ``func`` within the context manager
221-
provided by a Semaphore, if one is provided. Otherwise, just await the result of invoking
222-
``func``.
223-
"""
224-
if semaphore is None:
225-
return await func()
226-
async with semaphore:
227-
return await func()

src/zarr/storage/_local.py

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
)
2020
from zarr.core.buffer import Buffer
2121
from zarr.core.buffer.core import default_buffer_prototype
22-
from zarr.storage._utils import ConcurrencyLimiter, with_concurrency_limit
2322

2423
if TYPE_CHECKING:
2524
from collections.abc import AsyncIterator, Iterable, Iterator
@@ -86,7 +85,7 @@ def _put(path: Path, value: Buffer, exclusive: bool = False) -> int:
8685
return f.write(view)
8786

8887

89-
class LocalStore(Store, ConcurrencyLimiter):
88+
class LocalStore(Store):
9089
"""
9190
Store for the local file system.
9291
@@ -96,9 +95,6 @@ class LocalStore(Store, ConcurrencyLimiter):
9695
Directory to use as root of store.
9796
read_only : bool
9897
Whether the store is read-only
99-
concurrency_limit : int, optional
100-
Maximum number of concurrent I/O operations. Default is 100.
101-
Set to None for unlimited concurrency.
10298
10399
Attributes
104100
----------
@@ -119,24 +115,21 @@ def __init__(
119115
root: Path | str,
120116
*,
121117
read_only: bool = False,
122-
concurrency_limit: int | None = 100,
123118
) -> None:
124119
if isinstance(root, str):
125120
root = Path(root)
126121
if not isinstance(root, Path):
127122
raise TypeError(
128123
f"'root' must be a string or Path instance. Got an instance of {type(root)} instead."
129124
)
130-
Store.__init__(self, read_only=read_only)
131-
ConcurrencyLimiter.__init__(self, concurrency_limit)
125+
super().__init__(read_only=read_only)
132126
self.root = root
133127

134128
def with_read_only(self, read_only: bool = False) -> Self:
135129
# docstring inherited
136130
return type(self)(
137131
root=self.root,
138132
read_only=read_only,
139-
concurrency_limit=self.concurrency_limit,
140133
)
141134

142135
@classmethod
@@ -199,7 +192,6 @@ def __repr__(self) -> str:
199192
def __eq__(self, other: object) -> bool:
200193
return isinstance(other, type(self)) and self.root == other.root
201194

202-
@with_concurrency_limit
203195
async def get(
204196
self,
205197
key: str,
@@ -225,19 +217,8 @@ async def get_partial_values(
225217
key_ranges: Iterable[tuple[str, ByteRequest | None]],
226218
) -> list[Buffer | None]:
227219
# docstring inherited
228-
# We directly call the I/O functions here, wrapped with the semaphore,
229-
# to avoid deadlock from calling the decorated get() method.
230-
231-
async def _get_with_limit(key: str, byte_range: ByteRequest | None) -> Buffer | None:
232-
path = self.root / key
233-
try:
234-
async with self._limit():
235-
return await asyncio.to_thread(_get, path, prototype, byte_range)
236-
except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
237-
return None
238-
239220
return await asyncio.gather(
240-
*[_get_with_limit(key, byte_range) for key, byte_range in key_ranges]
221+
*[self.get(key, prototype, byte_range) for key, byte_range in key_ranges]
241222
)
242223

243224
async def set(self, key: str, value: Buffer) -> None:
@@ -251,7 +232,6 @@ async def set_if_not_exists(self, key: str, value: Buffer) -> None:
251232
except FileExistsError:
252233
pass
253234

254-
@with_concurrency_limit
255235
async def _set(self, key: str, value: Buffer, exclusive: bool = False) -> None:
256236
if not self._is_open:
257237
await self._open()
@@ -264,7 +244,6 @@ async def _set(self, key: str, value: Buffer, exclusive: bool = False) -> None:
264244
path = self.root / key
265245
await asyncio.to_thread(_put, path, value, exclusive=exclusive)
266246

267-
@with_concurrency_limit
268247
async def delete(self, key: str) -> None:
269248
"""
270249
Remove a key from the store.

src/zarr/testing/store_concurrency.py

Lines changed: 23 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -75,56 +75,36 @@ def test_concurrency_limit_custom(self, store_kwargs: dict[str, Any]) -> None:
7575
assert store.concurrency_limit is None
7676
assert store._semaphore is None
7777

78-
async def test_concurrency_limit_enforced(self, store: S) -> None:
79-
"""Test that the concurrency limit is actually enforced during execution.
78+
async def test_concurrency_limit_enforced(self, store_kwargs: dict[str, Any]) -> None:
79+
"""Test that store operations acquire the semaphore.
8080
81-
This test verifies that when many operations are submitted concurrently,
82-
only up to the concurrency limit are actually executing at once.
81+
Exhausts all semaphore slots externally, then verifies that a store
82+
operation blocks until a slot is released.
8383
"""
84-
semaphore = self._get_semaphore(store)
85-
if semaphore is None:
86-
pytest.skip("Store has no concurrency limit")
87-
88-
assert isinstance(store, ConcurrencyLimiter)
89-
assert store.concurrency_limit is not None # type: ignore[unreachable]
90-
limit = store.concurrency_limit
91-
92-
# We'll monitor the semaphore's available count
93-
# When it reaches 0, that means `limit` operations are running
94-
min_available = limit
95-
96-
async def monitored_operation(key: str, value: B) -> None:
97-
nonlocal min_available
98-
# Check semaphore state right after we're scheduled
99-
await asyncio.sleep(0) # Yield to ensure we're in the queue
100-
available = semaphore._value
101-
min_available = min(min_available, available)
84+
if not issubclass(self.store_cls, ConcurrencyLimiter):
85+
pytest.skip("Store does not support concurrency limits")
10286

103-
# Now do the actual operation (which will acquire the semaphore)
104-
await store.set(key, value)
87+
limit = 3
88+
store = self.store_cls(**{**store_kwargs, "concurrency_limit": limit}) # type: ignore[unreachable]
89+
await store._ensure_open()
10590

106-
# Launch more operations than the limit to ensure contention
107-
num_ops = limit * 2
108-
items = [
109-
(f"limit_test_key_{i}", self.buffer_cls.from_bytes(f"value_{i}".encode()))
110-
for i in range(num_ops)
111-
]
91+
sem = store._semaphore
92+
assert sem is not None
11293

113-
await asyncio.gather(*[monitored_operation(k, v) for k, v in items])
94+
# Exhaust all slots
95+
for _ in range(limit):
96+
await sem.acquire()
97+
assert sem._value == 0 # type: ignore[attr-defined]
11498

115-
# The semaphore should have been fully utilized (reached 0 or close to it)
116-
# This indicates that `limit` operations were running concurrently
117-
assert min_available < limit, (
118-
f"Semaphore was never fully utilized. "
119-
f"Min available: {min_available}, Limit: {limit}. "
120-
f"This suggests operations aren't running concurrently."
121-
)
99+
# A store operation should block because no slots are available
100+
buf = self.buffer_cls.from_bytes(b"x")
101+
task = asyncio.create_task(store.set("key", buf))
102+
await asyncio.sleep(0) # let the task attempt to acquire
103+
assert not task.done()
122104

123-
# Ideally it should reach 0, but allow some slack for timing
124-
assert min_available <= 5, (
125-
f"Semaphore only reached {min_available} available slots. "
126-
f"Expected close to 0 with limit {limit}."
127-
)
105+
# Release one slot — the task should now complete
106+
sem.release()
107+
await asyncio.wait_for(task, timeout=5.0)
128108

129109
async def test_batch_write_no_deadlock(self, store: S) -> None:
130110
"""Test that batch writes don't deadlock when exceeding concurrency limit."""

tests/test_global_concurrency.py

Lines changed: 0 additions & 43 deletions
This file was deleted.

tests/test_store/test_local.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
from zarr.storage import LocalStore
1616
from zarr.storage._local import _atomic_write
1717
from zarr.testing.store import StoreTests
18-
from zarr.testing.store_concurrency import StoreConcurrencyTests
1918
from zarr.testing.utils import assert_bytes_equal
2019

2120
if TYPE_CHECKING:
@@ -206,14 +205,3 @@ def test_atomic_write_exclusive_preexisting(tmp_path: pathlib.Path) -> None:
206205
assert path.read_bytes() == b"xyz"
207206
assert list(path.parent.iterdir()) == [path] # no temp files
208207

209-
210-
class TestLocalStoreConcurrency(StoreConcurrencyTests[LocalStore, cpu.Buffer]):
211-
"""Test LocalStore concurrency limiting behavior."""
212-
213-
store_cls = LocalStore
214-
buffer_cls = cpu.Buffer
215-
expected_concurrency_limit = 100 # LocalStore default
216-
217-
@pytest.fixture
218-
def store_kwargs(self, tmpdir: str) -> dict[str, str]:
219-
return {"root": str(tmpdir)}

tests/test_store/test_memory.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from zarr.errors import ZarrUserWarning
1515
from zarr.storage import GpuMemoryStore, MemoryStore
1616
from zarr.testing.store import StoreTests
17-
from zarr.testing.store_concurrency import StoreConcurrencyTests
1817
from zarr.testing.utils import gpu_test
1918

2019
if TYPE_CHECKING:
@@ -182,15 +181,3 @@ def test_from_dict(self) -> None:
182181
result = GpuMemoryStore.from_dict(d)
183182
for v in result._store_dict.values():
184183
assert type(v) is gpu.Buffer
185-
186-
187-
class TestMemoryStoreConcurrency(StoreConcurrencyTests[MemoryStore, cpu.Buffer]):
188-
"""Test MemoryStore concurrency limiting behavior."""
189-
190-
store_cls = MemoryStore
191-
buffer_cls = cpu.Buffer
192-
expected_concurrency_limit = None # MemoryStore has no limit (fast in-memory ops)
193-
194-
@pytest.fixture
195-
def store_kwargs(self) -> dict[str, Any]:
196-
return {"store_dict": None}

0 commit comments

Comments
 (0)