```diff
@@ -13,6 +13,7 @@
     BytesBytesCodec,
     Codec,
     CodecPipeline,
+    GetResult,
 )
 from zarr.core.common import concurrent_map
 from zarr.core.config import config
```
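The `GetResult` type imported above is not defined anywhere in this diff. A minimal sketch of what it could look like, assuming it only records whether a chunk was found in the store (the `status` field name is taken from its usage below; everything else is a guess):

```python
from dataclasses import dataclass
from typing import Literal


@dataclass(frozen=True)
class GetResult:
    """Hypothetical per-chunk read outcome; not the actual zarr definition."""

    status: Literal["present", "missing"]
```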
```diff
@@ -251,47 +252,58 @@ async def read_batch(
         batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
         out: NDBuffer,
         drop_axes: tuple[int, ...] = (),
-    ) -> None:
+    ) -> tuple[GetResult, ...]:
+        results: list[GetResult] = []
         if self.supports_partial_decode:
+            batch_info_list = list(batch_info)
             chunk_array_batch = await self.decode_partial_batch(
                 [
                     (byte_getter, chunk_selection, chunk_spec)
-                    for byte_getter, chunk_spec, chunk_selection, *_ in batch_info
+                    for byte_getter, chunk_spec, chunk_selection, *_ in batch_info_list
                 ]
             )
             for chunk_array, (_, chunk_spec, _, out_selection, _) in zip(
-                chunk_array_batch, batch_info, strict=False
+                chunk_array_batch, batch_info_list, strict=False
             ):
                 if chunk_array is not None:
                     if drop_axes:
                         chunk_array = chunk_array.squeeze(axis=drop_axes)
                     out[out_selection] = chunk_array
+                    results.append(GetResult(status="present"))
                 else:
                     out[out_selection] = fill_value_or_default(chunk_spec)
+                    results.append(GetResult(status="missing"))
         else:
+            batch_info_list = list(batch_info)
             chunk_bytes_batch = await concurrent_map(
-                [(byte_getter, array_spec.prototype) for byte_getter, array_spec, *_ in batch_info],
+                [
+                    (byte_getter, array_spec.prototype)
+                    for byte_getter, array_spec, *_ in batch_info_list
+                ],
                 lambda byte_getter, prototype: byte_getter.get(prototype),
                 config.get("async.concurrency"),
             )
             chunk_array_batch = await self.decode_batch(
                 [
                     (chunk_bytes, chunk_spec)
                     for chunk_bytes, (_, chunk_spec, *_) in zip(
-                        chunk_bytes_batch, batch_info, strict=False
+                        chunk_bytes_batch, batch_info_list, strict=False
                     )
                 ],
             )
             for chunk_array, (_, chunk_spec, chunk_selection, out_selection, _) in zip(
-                chunk_array_batch, batch_info, strict=False
+                chunk_array_batch, batch_info_list, strict=False
             ):
                 if chunk_array is not None:
                     tmp = chunk_array[chunk_selection]
                     if drop_axes:
                         tmp = tmp.squeeze(axis=drop_axes)
                     out[out_selection] = tmp
+                    results.append(GetResult(status="present"))
                 else:
                     out[out_selection] = fill_value_or_default(chunk_spec)
+                    results.append(GetResult(status="missing"))
+        return tuple(results)

     def _merge_chunk_array(
         self,
```
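`read_batch` now records, in batch order, whether each chunk was decoded from stored bytes ("present") or substituted with the fill value ("missing"). A standalone sketch of that pattern, using the hypothetical `GetResult` above and plain integers standing in for chunk arrays:

```python
from typing import Optional, Sequence


def fill_with_statuses(
    chunks: Sequence[Optional[int]], fill_value: int
) -> tuple[list[int], tuple[GetResult, ...]]:
    # Same control flow as read_batch above: a decoded chunk is copied to
    # the output and recorded as "present"; a missing chunk (None) falls
    # back to the fill value and is recorded as "missing".
    out: list[int] = []
    results: list[GetResult] = []
    for chunk in chunks:
        if chunk is not None:
            out.append(chunk)
            results.append(GetResult(status="present"))
        else:
            out.append(fill_value)
            results.append(GetResult(status="missing"))
    return out, tuple(results)


# fill_with_statuses([7, None, 3], fill_value=0)
# -> ([7, 0, 3], and statuses ("present", "missing", "present"))
```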
```diff
@@ -471,15 +483,19 @@ async def read(
         batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
         out: NDBuffer,
         drop_axes: tuple[int, ...] = (),
-    ) -> None:
-        await concurrent_map(
+    ) -> tuple[GetResult, ...]:
+        batch_results = await concurrent_map(
             [
                 (single_batch_info, out, drop_axes)
                 for single_batch_info in batched(batch_info, self.batch_size)
             ],
             self.read_batch,
             config.get("async.concurrency"),
         )
+        results: list[GetResult] = []
+        for batch in batch_results:
+            results.extend(batch)
+        return tuple(results)

     async def write(
         self,
```
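Because `read` now flattens the per-batch tuples in order, callers can see how many chunks were actually materialized from the store. A minimal consumer sketch, assuming `pipeline`, `batch_info`, and `out` are set up elsewhere (none of these names come from this diff):

```python
from collections import Counter


async def summarize_read(pipeline, batch_info, out) -> tuple[int, int]:
    # Tally how many chunks were read from the store versus filled with
    # the array's fill value, using the statuses returned by read().
    results = await pipeline.read(batch_info, out)
    counts = Counter(r.status for r in results)
    return counts["present"], counts["missing"]
```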