Skip to content

Commit f205a1e

Browse files
committed
Index on b2z
1 parent fdb4f2a commit f205a1e

6 files changed

Lines changed: 355 additions & 27 deletions

File tree

examples/ctable/index_on_b2z.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
"""Demonstrate that CTable indexes survive a .b2z round-trip.
2+
3+
Steps
4+
-----
5+
1. Build a small CTable with synthetic sensor data and save it as .b2z.
6+
2. Measure query speed with full scan (no index).
7+
3. Reopen in append mode, create FULL indexes, close (triggers rezip).
8+
4. Reopen read-only — indexes are present and queries are faster.
9+
"""
10+
11+
import os
12+
import shutil
13+
import time
14+
from dataclasses import dataclass
15+
16+
import numpy as np
17+
18+
import blosc2
19+
20+
# ---------------------------------------------------------------------------
21+
# 1. Schema and synthetic data
22+
# ---------------------------------------------------------------------------
23+
24+
25+
@dataclass
class Reading:
    """Row schema for the demo CTable.

    Each field pairs a Python annotation with a blosc2 column type
    (presumably declaring the stored dtype — confirm against the
    blosc2.field API).
    """

    sensor_id: int = blosc2.field(blosc2.int32())
    timestamp: int = blosc2.field(blosc2.int64())
    value: float = blosc2.field(blosc2.float64())
    active: bool = blosc2.field(blosc2.bool())
31+
32+
33+
# Synthetic data size and a seeded RNG so every run produces identical data.
N = 5_000_000
rng = np.random.default_rng(42)

# On-disk locations: the unpacked directory form and the zipped single file.
B2D = "/tmp/sensors.b2d"
B2Z = "/tmp/sensors.b2z"

# Start from a clean slate: remove any leftovers from a previous run.
for p in (B2D, B2Z):
    if os.path.exists(p):
        if os.path.isdir(p):
            shutil.rmtree(p)
        else:
            os.remove(p)
41+
42+
# ---------------------------------------------------------------------------
43+
# 2. Create and zip
44+
# ---------------------------------------------------------------------------
45+
46+
# Build the table on disk (.b2d directory form), then pack it into a single
# .b2z archive and drop the directory so only the zipped form remains.
print(f"Creating CTable with {N:,} rows ...")
ct = blosc2.CTable(Reading, urlpath=B2D, mode="w", expected_size=N)
ct.extend(
    {
        "sensor_id": rng.integers(0, 100, N, dtype=np.int32),
        "timestamp": np.arange(N, dtype=np.int64),  # monotonically increasing
        "value": rng.uniform(-50.0, 150.0, N),
        "active": rng.integers(0, 2, N).astype(bool),
    }
)
ct.close()

# Pack the directory into a single .b2z file.
store = blosc2.TreeStore(B2D, mode="r")
store.to_b2z(filename=B2Z, overwrite=True)
store.discard()  # NOTE(review): presumably releases the store handle — confirm API
shutil.rmtree(B2D)  # the unpacked directory is no longer needed
print(f" saved → {B2Z} ({os.path.getsize(B2Z) / 1e6:.1f} MB)")
63+
64+
# ---------------------------------------------------------------------------
65+
# 3. Baseline: full scan (no index)
66+
# ---------------------------------------------------------------------------
67+
68+
# (label, query-builder) pairs evaluated in both the full-scan and indexed
# passes. Labels key the `baseline` dict and appear in the printed comparison
# table, so each label must describe its predicate exactly.
QUERIES = [
    ("value > 100.0", lambda ct: ct.where(ct["value"] > 100.0)),
    ("value > 120.0", lambda ct: ct.where(ct["value"] > 120.0)),
    ("value between 0 and 10", lambda ct: ct.where((ct["value"] >= 0.0) & (ct["value"] <= 10.0))),
    # Fixed label: the predicate compares against 4_500_000, not 450_000.
    ("timestamp > 4_500_000", lambda ct: ct.where(ct["timestamp"] > 4_500_000)),
    ("timestamp > 4_999_000", lambda ct: ct.where(ct["timestamp"] > 4_999_000)),
]
75+
76+
77+
def bench(fn, reps=5):
    """Call *fn* ``reps`` times and return ``(best_ms, last_result)``.

    ``best_ms`` is the minimum wall-clock duration in milliseconds across
    all repetitions; ``last_result`` is whatever the final call returned.
    """
    best_ms = float("inf")
    result = None
    for _ in range(reps):
        start = time.perf_counter()
        result = fn()
        elapsed_ms = (time.perf_counter() - start) * 1000
        if elapsed_ms < best_ms:
            best_ms = elapsed_ms
    return best_ms, result
