Skip to content

Commit d50a537

Browse files
bendichterclaude
andcommitted
Add concurrent chunk fetching for external array links
Route remote external array links through zarr + LindiH5ZarrStore instead of h5py + LindiRemfile. This enables concurrent HTTP range requests via LindiH5ZarrStore.getitems(), which zarr calls when reading multiple chunks. The getitems() method separates serial metadata lookup (fast, uses h5py's B-tree cache) from parallel data fetches (N concurrent HTTP requests via ThreadPoolExecutor instead of N serial ones). Local external array links still use h5py directly since there's no concurrency benefit for local I/O. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent f48e993 commit d50a537

3 files changed

Lines changed: 289 additions & 5 deletions

File tree

lindi/LindiH5ZarrStore/LindiH5ZarrStore.py

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import json
22
import base64
3+
import time
34
from typing import Tuple, Union, List, IO, Any, Dict, Callable
5+
from concurrent.futures import ThreadPoolExecutor, as_completed
46
import numpy as np
57
import zarr
68
from zarr.storage import Store, MemoryStore
9+
import requests
710
import h5py
811
from tqdm import tqdm
912
from ._util import (
@@ -150,7 +153,8 @@ def __init__(
150153
_opts: LindiH5ZarrStoreOpts,
151154
_url: Union[str, None] = None,
152155
_entities_to_close: List[Any],
153-
_local_cache: Union[LocalCache, None] = None
156+
_local_cache: Union[LocalCache, None] = None,
157+
_concurrent_max_workers: int = 8
154158
):
155159
"""
156160
Do not call the constructor directly. Instead, use the from_file class
@@ -161,6 +165,7 @@ def __init__(
161165
self._url = _url
162166
self._opts = _opts
163167
self._local_cache = _local_cache
168+
self._concurrent_max_workers = _concurrent_max_workers
164169
self._entities_to_close = _entities_to_close + [self._h5f]
165170

166171
# Some datasets do not correspond to traditional chunked datasets. For
@@ -325,6 +330,97 @@ def __contains__(self, key):
325330
return False
326331
return True
327332

333+
def getitems(self, keys, *, contexts=None):
334+
"""Fetch multiple keys, with concurrent HTTP fetches for remote chunks."""
335+
results = {}
336+
remote_chunks = [] # (key, byte_offset, byte_count)
337+
338+
for key in keys:
339+
parts = [p for p in key.split("/") if p]
340+
if not parts:
341+
continue
342+
key_name = parts[-1]
343+
344+
# Metadata keys — resolve synchronously
345+
if key_name in ('.zattrs', '.zgroup', '.zarray'):
346+
try:
347+
results[key] = self[key]
348+
except KeyError:
349+
pass
350+
continue
351+
352+
# Chunk keys — get byte range from h5py metadata
353+
key_parent = "/".join(parts[:-1])
354+
try:
355+
byte_offset, byte_count, inline_data = self._get_chunk_file_bytes_data(key_parent, key_name)
356+
except Exception:
357+
continue
358+
359+
if inline_data is not None:
360+
results[key] = inline_data
361+
continue
362+
363+
# Check local cache
364+
if self._local_cache is not None and self._url is not None:
365+
cached = self._local_cache.get_remote_chunk(url=self._url, offset=byte_offset, size=byte_count)
366+
if cached is not None:
367+
results[key] = cached
368+
continue
369+
370+
if self._url is not None and (self._url.startswith('http://') or self._url.startswith('https://')):
371+
remote_chunks.append((key, byte_offset, byte_count))
372+
else:
373+
# Local file — read synchronously (byte range already known)
374+
buf = _read_bytes(self._file, byte_offset, byte_count)
375+
self._try_cache_put(byte_offset, byte_count, buf)
376+
results[key] = buf
377+
378+
if not remote_chunks:
379+
return self._apply_padding_to_results(results)
380+
381+
# Pre-resolve URL for DANDI auth
382+
from ..LindiRemfile.LindiRemfile import _resolve_url
383+
resolved_url = _resolve_url(self._url)
384+
385+
# Single chunk — skip thread pool overhead
386+
if len(remote_chunks) == 1:
387+
key, offset, count = remote_chunks[0]
388+
val = _fetch_bytes_direct(resolved_url, offset, count)
389+
self._try_cache_put(offset, count, val)
390+
results[key] = val
391+
return self._apply_padding_to_results(results)
392+
393+
# Concurrent fetch
394+
max_workers = min(len(remote_chunks), self._concurrent_max_workers)
395+
with ThreadPoolExecutor(max_workers=max_workers) as pool:
396+
futures = {
397+
pool.submit(_fetch_bytes_direct, resolved_url, offset, count): (key, offset, count)
398+
for key, offset, count in remote_chunks
399+
}
400+
for future in as_completed(futures):
401+
key, offset, count = futures[future]
402+
val = future.result()
403+
self._try_cache_put(offset, count, val)
404+
results[key] = val
405+
406+
return self._apply_padding_to_results(results)
407+
408+
def _try_cache_put(self, byte_offset, byte_count, data):
409+
"""Write data to the local cache if available."""
410+
if self._local_cache is not None and self._url is not None:
411+
try:
412+
self._local_cache.put_remote_chunk(url=self._url, offset=byte_offset, size=byte_count, data=data)
413+
except ChunkTooLargeError:
414+
pass
415+
416+
def _apply_padding_to_results(self, results):
417+
for key in list(results.keys()):
418+
val = results[key]
419+
padded_size = _get_padded_size(self, key, val)
420+
if padded_size is not None:
421+
results[key] = _pad_chunk(val, padded_size)
422+
return results
423+
328424
def __delitem__(self, key):
329425
raise Exception("Deleting items is not allowed")
330426

@@ -889,3 +985,23 @@ def chunk_fname(self):
889985
@property
890986
def chunk_bytes(self):
891987
return self._chunk_bytes
988+
989+
990+
def _fetch_bytes_direct(resolved_url: str, offset: int, length: int) -> bytes:
991+
"""Fetch bytes from a resolved URL via HTTP range request. Thread-safe."""
992+
num_retries = 8
993+
for try_num in range(num_retries):
994+
try:
995+
range_header = f"bytes={offset}-{offset + length - 1}"
996+
headers = {
997+
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
998+
"Range": range_header
999+
}
1000+
response = requests.get(resolved_url, headers=headers)
1001+
response.raise_for_status()
1002+
return response.content
1003+
except Exception as e:
1004+
if try_num == num_retries - 1:
1005+
raise
1006+
time.sleep(0.1 * 2 ** try_num)
1007+
assert False, "unreachable" # loop always returns or raises

lindi/LindiH5pyFile/LindiH5pyDataset.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
if TYPE_CHECKING:
1414
from .LindiH5pyFile import LindiH5pyFile # pragma: no cover
15+
from ..LindiH5ZarrStore.LindiH5ZarrStore import LindiH5ZarrStore # pragma: no cover
1516

1617

1718
# This is a global list of external hdf5 clients, which are used by
@@ -20,6 +21,11 @@
2021
# TODO: figure out how to close these clients
2122
_external_hdf5_clients: Dict[str, h5py.File] = {}
2223

24+
# Cache of LindiH5ZarrStore instances for remote external array links,
25+
# keyed by URL. Similar to _external_hdf5_clients.
26+
# TODO: figure out how to close these stores (same issue as _external_hdf5_clients)
27+
_external_zarr_stores: Dict[str, "LindiH5ZarrStore"] = {}
28+
2329

2430
class LindiH5pyDataset(h5py.Dataset):
2531
def __init__(self, _zarr_array: zarr.Array, _file: "LindiH5pyFile"):
@@ -203,10 +209,17 @@ def _get_item_for_zarr(self, zarr_array: zarr.Array, selection: Any):
203209
url = external_array_link.get("url", None)
204210
name = external_array_link.get("name", None)
205211
if url is not None and name is not None:
206-
client = self._get_external_hdf5_client(url)
207-
dataset = client[name]
208-
assert isinstance(dataset, h5py.Dataset)
209-
return dataset[selection]
212+
is_remote = url.startswith("http://") or url.startswith("https://")
213+
if is_remote:
214+
# Use zarr + LindiH5ZarrStore for concurrent chunk fetching
215+
ext_zarr_array = self._get_external_zarr_array(url, name)
216+
return ext_zarr_array[selection]
217+
else:
218+
# Local files — use h5py directly (no concurrency benefit)
219+
client = self._get_external_hdf5_client(url)
220+
dataset = client[name]
221+
assert isinstance(dataset, h5py.Dataset)
222+
return dataset[selection]
210223
if self._compound_dtype is not None:
211224
# Compound dtype
212225
# In this case we index into the compound dtype using the name of the field
@@ -252,6 +265,21 @@ def _get_external_hdf5_client(self, url: str) -> h5py.File:
252265
_external_hdf5_clients[url] = h5py.File(ff, "r")
253266
return _external_hdf5_clients[url]
254267

268+
def _get_external_zarr_array(self, url: str, name: str) -> zarr.Array:
269+
"""Get a zarr array for concurrent reading of a remote external array link."""
270+
from ..LindiH5ZarrStore.LindiH5ZarrStore import LindiH5ZarrStore
271+
from ..LindiH5ZarrStore.LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts
272+
273+
if url not in _external_zarr_stores:
274+
# Disable external array links (num_dataset_chunks_threshold=None)
275+
# so all chunks are served through the zarr store
276+
opts = LindiH5ZarrStoreOpts(num_dataset_chunks_threshold=None)
277+
_external_zarr_stores[url] = LindiH5ZarrStore.from_file(
278+
url, opts=opts, local_cache=self._file._local_cache
279+
)
280+
store = _external_zarr_stores[url]
281+
return zarr.open_array(store=store, path=name, mode='r')
282+
255283
@property
256284
def ref(self):
257285
if self._readonly:
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
import tempfile
2+
import numpy as np
3+
import h5py
4+
import zarr
5+
import lindi
6+
from lindi.LindiH5ZarrStore.LindiH5ZarrStore import LindiH5ZarrStore
7+
from lindi.LindiH5ZarrStore.LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts
8+
9+
10+
def test_getitems_local_chunks():
11+
"""Test getitems on LindiH5ZarrStore with a local chunked dataset."""
12+
with tempfile.TemporaryDirectory() as tmpdir:
13+
filename = f"{tmpdir}/test.h5"
14+
X = np.random.randn(100, 10)
15+
with h5py.File(filename, "w") as f:
16+
f.create_dataset("dataset1", data=X, chunks=(20, 10))
17+
18+
# Use num_dataset_chunks_threshold=None so chunks are served through store
19+
opts = LindiH5ZarrStoreOpts(num_dataset_chunks_threshold=None)
20+
with LindiH5ZarrStore.from_file(filename, url=filename, opts=opts) as store:
21+
# Read via zarr to verify basic functionality
22+
arr = zarr.open_array(store=store, path="dataset1", mode="r")
23+
np.testing.assert_array_equal(arr[:], X)
24+
25+
# Test getitems with chunk keys
26+
keys = ["dataset1/0.0", "dataset1/1.0", "dataset1/2.0"]
27+
results = store.getitems(keys)
28+
assert len(results) == 3
29+
for key in keys:
30+
assert key in results
31+
32+
# Test getitems with metadata keys
33+
meta_keys = ["dataset1/.zarray", "dataset1/.zattrs"]
34+
meta_results = store.getitems(meta_keys)
35+
assert len(meta_results) == 2
36+
for key in meta_keys:
37+
assert key in meta_results
38+
39+
# Test getitems with non-existent keys (should be skipped)
40+
mixed_keys = ["dataset1/0.0", "nonexistent/0.0"]
41+
mixed_results = store.getitems(mixed_keys)
42+
assert "dataset1/0.0" in mixed_results
43+
assert "nonexistent/0.0" not in mixed_results
44+
45+
46+
def test_getitems_inline_data():
47+
"""Test getitems with a small dataset that is stored inline."""
48+
with tempfile.TemporaryDirectory() as tmpdir:
49+
filename = f"{tmpdir}/test.h5"
50+
X = np.array([1, 2, 3], dtype=np.float64)
51+
with h5py.File(filename, "w") as f:
52+
f.create_dataset("small", data=X)
53+
54+
opts = LindiH5ZarrStoreOpts(num_dataset_chunks_threshold=None)
55+
with LindiH5ZarrStore.from_file(filename, url=filename, opts=opts) as store:
56+
# Small arrays should be inline
57+
keys = ["small/0"]
58+
results = store.getitems(keys)
59+
assert len(results) == 1
60+
61+
62+
def test_getitems_single_chunk_shortcut():
63+
"""Test that a single remote chunk skips the thread pool."""
64+
with tempfile.TemporaryDirectory() as tmpdir:
65+
filename = f"{tmpdir}/test.h5"
66+
X = np.random.randn(1000)
67+
with h5py.File(filename, "w") as f:
68+
f.create_dataset("data", data=X, chunks=(1000,))
69+
70+
opts = LindiH5ZarrStoreOpts(num_dataset_chunks_threshold=None)
71+
with LindiH5ZarrStore.from_file(filename, url=filename, opts=opts) as store:
72+
keys = ["data/0"]
73+
results = store.getitems(keys)
74+
assert "data/0" in results
75+
76+
77+
def test_external_array_link_via_zarr_store():
78+
"""Test that external array links for local files still work correctly."""
79+
with tempfile.TemporaryDirectory() as tmpdir:
80+
filename = f"{tmpdir}/test.h5"
81+
X = np.random.randn(50, 12)
82+
with h5py.File(filename, "w") as f:
83+
f.create_dataset("dataset1", data=X, chunks=(10, 6))
84+
85+
# Create a LINDI reference with a low threshold so external array link is used
86+
with LindiH5ZarrStore.from_file(
87+
filename,
88+
url=filename,
89+
opts=LindiH5ZarrStoreOpts(num_dataset_chunks_threshold=4),
90+
) as store:
91+
rfs = store.to_reference_file_system()
92+
93+
# Read back through LindiH5pyFile — local external links use h5py directly
94+
client = lindi.LindiH5pyFile.from_reference_file_system(rfs)
95+
X2 = client["dataset1"][:]
96+
np.testing.assert_array_equal(X, X2)
97+
98+
99+
def test_zarr_store_for_external_array():
100+
"""Test creating a LindiH5ZarrStore with num_dataset_chunks_threshold=None
101+
to serve all chunks (the pattern used for remote external array links)."""
102+
with tempfile.TemporaryDirectory() as tmpdir:
103+
filename = f"{tmpdir}/test.h5"
104+
X = np.random.randn(200, 10)
105+
with h5py.File(filename, "w") as f:
106+
f.create_dataset("dataset1", data=X, chunks=(20, 10))
107+
108+
# This is the same pattern used in _get_external_zarr_array
109+
opts = LindiH5ZarrStoreOpts(num_dataset_chunks_threshold=None)
110+
with LindiH5ZarrStore.from_file(filename, opts=opts, url=filename) as store:
111+
arr = zarr.open_array(store=store, path="dataset1", mode="r")
112+
result = arr[:]
113+
np.testing.assert_array_equal(result, X)
114+
115+
# Test slicing
116+
result_slice = arr[10:30, 3:7]
117+
np.testing.assert_array_equal(result_slice, X[10:30, 3:7])
118+
119+
120+
def test_getitems_empty_keys():
121+
"""Test getitems with empty key list."""
122+
with tempfile.TemporaryDirectory() as tmpdir:
123+
filename = f"{tmpdir}/test.h5"
124+
with h5py.File(filename, "w") as f:
125+
f.create_dataset("data", data=np.array([1, 2, 3]))
126+
127+
opts = LindiH5ZarrStoreOpts(num_dataset_chunks_threshold=None)
128+
with LindiH5ZarrStore.from_file(filename, url=filename, opts=opts) as store:
129+
results = store.getitems([])
130+
assert results == {}
131+
132+
133+
if __name__ == "__main__":
134+
test_getitems_local_chunks()
135+
test_getitems_inline_data()
136+
test_getitems_single_chunk_shortcut()
137+
test_external_array_link_via_zarr_store()
138+
test_zarr_store_for_external_array()
139+
test_getitems_empty_keys()
140+
print("All tests passed!")

0 commit comments

Comments
 (0)