|
@@ -13,6 +13,7 @@
     BytesBytesCodec,
     Codec,
     CodecPipeline,
+    GetResult,
 )
 from zarr.core.common import concurrent_map
 from zarr.core.config import config
@@ -248,47 +249,58 @@ async def read_batch(
         batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
         out: NDBuffer,
         drop_axes: tuple[int, ...] = (),
-    ) -> None:
+    ) -> tuple[GetResult, ...]:
+        results: list[GetResult] = []
         if self.supports_partial_decode:
+            batch_info_list = list(batch_info)
             chunk_array_batch = await self.decode_partial_batch(
                 [
                     (byte_getter, chunk_selection, chunk_spec)
-                    for byte_getter, chunk_spec, chunk_selection, *_ in batch_info
+                    for byte_getter, chunk_spec, chunk_selection, *_ in batch_info_list
                 ]
             )
             for chunk_array, (_, chunk_spec, _, out_selection, _) in zip(
-                chunk_array_batch, batch_info, strict=False
+                chunk_array_batch, batch_info_list, strict=False
             ):
                 if chunk_array is not None:
                     if drop_axes:
                         chunk_array = chunk_array.squeeze(axis=drop_axes)
                     out[out_selection] = chunk_array
+                    results.append(GetResult(status="present"))
                 else:
                     out[out_selection] = fill_value_or_default(chunk_spec)
+                    results.append(GetResult(status="missing"))
         else:
+            batch_info_list = list(batch_info)
             chunk_bytes_batch = await concurrent_map(
-                [(byte_getter, array_spec.prototype) for byte_getter, array_spec, *_ in batch_info],
+                [
+                    (byte_getter, array_spec.prototype)
+                    for byte_getter, array_spec, *_ in batch_info_list
+                ],
                 lambda byte_getter, prototype: byte_getter.get(prototype),
                 config.get("async.concurrency"),
             )
             chunk_array_batch = await self.decode_batch(
                 [
                     (chunk_bytes, chunk_spec)
                     for chunk_bytes, (_, chunk_spec, *_) in zip(
-                        chunk_bytes_batch, batch_info, strict=False
+                        chunk_bytes_batch, batch_info_list, strict=False
                     )
                 ],
             )
             for chunk_array, (_, chunk_spec, chunk_selection, out_selection, _) in zip(
-                chunk_array_batch, batch_info, strict=False
+                chunk_array_batch, batch_info_list, strict=False
             ):
                 if chunk_array is not None:
                     tmp = chunk_array[chunk_selection]
                     if drop_axes:
                         tmp = tmp.squeeze(axis=drop_axes)
                     out[out_selection] = tmp
+                    results.append(GetResult(status="present"))
                 else:
                     out[out_selection] = fill_value_or_default(chunk_spec)
+                    results.append(GetResult(status="missing"))
+        return tuple(results)
 
     def _merge_chunk_array(
         self,
@@ -468,15 +480,19 @@ async def read(
         batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
         out: NDBuffer,
         drop_axes: tuple[int, ...] = (),
-    ) -> None:
-        await concurrent_map(
+    ) -> tuple[GetResult, ...]:
+        batch_results = await concurrent_map(
             [
                 (single_batch_info, out, drop_axes)
                 for single_batch_info in batched(batch_info, self.batch_size)
             ],
             self.read_batch,
             config.get("async.concurrency"),
         )
+        results: list[GetResult] = []
+        for batch in batch_results:
+            results.extend(batch)
+        return tuple(results)
 
     async def write(
         self,
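For context on the new return type: the diff constructs GetResult only as GetResult(status="present") or GetResult(status="missing") and imports it rather than defining it, so its exact definition is not shown here. A minimal sketch consistent with that usage, with count_missing as a purely hypothetical caller (not part of this change), might look like:

from dataclasses import dataclass
from typing import Literal


@dataclass(frozen=True)
class GetResult:
    # "present": the chunk's bytes were found and decoded into `out`;
    # "missing": the chunk was absent, so the fill value was written instead.
    status: Literal["present", "missing"]


async def count_missing(pipeline, batch_info, out) -> int:
    # Hypothetical caller: read() now returns one GetResult per chunk,
    # in batch order, so filled-in chunks can be told apart from stored ones.
    results = await pipeline.read(batch_info, out)
    return sum(1 for r in results if r.status == "missing")

Returning one status per chunk in batch order lets callers distinguish chunks that were actually read from chunks that fell back to the fill value, without changing how out is populated.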