84+
85+
86+
print("\n--- Full scan (no index) ---")
baseline = {}  # label -> (best ms, row count); compared against the indexed pass later
ct = blosc2.CTable.open(B2Z, mode="r")
assert not ct.indexes, "expected no indexes yet"
for label, fn in QUERIES:
    # Bind fn as a default argument so each lambda captures the current
    # value rather than the loop variable (late-binding closure pitfall).
    ms, view = bench(lambda fn=fn: fn(ct))
    baseline[label] = (ms, len(view))
    print(f" {label:<38} {ms:7.1f} ms {len(view):>8,} rows")
ct.close()
95+
96+
# ---------------------------------------------------------------------------
97+
# 4. Build indexes (append mode → rezip on close)
98+
# ---------------------------------------------------------------------------
99+
100+
print("\nBuilding indexes (mode='a') ...")
# Append mode allows mutating the .b2z archive; per the module docstring,
# closing it triggers a rezip.
ct = blosc2.CTable.open(B2Z, mode="a")

t0 = time.perf_counter()
ct.create_index("value", kind=blosc2.IndexKind.FULL)
print(f" value FULL index {(time.perf_counter() - t0) * 1000:.0f} ms")

t0 = time.perf_counter()
ct.create_index("timestamp", kind=blosc2.IndexKind.FULL)
print(f" timestamp FULL index {(time.perf_counter() - t0) * 1000:.0f} ms")

# close() performs the rezip, so it is timed separately from index creation.
t0 = time.perf_counter()
ct.close()
print(
    f" closed + rezipped {(time.perf_counter() - t0) * 1000:.0f} ms "
    f"({os.path.getsize(B2Z) / 1e6:.1f} MB)"
)
117+
118+
# ---------------------------------------------------------------------------
119+
# 5. Read-only: verify indexes survived, benchmark
120+
# ---------------------------------------------------------------------------
121+
122+
print("\nReopening .b2z read-only ...")
ct = blosc2.CTable.open(B2Z, mode="r")
# The indexes were created in the previous (append-mode) session; verify
# they survived the rezip round-trip.
found = [idx.col_name for idx in ct.indexes]
print(f" indexes present: {found}")
assert "value" in found, "index for 'value' missing after round-trip!"
assert "timestamp" in found, "index for 'timestamp' missing after round-trip!"

print()
print(f"{'query':<38} {'no index':>9} {'indexed':>9} {'speedup':>8} {'rows':>8}")
print("-" * 78)

for label, fn in QUERIES:
    # fn bound as a default argument to avoid the late-binding closure pitfall.
    i_ms, view = bench(lambda fn=fn: fn(ct))
    b_ms, b_n = baseline[label]
    sp = b_ms / i_ms if i_ms > 0 else float("inf")
    # Indexed and full-scan paths must agree on the result set size.
    assert len(view) == b_n, f"row count mismatch for {label!r}"
    print(f" {label:<38} {b_ms:8.1f}ms {i_ms:8.1f}ms {sp:7.1f}x {len(view):>8,}")

ct.close()
141+
142+
# ---------------------------------------------------------------------------
143+
# 6. Cleanup
144+
# ---------------------------------------------------------------------------
145+
146+
os.remove(B2Z)  # remove the demo artifact so repeated runs start clean
print("\nDone.")

src/blosc2/ctable.py

