-
-
Notifications
You must be signed in to change notification settings - Fork 423
Expand file tree
/
Copy pathsync.py
More file actions
269 lines (215 loc) · 9.49 KB
/
Copy pathsync.py
File metadata and controls
269 lines (215 loc) · 9.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
from __future__ import annotations
import asyncio
import atexit
import logging
import os
import threading
from concurrent.futures import ThreadPoolExecutor, wait
from textwrap import dedent
from typing import TYPE_CHECKING, Any
from typing_extensions import ParamSpec
from zarr._constants import IS_WASM
from zarr.core.config import config
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine
from typing import Any
# Module-level logger used for debug/warning messages emitted by the helpers below.
logger = logging.getLogger(__name__)

P = ParamSpec("P")

# From https://github.com/fsspec/filesystem_spec/blob/master/fsspec/asyn.py
# `iothread` and `loop` are single-element lists: the functions in this module
# read and overwrite slot 0 in place rather than rebinding the module-level names.
iothread: list[threading.Thread | None] = [None]  # dedicated IO thread
loop: list[asyncio.AbstractEventLoop | None] = [
    None
]  # global event loop for any non-async instance
_lock: threading.Lock | None = None  # global lock placeholder
_executor: ThreadPoolExecutor | None = None  # global executor placeholder
class SyncError(Exception):
    """Error raised when a coroutine cannot be run synchronously.

    Within this module it is raised by ``sync`` when it is called from inside
    the very event loop it would have to block on.
    """
def _get_lock() -> threading.Lock:
    """Return the module-wide threading lock, creating it on first use.

    Allocation is deferred until the first call so that a lock can be set up
    per forked process instead of being created at import time.
    """
    global _lock
    if _lock is not None:
        return _lock
    _lock = threading.Lock()
    return _lock
def _get_executor() -> ThreadPoolExecutor:
    """Return the Zarr thread pool executor, creating it on first use.

    When the executor is created, its size is read from the
    ``threading.max_workers`` config entry and it is installed as the default
    executor of the loop returned by ``_get_loop``.
    """
    global _executor
    if _executor is not None:
        return _executor

    worker_limit = config.get("threading.max_workers", None)
    logger.debug("Creating Zarr ThreadPoolExecutor with max_workers=%s", worker_limit)
    pool = ThreadPoolExecutor(max_workers=worker_limit, thread_name_prefix="zarr_pool")
    # Publish the executor before touching the loop, as the original ordering did.
    _executor = pool
    _get_loop().set_default_executor(pool)
    return pool
def cleanup_resources() -> None:
    """
    Release the module's global executor, event loop and IO thread.

    The executor (if any) is shut down first. If an event loop exists, it is
    asked to stop, the IO thread is given a short time to exit, the loop is
    closed, and both global slots are cleared. The function is registered with
    ``atexit`` right after its definition.
    """
    global _executor
    if _executor:
        # wait=True blocks until running work finishes; cancel_futures=True
        # drops work that is still queued.
        _executor.shutdown(wait=True, cancel_futures=True)
        _executor = None

    if loop[0] is not None:
        with _get_lock():
            # Stop the event loop safely
            loop[0].call_soon_threadsafe(loop[0].stop)  # Stop loop from another thread
            if iothread[0] is not None:
                iothread[0].join(timeout=0.2)  # Add a timeout to avoid hanging

                if iothread[0].is_alive():
                    logger.warning(
                        "Thread did not finish cleanly; forcefully closing the event loop."
                    )

            # Forcefully close the event loop to release resources
            # NOTE(review): asyncio documents that close() must not be called on a
            # running loop; if the IO thread is still alive at this point this call
            # may raise RuntimeError and skip the dereferencing below -- confirm.
            loop[0].close()

            # dereference the loop and iothread
            loop[0] = None
            iothread[0] = None


# Run the cleanup automatically when the interpreter exits.
atexit.register(cleanup_resources)
def reset_resources_after_fork() -> None:
    """
    Ensure that global resources are reset after a fork. Without this function,
    forked processes will retain invalid references to the parent process's resources.

    The event loop slot, the IO thread slot, the executor and the global lock are
    all cleared, so each one is lazily re-created in the child on first use.
    """
    # Only the names that are rebound need a ``global`` declaration; ``loop`` and
    # ``iothread`` are mutated in place.
    global _executor, _lock
    # These lines are excluded from coverage because this function only runs in a child process,
    # which is not observed by the test coverage instrumentation. Despite the apparent lack of
    # test coverage, this function should be adequately tested by any test that uses Zarr IO with
    # multiprocessing.
    loop[0] = None  # pragma: no cover
    iothread[0] = None  # pragma: no cover
    _executor = None  # pragma: no cover
    # The lock is copied into the child in whatever state it had at fork time. If
    # another parent thread held it, nothing in the child could ever release it, so
    # drop it and let ``_get_lock`` allocate a fresh one for this process.
    _lock = None  # pragma: no cover


# this is only available on certain operating systems
if hasattr(os, "register_at_fork"):
    os.register_at_fork(after_in_child=reset_resources_after_fork)
