Skip to content

Commit ef30c59

Browse files
committed
Optlevel now tunes chunk sizes for sidecars for bucket and partial indexes too
1 parent fb5d342 commit ef30c59

2 files changed

Lines changed: 55 additions & 19 deletions

File tree

src/blosc2/indexing.py

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1599,7 +1599,8 @@ def _build_partial_descriptor(
15991599
persistent: bool,
16001600
cparams: dict | None = None,
16011601
) -> dict:
1602-
chunk_len = int(array.chunks[0])
1602+
chunk_multiplier = _index_chunk_multiplier_for_optlevel(optlevel)
1603+
chunk_len = int(array.chunks[0]) * chunk_multiplier
16031604
nav_segment_len, nav_segment_divisor = _partial_nav_segment_len(
16041605
int(array.blocks[0]), chunk_len, optlevel
16051606
)
@@ -1626,6 +1627,7 @@ def _build_partial_descriptor(
16261627
)
16271628
partial["position_dtype"] = positions.dtype.str
16281629
partial["nav_segment_divisor"] = nav_segment_divisor
1630+
partial["chunk_multiplier"] = chunk_multiplier
16291631
return partial
16301632

16311633

@@ -2091,7 +2093,8 @@ def _build_partial_descriptor_ooc(
20912093
) -> dict:
20922094
if persistent:
20932095
size = int(array.shape[0])
2094-
chunk_len = int(array.chunks[0])
2096+
chunk_multiplier = _index_chunk_multiplier_for_optlevel(optlevel)
2097+
chunk_len = int(array.chunks[0]) * chunk_multiplier
20952098
nav_segment_len, nav_segment_divisor = _partial_nav_segment_len(
20962099
int(array.blocks[0]), chunk_len, optlevel
20972100
)
@@ -2170,9 +2173,11 @@ def _build_partial_descriptor_ooc(
21702173
raise
21712174
partial["position_dtype"] = position_dtype.str
21722175
partial["nav_segment_divisor"] = nav_segment_divisor
2176+
partial["chunk_multiplier"] = chunk_multiplier
21732177
return partial
21742178

2175-
chunk_len = int(array.chunks[0])
2179+
chunk_multiplier = _index_chunk_multiplier_for_optlevel(optlevel)
2180+
chunk_len = int(array.chunks[0]) * chunk_multiplier
21762181
nav_segment_len, nav_segment_divisor = _partial_nav_segment_len(
21772182
int(array.blocks[0]), chunk_len, optlevel
21782183
)
@@ -2198,6 +2203,7 @@ def _build_partial_descriptor_ooc(
21982203
)
21992204
partial["position_dtype"] = positions.dtype.str
22002205
partial["nav_segment_divisor"] = nav_segment_divisor
2206+
partial["chunk_multiplier"] = chunk_multiplier
22012207
return partial
22022208

22032209

@@ -2436,7 +2442,8 @@ def _build_bucket_descriptor(
24362442
persistent: bool,
24372443
cparams: dict | None = None,
24382444
) -> dict:
2439-
chunk_len = int(array.chunks[0])
2445+
chunk_multiplier = _index_chunk_multiplier_for_optlevel(optlevel)
2446+
chunk_len = int(array.chunks[0]) * chunk_multiplier
24402447
nav_segment_len = int(array.blocks[0])
24412448
bucket_len = max(1, math.ceil(nav_segment_len / 64))
24422449
bucket_count = math.ceil(chunk_len / bucket_len)
@@ -2468,6 +2475,7 @@ def _build_bucket_descriptor(
24682475
)
24692476
bucket["bucket_count"] = bucket_count
24702477
bucket["bucket_len"] = bucket_len
2478+
bucket["chunk_multiplier"] = chunk_multiplier
24712479
bucket["value_lossy_bits"] = value_lossy_bits
24722480
bucket["bucket_dtype"] = bucket_positions.dtype.str
24732481
return bucket
@@ -2483,7 +2491,8 @@ def _build_bucket_descriptor_ooc(
24832491
persistent: bool,
24842492
cparams: dict | None = None,
24852493
) -> dict:
2486-
chunk_len = int(array.chunks[0])
2494+
chunk_multiplier = _index_chunk_multiplier_for_optlevel(optlevel)
2495+
chunk_len = int(array.chunks[0]) * chunk_multiplier
24872496
nav_segment_len = int(array.blocks[0])
24882497
bucket_len = max(1, math.ceil(nav_segment_len / 64))
24892498
bucket_count = math.ceil(chunk_len / bucket_len)
@@ -2579,6 +2588,7 @@ def _build_bucket_descriptor_ooc(
25792588
raise
25802589
bucket["bucket_count"] = bucket_count
25812590
bucket["bucket_len"] = bucket_len
2591+
bucket["chunk_multiplier"] = chunk_multiplier
25822592
bucket["value_lossy_bits"] = value_lossy_bits
25832593
bucket["bucket_dtype"] = bucket_dtype.str
25842594
return bucket
@@ -2614,6 +2624,7 @@ def _build_bucket_descriptor_ooc(
26142624
)
26152625
bucket["bucket_count"] = bucket_count
26162626
bucket["bucket_len"] = bucket_len
2627+
bucket["chunk_multiplier"] = chunk_multiplier
26172628
bucket["value_lossy_bits"] = value_lossy_bits
26182629
bucket["bucket_dtype"] = bucket_positions.dtype.str
26192630
return bucket
@@ -3196,8 +3207,8 @@ def _opsi_write_block_boundaries(
31963207
_write_ndarray_linear_span(stage.maxs, first_block, maxs)
31973208

31983209

3199-
def _opsi_chunk_multiplier_for_optlevel(optlevel: int) -> int:
3200-
"""Return the OPSI logical/sidecar chunk multiplier for *optlevel*."""
3210+
def _index_chunk_multiplier_for_optlevel(optlevel: int) -> int:
3211+
"""Return the chunk-local index chunk multiplier for *optlevel*."""
32013212
optlevel = int(optlevel)
32023213
if optlevel <= 3:
32033214
return 1
@@ -3383,7 +3394,7 @@ def _build_opsi_descriptor(
33833394
optlevel: int = 5,
33843395
) -> dict:
33853396
max_cycles = max(0, int(max_cycles))
3386-
chunk_multiplier = _opsi_chunk_multiplier_for_optlevel(optlevel)
3397+
chunk_multiplier = _index_chunk_multiplier_for_optlevel(optlevel)
33873398
size = int(array.shape[0])
33883399
if size == 0:
33893400
values_sidecar = _store_array_sidecar(
@@ -5687,7 +5698,6 @@ def _process_bucket_chunk_batch(
56875698
where_x,
56885699
plan: IndexPlan,
56895700
total_len: int,
5690-
chunk_len: int,
56915701
return_positions: bool = False,
56925702
) -> np.ndarray | tuple[np.ndarray, np.ndarray]:
56935703
value_parts = []
@@ -5704,9 +5714,7 @@ def _process_bucket_chunk_batch(
57045714
continue
57055715
if _supports_block_reads(local_where_x):
57065716
span = np.empty(stop - start, dtype=local_where_x.dtype)
5707-
base_chunk_id = start // chunk_len
5708-
local_start = start - base_chunk_id * chunk_len
5709-
local_where_x.get_1d_span_numpy(span, base_chunk_id, local_start, stop - start)
5717+
_read_ndarray_linear_span(local_where_x, start, span)
57105718
else:
57115719
span = local_where_x[start:stop]
57125720
match = _bucket_match_from_span(span, plan)
@@ -6429,17 +6437,14 @@ def evaluate_bucket_query(
64296437
raise ValueError("bucket evaluation requires bucket masks and chunk geometry")
64306438

64316439
total_len = int(plan.base.shape[0])
6432-
chunk_len = int(plan.base.chunks[0])
64336440
where_x = where["_where_x"]
64346441
candidate_chunk_ids = np.flatnonzero(np.any(plan.bucket_masks, axis=1)).astype(np.intp, copy=False)
64356442
result_dtype = _where_output_dtype(where["_where_x"])
64366443

64376444
thread_count = _downstream_query_thread_count(len(candidate_chunk_ids), plan)
64386445
if thread_count <= 1:
64396446
parts = [
6440-
_process_bucket_chunk_batch(
6441-
candidate_chunk_ids, where_x, plan, total_len, chunk_len, return_positions
6442-
)
6447+
_process_bucket_chunk_batch(candidate_chunk_ids, where_x, plan, total_len, return_positions)
64436448
]
64446449
else:
64456450
batches = _chunk_batches(candidate_chunk_ids, thread_count)
@@ -6451,7 +6456,6 @@ def evaluate_bucket_query(
64516456
[where_x] * len(batches),
64526457
[plan] * len(batches),
64536458
[total_len] * len(batches),
6454-
[chunk_len] * len(batches),
64556459
[return_positions] * len(batches),
64566460
)
64576461
)

tests/ndarray/test_indexing.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,35 @@ def test_opsi_optlevel_controls_chunk_multiplier(optlevel, expected_multiplier):
9393
np.testing.assert_array_equal(np.sort(indexed), np.sort(scanned))
9494

9595

96+
@pytest.mark.parametrize("kind", ["bucket", "partial"])
97+
@pytest.mark.parametrize(
98+
("optlevel", "expected_multiplier"),
99+
[
100+
(1, 1),
101+
(3, 1),
102+
(4, 2),
103+
(6, 2),
104+
(7, 4),
105+
(9, 4),
106+
],
107+
)
108+
def test_chunk_local_indexes_optlevel_controls_chunk_multiplier(kind, optlevel, expected_multiplier):
109+
rng = np.random.default_rng(44)
110+
data = rng.integers(0, 100_000, size=20_000, dtype=np.int64)
111+
arr = blosc2.asarray(data, chunks=(1_000,), blocks=(200,))
112+
descriptor = arr.create_index(kind=_public_kind(kind), optlevel=optlevel)
113+
114+
meta = descriptor[kind]
115+
assert meta["chunk_multiplier"] == expected_multiplier
116+
assert meta["chunk_len"] == arr.chunks[0] * expected_multiplier
117+
assert meta["nav_segment_len"] >= 1
118+
119+
expr = ((arr >= 10_000) & (arr < 20_000)).where(arr)
120+
indexed = expr.compute()[:]
121+
scanned = expr.compute(_use_index=False)[:]
122+
np.testing.assert_array_equal(np.sort(indexed), np.sort(scanned))
123+
124+
96125
@pytest.mark.parametrize("kind", ["summary", "bucket", "partial", "full", "opsi"])
97126
def test_structured_field_index_matches_scan(kind):
98127
dtype = np.dtype([("id", np.int64), ("payload", np.float64)])
@@ -525,9 +554,12 @@ def test_chunk_local_index_descriptor_and_lookup_path(tmp_path, kind):
525554
meta = descriptor["bucket"] if kind == "bucket" else descriptor["partial"]
526555

527556
assert meta["layout"] == "chunk-local-v1"
528-
assert meta["chunk_len"] == arr.chunks[0]
557+
expected_chunk_multiplier = 2
558+
expected_chunk_len = arr.chunks[0] * expected_chunk_multiplier
559+
assert meta["chunk_multiplier"] == expected_chunk_multiplier
560+
assert meta["chunk_len"] == expected_chunk_len
529561
expected_nav_len = (
530-
arr.blocks[0] if kind == "bucket" else max(arr.blocks[0] // 4, math.ceil(arr.chunks[0] / 2048))
562+
arr.blocks[0] if kind == "bucket" else max(arr.blocks[0] // 4, math.ceil(expected_chunk_len / 2048))
531563
)
532564
assert meta["nav_segment_len"] == expected_nav_len
533565
assert meta["l1_path"] is not None

0 commit comments

Comments
 (0)