Lines changed: 94 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1457,12 +1457,18 @@ def _init_columns(
14571457
dparams=col_storage.get("dparams"),
14581458
)
14591459
continue
1460+
# Recompute chunks/blocks using the actual dtype so that wide
1461+
# string columns (e.g. U183642) don't produce multi-GB chunks.
1462+
chunks = col_storage["chunks"]
1463+
blocks = col_storage["blocks"]
1464+
if col.config.chunks is None and col.config.blocks is None:
1465+
chunks, blocks = compute_chunks_blocks((expected_size,), dtype=col.dtype)
14601466
self._cols[col.name] = storage.create_column(
14611467
col.name,
14621468
dtype=col.dtype,
14631469
shape=(expected_size,),
1464-
chunks=col_storage["chunks"],
1465-
blocks=col_storage["blocks"],
1470+
chunks=chunks,
1471+
blocks=blocks,
14661472
cparams=col_storage.get("cparams"),
14671473
dparams=col_storage.get("dparams"),
14681474
)
@@ -3494,8 +3500,10 @@ def sort_by(
34943500
If a column used as a sort key does not support ordering
34953501
(e.g. complex numbers).
34963502
"""
3497-
if self.base is not None:
3498-
raise ValueError("Cannot sort a view. Materialise it first with .to_table() or sort the parent.")
3503+
if self.base is not None and inplace:
3504+
raise ValueError(
3505+
"Cannot sort a view inplace (would modify shared column data). Use sort_by(inplace=False) to get a sorted copy."
3506+
)
34993507
if inplace and self._read_only:
35003508
raise ValueError("Table is read-only (opened with mode='r').")
35013509

@@ -3522,8 +3530,15 @@ def sort_by(
35223530
sorted_pos = live_pos[order]
35233531

35243532
if inplace:
3525-
for _col_name, arr in self._cols.items():
3526-
arr[:n] = arr[sorted_pos]
3533+
for col in self._schema.columns:
3534+
arr = self._cols[col.name]
3535+
if self._is_list_column(col):
3536+
new_arr = ListArray(spec=col.spec)
3537+
new_arr.extend((arr[int(pos)] for pos in sorted_pos), validate=False)
3538+
new_arr.flush()
3539+
self._cols[col.name] = new_arr
3540+
else:
3541+
arr[:n] = arr[sorted_pos]
35273542
self._valid_rows[:n] = True
35283543
self._valid_rows[n:] = False
35293544
self._n_rows = n
@@ -3537,7 +3552,7 @@ def sort_by(
35373552
col_name = col.name
35383553
arr = self._cols[col_name]
35393554
if self._is_list_column(col):
3540-
result._cols[col_name].extend(arr[int(pos)] for pos in sorted_pos)
3555+
result._cols[col_name].extend((arr[int(pos)] for pos in sorted_pos), validate=False)
35413556
result._cols[col_name].flush()
35423557
else:
35433558
result._cols[col_name][:n] = arr[sorted_pos]
@@ -3547,11 +3562,66 @@ def sort_by(
35473562
result._last_pos = n
35483563
return result
35493564

3550-
def _empty_copy(self) -> CTable:
3565+
def copy(self, compact: bool = True) -> CTable:
    """Return a new standalone in-memory copy of this table.

    Parameters
    ----------
    compact:
        If ``True`` (default), only live (non-deleted) rows are copied.
        The result is a dense table with no tombstones and no parent
        dependency — ideal for materialising a filtered view.
        If ``False``, all physical slots are copied including deleted gaps,
        preserving the tombstone state exactly.
    """
    # Snapshot the tombstone bitmap and locate the live (non-deleted) slots.
    valid_np = self._valid_rows[:]
    live_pos = np.where(valid_np)[0]
    n_live = len(live_pos)

    if compact:
        n = n_live
    else:
        # High watermark: number of slots ever written.
        # List columns are written sequentially with no gaps — their length
        # is the exact high watermark. For scalar-only tables fall back to
        # the last live position + 1 (writes are always sequential so no
        # deleted slot can exist beyond the last live one).
        n = 0
        for col in self._schema.columns:
            if self._is_list_column(col):
                n = len(self._cols[col.name])
                break
        if n == 0:
            n = int(live_pos[-1]) + 1 if n_live > 0 else 0

    result = self._empty_copy(capacity=n)

    for col in self._schema.columns:
        col_name = col.name
        arr = self._cols[col_name]
        if self._is_list_column(col):
            # List columns are append-only: stream the selected rows through
            # extend() rather than slice-assigning.
            src = (arr[int(pos)] for pos in live_pos) if compact else (arr[i] for i in range(n))
            result._cols[col_name].extend(src, validate=False)
            result._cols[col_name].flush()
        else:
            result._cols[col_name][:n] = arr[live_pos] if compact else arr[:n]

    if compact:
        # Dense result: every copied slot is live.
        result._valid_rows[:n] = True
        result._n_rows = n
        result._last_pos = n - 1 if n > 0 else None
    else:
        # Exact replica of the tombstone state up to the high watermark.
        result._valid_rows[:n] = valid_np[:n]
        result._n_rows = n_live
        result._last_pos = None  # recomputed lazily on next append

    return result
3619+
3620+
def _empty_copy(self, capacity: int | None = None) -> CTable:
35513621
"""Return a new empty in-memory CTable with the same schema and capacity."""
35523622
from blosc2 import compute_chunks_blocks
35533623

3554-
capacity = max(self._n_rows, 1)
3624+
capacity = max(capacity if capacity is not None else self._n_rows, 1)
35553625
default_chunks, default_blocks = compute_chunks_blocks((capacity,))
35563626
mem_storage = InMemoryTableStorage()
35573627

@@ -4385,10 +4455,18 @@ def _try_index_where(self, expr_result: blosc2.LazyExpr) -> np.ndarray | None:
43854455
primary_col_name, primary_col_arr, _ = indexed_columns[0]
43864456

43874457
# Inject every usable table-owned descriptor so plan_query can combine them.
4458+
# In .b2z read mode all columns share the same urlpath, so _array_key()
4459+
# returns the same key for every column — causing _SIDECAR_HANDLE_CACHE
4460+
# collisions across queries. Clear stale handles before each injection so
4461+
# the upcoming query always loads the correct sidecar for this column.
4462+
from blosc2.indexing import _clear_cached_data
4463+
43884464
for _col_name, col_arr, descriptor in indexed_columns:
43894465
arr_key = _array_key(col_arr)
43904466
if _is_persistent_array(col_arr):
43914467
store = _PERSISTENT_INDEXES.get(arr_key) or _default_index_store()
4468+
if store["indexes"].get(descriptor["token"]) is not descriptor:
4469+
_clear_cached_data(col_arr, descriptor["token"])
43924470
store["indexes"][descriptor["token"]] = descriptor
43934471
_PERSISTENT_INDEXES[arr_key] = store
43944472
else:
@@ -4603,7 +4681,11 @@ def extend(self, data: list | CTable | Any, *, validate: bool | None = None) ->
46034681
raw_columns[name] = data._cols[name][: data._n_rows]
46044682
provided_names.add(name)
46054683
else:
4606-
if isinstance(data, np.ndarray) and data.dtype.names is not None:
4684+
if isinstance(data, dict):
4685+
provided_names = set(data) & set(current_col_names)
4686+
new_nrows = len(next(iter(data.values())))
4687+
raw_columns = {name: data[name] for name in provided_names}
4688+
elif isinstance(data, np.ndarray) and data.dtype.names is not None:
46074689
new_nrows = len(data)
46084690
raw_columns = {name: data[name] for name in data.dtype.names if name in current_col_names}
46094691
provided_names = set(raw_columns)
@@ -4634,7 +4716,7 @@ def extend(self, data: list | CTable | Any, *, validate: bool | None = None) ->
46344716
list_processed_cols[name] = list(raw_columns[name])
46354717
else:
46364718
target_dtype = self._cols[name].dtype
4637-
scalar_processed_cols[name] = blosc2.asarray(raw_columns[name], dtype=target_dtype)
4719+
scalar_processed_cols[name] = np.ascontiguousarray(raw_columns[name], dtype=target_dtype)
46384720

46394721
end_pos = start_pos + new_nrows
46404722

@@ -4649,7 +4731,7 @@ def extend(self, data: list | CTable | Any, *, validate: bool | None = None) ->
46494731
for name in current_col_names:
46504732
col_meta = self._schema.columns_by_name[name]
46514733
if self._is_list_column(col_meta):
4652-
self._cols[name].extend(list_processed_cols[name])
4734+
self._cols[name].extend(list_processed_cols[name], validate=do_validate)
46534735
else:
46544736
self._cols[name][start_pos:end_pos] = scalar_processed_cols[name][:]
46554737

0 commit comments

Comments
 (0)