async def _runner[T](coro: Coroutine[Any, Any, T]) -> T | BaseException:
"""
Await a coroutine and return the result of running it. If awaiting the coroutine raises an
exception, the exception will be returned.
"""
try:
return await coro
except Exception as ex:
return ex
def sync[T](
coro: Coroutine[Any, Any, T],
loop: asyncio.AbstractEventLoop | None = None,
timeout: float | None = None,
) -> T:
"""
Make loop run coroutine until it returns. Runs in other thread
"""
# WASM environments (like Pyodide) cannot start new threads, so we need to handle
# coroutines differently. We integrate with the existing Pyodide WebLoop which
# schedules tasks on the browser's event loop using setTimeout():
# https://developer.mozilla.org/en-US/docs/Web/API/setTimeout
# https://pyodide.org/en/stable/usage/api/python-api/webloop.html
if IS_WASM: # pragma: no cover
# This code path is covered in the Pyodide/WASM CI job.
current_loop = asyncio.get_running_loop()
result = current_loop.run_until_complete(coro)
# Check if run_until_complete actually executed the coroutine or just returned a task
# In browsers without JSPI, run_until_complete is a no-op that will return the task/future.
if isinstance(result, (asyncio.Task, asyncio.Future)):
raise RuntimeError(
dedent("""
Cannot use synchronous Zarr API in browser-based environments without JSPI.
Zarr requires JavaScript Promise Integration (JSPI) to block for asynchronous
operations, which is not supported or enabled in your current environment.
The available solutions are to either use Zarr's async API instead via
`zarr.api.asynchronous`, or run your application in a JSPI-enabled environment.
For detailed requirements and environment setup instructions, please see:
- The [Pyodide JSPI Blog Post](https://blog.pyodide.org/posts/jspi/)
- The [Pyodide 0.28 Release Notes](https://blog.pyodide.org/posts/0.28-release/)
Note: If you are using Node.js v20 to v24, you must pass the `--experimental-wasm-jspi`
flag (JSPI is unflagged and enabled by default starting in Node.js v25 and later).
""")
)
return result
# This code path is the original thread-based implementation
# for non-WASM environments; it creates a dedicated I/O thread
# with its own event loop.
if loop is None:
# NB: if the loop is not running *yet*, it is OK to submit work
# and we will wait for it
loop = _get_loop()
if _executor is None and config.get("threading.max_workers", None) is not None:
# trigger executor creation and attach to loop
_ = _get_executor()
if not isinstance(loop, asyncio.AbstractEventLoop):
raise TypeError(f"loop cannot be of type {type(loop)}")
if loop.is_closed():
raise RuntimeError("Loop is not running")
try:
loop0 = asyncio.events.get_running_loop()
if loop0 is loop:
raise SyncError("Calling sync() from within a running loop")
except RuntimeError:
pass
future = asyncio.run_coroutine_threadsafe(_runner(coro), loop)
finished, unfinished = wait([future], return_when=asyncio.ALL_COMPLETED, timeout=timeout)
if len(unfinished) > 0:
raise TimeoutError(f"Coroutine {coro} failed to finish within {timeout} s")
assert len(finished) == 1
return_result = next(iter(finished)).result()
if isinstance(return_result, BaseException):
raise return_result
else:
return return_result
def _get_loop() -> asyncio.AbstractEventLoop:
    """Return the shared IO event loop, creating it on first use.

    On creation the loop is started with ``run_forever`` on a daemon thread named
    ``zarr_io``. Raises ``RuntimeError`` when ``IS_WASM`` is set.
    """
    if IS_WASM:  # pragma: no cover
        # This case is covered in the Pyodide/WASM CI job.
        raise RuntimeError(
            "Thread-based event loop not available in WASM environment. "
            "Use zarr.api.asynchronous or ensure sync() handles WASM case."
        )

    if loop[0] is None:
        with _get_lock():
            # Check again under the lock: another thread may have filled the slot
            # between the test above and acquiring the lock.
            if loop[0] is None:
                logger.debug("Creating Zarr event loop")
                fresh_loop = asyncio.new_event_loop()
                loop[0] = fresh_loop
                io_worker = threading.Thread(
                    target=fresh_loop.run_forever, name="zarr_io", daemon=True
                )
                iothread[0] = io_worker
                io_worker.start()

    current = loop[0]
    assert current is not None
    return current
async def _collect_aiterator[T](data: AsyncIterator[T]) -> tuple[T, ...]:
"""
Collect an entire async iterator into a tuple
"""
result = [x async for x in data]
return tuple(result)
def collect_aiterator[T](data: AsyncIterator[T]) -> tuple[T, ...]:
"""
Synchronously collect an entire async iterator into a tuple.
"""
return sync(_collect_aiterator(data))
class SyncMixin:
def _sync[T](self, coroutine: Coroutine[Any, Any, T]) -> T:
# TODO: refactor this to take *args and **kwargs and pass those to the method
# this should allow us to better type the sync wrapper
return sync(
coroutine,
timeout=config.get("async.timeout"),
)
def _sync_iter[T](self, async_iterator: AsyncIterator[T]) -> list[T]:
async def iter_to_list() -> list[T]:
return [item async for item in async_iterator]
return self._sync(iter_to_list())
async def _with_semaphore[T](
func: Callable[[], Awaitable[T]], semaphore: asyncio.Semaphore | None = None
) -> T:
"""
Await the result of invoking the no-argument-callable ``func`` within the context manager
provided by a Semaphore, if one is provided. Otherwise, just await the result of invoking
``func``.
"""
if semaphore is None:
return await func()
async with semaphore:
return await func()