Skip to content

Commit 5b0dfce

Browse files
committed
Optlevel now tunes chunksize/run_item_budget for sidecars in full indexes
1 parent ef30c59 commit 5b0dfce

3 files changed

Lines changed: 158 additions & 19 deletions

File tree

src/blosc2/ctable.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3820,7 +3820,7 @@ def _build_index_persistent(
38203820
if kind == "full":
38213821
with tempfile.TemporaryDirectory(prefix="blosc2-index-ooc-", dir=resolved_tmpdir) as td:
38223822
full = _build_full_descriptor_ooc(
3823-
proxy, target, token, kind, dtype, persistent, Path(td), cparams_obj
3823+
proxy, target, token, kind, dtype, persistent, Path(td), cparams_obj, optlevel
38243824
)
38253825
full["build_method"] = "global-sort"
38263826
if kind == "opsi":
@@ -3862,7 +3862,7 @@ def _build_index_persistent(
38623862
full = None
38633863
opsi = None
38643864
if kind == "full":
3865-
full = _build_full_descriptor(proxy, token, kind, values, persistent, cparams_obj)
3865+
full = _build_full_descriptor(proxy, token, kind, values, persistent, cparams_obj, optlevel)
38663866
full["build_method"] = "global-sort"
38673867
if kind == "opsi":
38683868
opsi = _build_opsi_descriptor(

src/blosc2/indexing.py

Lines changed: 109 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@
7575
_GATHER_MMAP_HANDLES: dict[str, object] = {}
7676
_HOT_CACHE_GLOBAL_SCOPE = ("global", 0)
7777

78-
FULL_OOC_RUN_ITEMS = 8_000_000
78+
FULL_OOC_RUN_ITEMS = 10_000_000
7979
FULL_OOC_MERGE_BUFFER_ITEMS = 500_000
8080
FULL_SELECTIVE_OOC_MAX_SPANS = 128
8181
FULL_RUN_BOUNDED_FALLBACK_RUNS = 8
@@ -1377,7 +1377,9 @@ def _rebuild_full_navigation_sidecars(
13771377
cparams: dict | None = None,
13781378
) -> None:
13791379
chunk_len, block_len = _sidecar_storage_geometry(
1380-
full.get("values_path"), int(array.chunks[0]), int(array.blocks[0])
1380+
full.get("values_path"),
1381+
int(full.get("sidecar_chunk_len", array.chunks[0])),
1382+
int(full.get("sidecar_block_len", array.blocks[0])),
13811383
)
13821384
l1 = _compute_sorted_boundaries(sorted_values, np.dtype(sorted_values.dtype), chunk_len)
13831385
l2 = _compute_sorted_boundaries(sorted_values, np.dtype(sorted_values.dtype), block_len)
@@ -1462,6 +1464,13 @@ def _stream_copy_sidecar_array(
14621464
del source, dest
14631465

14641466

1467+
def _full_sidecar_geometry(array: blosc2.NDArray, optlevel: int) -> tuple[tuple[int], tuple[int], int]:
    """Return the storage geometry for the sidecars of a ``full`` index.

    The result is ``(chunks, blocks, chunk_multiplier)``:

    * ``chunks`` -- 1-tuple with the sidecar chunk length, obtained from
      ``_opsi_storage_chunk_len`` using the array's first-dimension chunk
      length, its first-dimension block length and the multiplier.
    * ``blocks`` -- 1-tuple with the array's first-dimension block length.
    * ``chunk_multiplier`` -- the value returned by
      ``_index_chunk_multiplier_for_optlevel`` for *optlevel*.
    """
    multiplier = _index_chunk_multiplier_for_optlevel(optlevel)
    base_block = int(array.blocks[0])
    base_chunk = int(array.chunks[0])
    sidecar_chunk = _opsi_storage_chunk_len(base_chunk, base_block, multiplier)
    return (sidecar_chunk,), (base_block,), multiplier
1472+
1473+
14651474
def _stream_copy_temp_run_to_full_sidecars(
14661475
array: blosc2.NDArray,
14671476
token: str,
@@ -1472,37 +1481,40 @@ def _stream_copy_temp_run_to_full_sidecars(
14721481
persistent: bool,
14731482
tracker: TempRunTracker | None = None,
14741483
cparams: dict | None = None,
1484+
optlevel: int = 5,
14751485
) -> None:
14761486
if not persistent:
14771487
raise ValueError("temp-run streaming only supports persistent runs")
14781488

14791489
values_path = _sidecar_path(array, token, kind, "full.values")
14801490
positions_path = _sidecar_path(array, token, kind, "full.positions")
1491+
sidecar_chunks, sidecar_blocks, chunk_multiplier = _full_sidecar_geometry(array, optlevel)
14811492
_remove_sidecar_path(values_path)
14821493
_remove_sidecar_path(positions_path)
14831494
_stream_copy_sidecar_array(
14841495
run.values_path,
14851496
values_path,
14861497
run.length,
14871498
dtype,
1488-
(int(array.chunks[0]),),
1489-
(int(array.blocks[0]),),
1499+
sidecar_chunks,
1500+
sidecar_blocks,
14901501
cparams,
14911502
)
14921503
_stream_copy_sidecar_array(
14931504
run.positions_path,
14941505
positions_path,
14951506
run.length,
14961507
np.dtype(np.int64),
1497-
(int(array.chunks[0]),),
1498-
(int(array.blocks[0]),),
1508+
sidecar_chunks,
1509+
sidecar_blocks,
14991510
cparams,
15001511
)
15011512
_tracker_register_delete(tracker, run.values_path, run.positions_path)
15021513
run.values_path.unlink(missing_ok=True)
15031514
run.positions_path.unlink(missing_ok=True)
15041515
full["values_path"] = values_path
15051516
full["positions_path"] = positions_path
1517+
full["chunk_multiplier"] = chunk_multiplier
15061518
full["runs"] = []
15071519
full["next_run_id"] = 0
15081520
_rebuild_full_navigation_sidecars_from_path(
@@ -1529,17 +1541,40 @@ def _build_full_descriptor(
15291541
values: np.ndarray,
15301542
persistent: bool,
15311543
cparams: dict | None = None,
1544+
optlevel: int = 5,
15321545
) -> dict:
15331546
sorted_values, positions = _keysort_values_with_positions(values)
1547+
sidecar_chunks, sidecar_blocks, chunk_multiplier = _full_sidecar_geometry(array, optlevel)
15341548
values_sidecar = _store_array_sidecar(
1535-
array, token, kind, "full", "values", sorted_values, persistent, cparams=cparams
1549+
array,
1550+
token,
1551+
kind,
1552+
"full",
1553+
"values",
1554+
sorted_values,
1555+
persistent,
1556+
chunks=sidecar_chunks,
1557+
blocks=sidecar_blocks,
1558+
cparams=cparams,
15361559
)
15371560
positions_sidecar = _store_array_sidecar(
1538-
array, token, kind, "full", "positions", positions, persistent, cparams=cparams
1561+
array,
1562+
token,
1563+
kind,
1564+
"full",
1565+
"positions",
1566+
positions,
1567+
persistent,
1568+
chunks=sidecar_chunks,
1569+
blocks=sidecar_blocks,
1570+
cparams=cparams,
15391571
)
15401572
full = {
15411573
"values_path": values_sidecar["path"],
15421574
"positions_path": positions_sidecar["path"],
1575+
"chunk_multiplier": chunk_multiplier,
1576+
"sidecar_chunk_len": sidecar_chunks[0],
1577+
"sidecar_block_len": sidecar_blocks[0],
15431578
"runs": [],
15441579
"next_run_id": 0,
15451580
}
@@ -3564,6 +3599,10 @@ def _build_opsi_descriptor(
35643599
raise
35653600

35663601

3602+
def _full_ooc_run_item_budget_for_optlevel(optlevel: int) -> int:
    """Return the out-of-core run item budget for *optlevel*.

    The budget is the module-level ``FULL_OOC_RUN_ITEMS`` base multiplied by
    the chunk multiplier that ``_index_chunk_multiplier_for_optlevel`` yields
    for the given optimization level.
    """
    multiplier = _index_chunk_multiplier_for_optlevel(optlevel)
    return FULL_OOC_RUN_ITEMS * multiplier
3604+
3605+
35673606
def _build_full_descriptor_ooc(
35683607
array: blosc2.NDArray,
35693608
target: dict,
@@ -3573,34 +3612,62 @@ def _build_full_descriptor_ooc(
35733612
persistent: bool,
35743613
workdir: Path,
35753614
cparams: dict | None = None,
3615+
optlevel: int = 5,
35763616
) -> dict:
35773617
size = int(array.shape[0])
35783618
tracker = TempRunTracker()
35793619
if size == 0:
35803620
sorted_values = np.empty(0, dtype=dtype)
35813621
positions = np.empty(0, dtype=np.int64)
3622+
sidecar_chunks, sidecar_blocks, chunk_multiplier = _full_sidecar_geometry(array, optlevel)
35823623
values_sidecar = _store_array_sidecar(
3583-
array, token, kind, "full", "values", sorted_values, persistent, cparams=cparams
3624+
array,
3625+
token,
3626+
kind,
3627+
"full",
3628+
"values",
3629+
sorted_values,
3630+
persistent,
3631+
chunks=sidecar_chunks,
3632+
blocks=sidecar_blocks,
3633+
cparams=cparams,
35843634
)
35853635
positions_sidecar = _store_array_sidecar(
3586-
array, token, kind, "full", "positions", positions, persistent, cparams=cparams
3636+
array,
3637+
token,
3638+
kind,
3639+
"full",
3640+
"positions",
3641+
positions,
3642+
persistent,
3643+
chunks=sidecar_chunks,
3644+
blocks=sidecar_blocks,
3645+
cparams=cparams,
35873646
)
35883647
full = {
35893648
"values_path": values_sidecar["path"],
35903649
"positions_path": positions_sidecar["path"],
3650+
"chunk_multiplier": chunk_multiplier,
3651+
"sidecar_chunk_len": sidecar_chunks[0],
3652+
"sidecar_block_len": sidecar_blocks[0],
35913653
"runs": [],
35923654
"next_run_id": 0,
3655+
"ooc_run_items": 0,
3656+
"ooc_run_item_budget": 0,
3657+
"ooc_run_item_budget_source": "empty",
35933658
}
35943659
_rebuild_full_navigation_sidecars(array, token, kind, full, sorted_values, persistent, cparams)
35953660
return full
35963661
run_items_env = os.getenv("BLOSC2_FULL_OOC_RUN_ITEMS")
3662+
run_item_budget_source = "optlevel"
35973663
if run_items_env is not None:
35983664
try:
35993665
run_item_budget = max(1, int(run_items_env))
3666+
run_item_budget_source = "env"
36003667
except ValueError:
3601-
run_item_budget = FULL_OOC_RUN_ITEMS
3668+
run_item_budget = _full_ooc_run_item_budget_for_optlevel(optlevel)
36023669
else:
3603-
run_item_budget = FULL_OOC_RUN_ITEMS
3670+
run_item_budget = _full_ooc_run_item_budget_for_optlevel(optlevel)
36043671
run_items = max(int(array.chunks[0]), min(size, run_item_budget))
36053672
runs = []
36063673
for run_id, start in enumerate(range(0, size, run_items)):
@@ -3652,22 +3719,47 @@ def _build_full_descriptor_ooc(
36523719
"temp_backend": "blosc2",
36533720
"temp_peak_disk_bytes": tracker.peak_disk_bytes,
36543721
"temp_total_written_bytes": tracker.total_written_bytes,
3722+
"ooc_run_items": run_items,
3723+
"ooc_run_item_budget": run_item_budget,
3724+
"ooc_run_item_budget_source": run_item_budget_source,
36553725
}
36563726
if persistent:
36573727
_stream_copy_temp_run_to_full_sidecars(
3658-
array, token, kind, full, final_run, dtype, persistent, tracker, cparams
3728+
array, token, kind, full, final_run, dtype, persistent, tracker, cparams, optlevel
36593729
)
36603730
else:
36613731
sorted_values = blosc2.open(str(final_run.values_path), mode="r", mmap_mode=_INDEX_MMAP_MODE)[:]
36623732
positions = blosc2.open(str(final_run.positions_path), mode="r", mmap_mode=_INDEX_MMAP_MODE)[:]
3733+
sidecar_chunks, sidecar_blocks, chunk_multiplier = _full_sidecar_geometry(array, optlevel)
36633734
values_sidecar = _store_array_sidecar(
3664-
array, token, kind, "full", "values", sorted_values, persistent, cparams=cparams
3735+
array,
3736+
token,
3737+
kind,
3738+
"full",
3739+
"values",
3740+
sorted_values,
3741+
persistent,
3742+
chunks=sidecar_chunks,
3743+
blocks=sidecar_blocks,
3744+
cparams=cparams,
36653745
)
36663746
positions_sidecar = _store_array_sidecar(
3667-
array, token, kind, "full", "positions", positions, persistent, cparams=cparams
3747+
array,
3748+
token,
3749+
kind,
3750+
"full",
3751+
"positions",
3752+
positions,
3753+
persistent,
3754+
chunks=sidecar_chunks,
3755+
blocks=sidecar_blocks,
3756+
cparams=cparams,
36683757
)
36693758
full["values_path"] = values_sidecar["path"]
36703759
full["positions_path"] = positions_sidecar["path"]
3760+
full["chunk_multiplier"] = chunk_multiplier
3761+
full["sidecar_chunk_len"] = sidecar_chunks[0]
3762+
full["sidecar_block_len"] = sidecar_blocks[0]
36713763
_rebuild_full_navigation_sidecars(array, token, kind, full, sorted_values, persistent, cparams)
36723764
del sorted_values, positions
36733765
_tracker_register_delete(tracker, final_run.values_path, final_run.positions_path)
@@ -3782,7 +3874,7 @@ def create_index(
37823874
prefix="blosc2-index-ooc-", dir=_resolve_full_index_tmpdir(array, tmpdir)
37833875
) as tmpdir:
37843876
full = _build_full_descriptor_ooc(
3785-
array, target, token, kind, dtype, persistent, Path(tmpdir), cparams
3877+
array, target, token, kind, dtype, persistent, Path(tmpdir), cparams, optlevel
37863878
)
37873879
full["build_method"] = "global-sort"
37883880
if kind == "opsi":
@@ -3822,7 +3914,7 @@ def create_index(
38223914
full = None
38233915
opsi = None
38243916
if kind == "full":
3825-
full = _build_full_descriptor(array, token, kind, values, persistent, cparams)
3917+
full = _build_full_descriptor(array, token, kind, values, persistent, cparams, optlevel)
38263918
full["build_method"] = "global-sort"
38273919
if kind == "opsi":
38283920
opsi = _build_opsi_descriptor(

tests/ndarray/test_indexing.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1236,6 +1236,53 @@ def test_forced_ooc_full_index_merge_preserves_sorted_sidecars(monkeypatch, tmp_
12361236
np.testing.assert_array_equal(values_sidecar[:], data[positions_sidecar[:]])
12371237

12381238

1239+
@pytest.mark.parametrize(
    ("optlevel", "expected_budget"),
    [
        (1, 256),
        (3, 256),
        (4, 512),
        (6, 512),
        (7, 1024),
        (9, 1024),
    ],
)
def test_full_ooc_run_items_follow_optlevel(monkeypatch, tmp_path, optlevel, expected_budget):
    """The OOC run budget and sidecar chunk length must scale with ``optlevel``.

    The base budget is patched to the array chunk length so that the budget
    (``FULL_OOC_RUN_ITEMS * chunk_multiplier``) and the sidecar chunk length
    are expected to take the same value for every optimization level.
    """
    chunk_len = 256
    path = tmp_path / f"full_ooc_optlevel_{optlevel}.b2nd"
    data = np.arange(4096, dtype=np.int64)
    arr = blosc2.asarray(data, urlpath=path, mode="w", chunks=(chunk_len,), blocks=(64,))
    # An ambient override would flip the budget source to "env" and make the
    # assertions below depend on the caller's environment.
    monkeypatch.delenv("BLOSC2_FULL_OOC_RUN_ITEMS", raising=False)
    # The base must equal the chunk length: the budget is base * multiplier,
    # so a base of 512 would yield 512/1024/2048 instead of 256/512/1024.
    monkeypatch.setattr("blosc2.indexing.FULL_OOC_RUN_ITEMS", chunk_len)

    descriptor = arr.create_index(kind=blosc2.IndexKind.FULL, optlevel=optlevel)
    full = descriptor["full"]

    assert full["ooc_run_item_budget"] == expected_budget
    assert full["ooc_run_items"] == expected_budget
    assert full["ooc_run_item_budget_source"] == "optlevel"
    assert full["chunk_multiplier"] == expected_budget // chunk_len
    assert full["sidecar_chunk_len"] == expected_budget
    assert full["sidecar_block_len"] == arr.blocks[0]
    values_sidecar = blosc2.open(full["values_path"], mode="r")
    assert values_sidecar.chunks[0] == expected_budget
    assert values_sidecar.blocks[0] == arr.blocks[0]
1269+
1270+
1271+
@pytest.mark.parametrize("optlevel", [1, 5, 9])
def test_full_ooc_run_items_env_overrides_optlevel(monkeypatch, tmp_path, optlevel):
    """``BLOSC2_FULL_OOC_RUN_ITEMS`` must take precedence over ``optlevel``.

    Whatever optimization level is requested, the descriptor has to report
    the budget from the environment variable and name "env" as its source.
    """
    forced_items = 768
    array_path = tmp_path / f"full_ooc_env_{optlevel}.b2nd"
    source = np.arange(4096, dtype=np.int64)
    arr = blosc2.asarray(source, urlpath=array_path, mode="w", chunks=(256,), blocks=(64,))
    monkeypatch.setenv("BLOSC2_FULL_OOC_RUN_ITEMS", str(forced_items))

    full = arr.create_index(kind=blosc2.IndexKind.FULL, optlevel=optlevel)["full"]

    assert full["ooc_run_item_budget"] == forced_items
    assert full["ooc_run_items"] == forced_items
    assert full["ooc_run_item_budget_source"] == "env"
1284+
1285+
12391286
def test_create_index_full_ooc_defaults_tmpdir_to_array_directory(monkeypatch, tmp_path):
12401287
path = tmp_path / "default_tmpdir_full.b2nd"
12411288
data = np.arange(4096, dtype=np.int64)

0 commit comments

Comments
 (0)