Skip to content

Commit 23c350a

Browse files
committed
Index on b2z
1 parent 79eb799 commit 23c350a

6 files changed

Lines changed: 355 additions & 27 deletions

File tree

examples/ctable/index_on_b2z.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
"""Demonstrate that CTable indexes survive a .b2z round-trip.
2+
3+
Steps
4+
-----
5+
1. Build a small CTable with synthetic sensor data and save it as .b2z.
6+
2. Measure query speed with full scan (no index).
7+
3. Reopen in append mode, create FULL indexes, close (triggers rezip).
8+
4. Reopen read-only — indexes are present and queries are faster.
9+
"""
10+
11+
import os
12+
import shutil
13+
import time
14+
from dataclasses import dataclass
15+
16+
import numpy as np
17+
18+
import blosc2
19+
20+
# ---------------------------------------------------------------------------
21+
# 1. Schema and synthetic data
22+
# ---------------------------------------------------------------------------
23+
24+
25+
@dataclass
class Reading:
    """Row schema for one synthetic sensor reading.

    Each field's default is a blosc2 column-type descriptor; the class is
    passed to ``blosc2.CTable(...)`` below, which presumably derives the
    on-disk column layout from these descriptors — TODO confirm against
    the CTable constructor docs.
    """

    sensor_id: int = blosc2.field(blosc2.int32())
    timestamp: int = blosc2.field(blosc2.int64())
    value: float = blosc2.field(blosc2.float64())
    active: bool = blosc2.field(blosc2.bool())
31+
32+
33+
# Dataset size and deterministic RNG so repeated runs produce the same data.
N = 5_000_000
rng = np.random.default_rng(42)
B2D = "/tmp/sensors.b2d"
B2Z = "/tmp/sensors.b2z"

# Start from a clean slate: remove leftovers from any previous run.
for path in (B2D, B2Z):
    if not os.path.exists(path):
        continue
    if os.path.isdir(path):
        shutil.rmtree(path)
    else:
        os.remove(path)

# ---------------------------------------------------------------------------
# 2. Create and zip
# ---------------------------------------------------------------------------

print(f"Creating CTable with {N:,} rows ...")
table = blosc2.CTable(Reading, urlpath=B2D, mode="w", expected_size=N)
columns = {
    "sensor_id": rng.integers(0, 100, N, dtype=np.int32),
    "timestamp": np.arange(N, dtype=np.int64),
    "value": rng.uniform(-50.0, 150.0, N),
    "active": rng.integers(0, 2, N).astype(bool),
}
table.extend(columns)
table.close()

# Pack the on-disk directory into a single .b2z archive, then drop the dir.
store = blosc2.TreeStore(B2D, mode="r")
store.to_b2z(filename=B2Z, overwrite=True)
store.discard()
shutil.rmtree(B2D)
print(f" saved → {B2Z} ({os.path.getsize(B2Z) / 1e6:.1f} MB)")
63+
64+
# ---------------------------------------------------------------------------
65+
# 3. Baseline: full scan (no index)
66+
# ---------------------------------------------------------------------------
67+
68+
# Each entry: (human-readable label, callable that runs the query against a
# CTable). Labels double as dict keys for the baseline timings, so they must
# be unique and should describe the predicate accurately.
QUERIES = [
    ("value > 100.0", lambda ct: ct.where(ct["value"] > 100.0)),
    ("value > 120.0", lambda ct: ct.where(ct["value"] > 120.0)),
    ("value between 0 and 10", lambda ct: ct.where((ct["value"] >= 0.0) & (ct["value"] <= 10.0))),
    # Label fixed: the predicate filters timestamp > 4_500_000, not 450_000.
    ("timestamp > 4_500_000", lambda ct: ct.where(ct["timestamp"] > 4_500_000)),
    ("timestamp > 4_999_000", lambda ct: ct.where(ct["timestamp"] > 4_999_000)),
]
75+
76+
77+
def bench(fn, reps=5):
    """Run ``fn`` ``reps`` times and report the best wall-clock time.

    Parameters
    ----------
    fn:
        Zero-argument callable to benchmark.
    reps:
        Number of repetitions; must be >= 1.

    Returns
    -------
    tuple
        ``(best_ms, result)`` — the minimum elapsed time in milliseconds
        across all repetitions, and the return value of the *last* call.

    Raises
    ------
    ValueError
        If ``reps`` is less than 1. (The original silently crashed with
        ``UnboundLocalError``/``min()`` on an empty sequence in that case.)
    """
    if reps < 1:
        raise ValueError("reps must be >= 1")
    times = []
    for _ in range(reps):
        t0 = time.perf_counter()
        result = fn()
        times.append((time.perf_counter() - t0) * 1000)
    return min(times), result
84+
85+
86+
# Time every query with a full scan; keep (ms, row-count) per label so the
# indexed run below can be compared against it.
print("\n--- Full scan (no index) ---")
baseline = {}
ct = blosc2.CTable.open(B2Z, mode="r")
assert not ct.indexes, "expected no indexes yet"
for label, query in QUERIES:
    elapsed_ms, rows = bench(lambda q=query: q(ct))
    n_rows = len(rows)
    baseline[label] = (elapsed_ms, n_rows)
    print(f" {label:<38} {elapsed_ms:7.1f} ms {n_rows:>8,} rows")
ct.close()
95+
96+
# ---------------------------------------------------------------------------
97+
# 4. Build indexes (append mode → rezip on close)
98+
# ---------------------------------------------------------------------------
99+
100+
print("\nBuilding indexes (mode='a') ...")
ct = blosc2.CTable.open(B2Z, mode="a")

# Build a FULL index on each queried column, timing each one.
for col in ("value", "timestamp"):
    start = time.perf_counter()
    ct.create_index(col, kind=blosc2.IndexKind.FULL)
    print(f" {col} FULL index {(time.perf_counter() - start) * 1000:.0f} ms")

# Closing in append mode triggers the rezip of the .b2z archive.
start = time.perf_counter()
ct.close()
print(
    f" closed + rezipped {(time.perf_counter() - start) * 1000:.0f} ms "
    f"({os.path.getsize(B2Z) / 1e6:.1f} MB)"
)
117+
118+
# ---------------------------------------------------------------------------
119+
# 5. Read-only: verify indexes survived, benchmark
120+
# ---------------------------------------------------------------------------
121+
122+
print("\nReopening .b2z read-only ...")
ct = blosc2.CTable.open(B2Z, mode="r")
found = [idx.col_name for idx in ct.indexes]
print(f" indexes present: {found}")
# Both indexes must have survived the close/rezip round-trip.
for col in ("value", "timestamp"):
    assert col in found, f"index for {col!r} missing after round-trip!"

print()
print(f"{'query':<38} {'no index':>9} {'indexed':>9} {'speedup':>8} {'rows':>8}")
print("-" * 78)

for label, query in QUERIES:
    idx_ms, rows = bench(lambda q=query: q(ct))
    base_ms, base_n = baseline[label]
    if idx_ms > 0:
        speedup = base_ms / idx_ms
    else:
        speedup = float("inf")
    # Indexed and full-scan paths must agree on the result set size.
    assert len(rows) == base_n, f"row count mismatch for {label!r}"
    print(f" {label:<38} {base_ms:8.1f}ms {idx_ms:8.1f}ms {speedup:7.1f}x {len(rows):>8,}")

ct.close()
141+
142+
# ---------------------------------------------------------------------------
# 6. Cleanup
# ---------------------------------------------------------------------------

# Remove the demo archive so repeated runs start from scratch.
os.remove(B2Z)
print("\nDone.")

src/blosc2/ctable.py

Lines changed: 94 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1458,12 +1458,18 @@ def _init_columns(
14581458
dparams=col_storage.get("dparams"),
14591459
)
14601460
continue
1461+
# Recompute chunks/blocks using the actual dtype so that wide
1462+
# string columns (e.g. U183642) don't produce multi-GB chunks.
1463+
chunks = col_storage["chunks"]
1464+
blocks = col_storage["blocks"]
1465+
if col.config.chunks is None and col.config.blocks is None:
1466+
chunks, blocks = compute_chunks_blocks((expected_size,), dtype=col.dtype)
14611467
self._cols[col.name] = storage.create_column(
14621468
col.name,
14631469
dtype=col.dtype,
14641470
shape=(expected_size,),
1465-
chunks=col_storage["chunks"],
1466-
blocks=col_storage["blocks"],
1471+
chunks=chunks,
1472+
blocks=blocks,
14671473
cparams=col_storage.get("cparams"),
14681474
dparams=col_storage.get("dparams"),
14691475
)
@@ -3495,8 +3501,10 @@ def sort_by(
34953501
If a column used as a sort key does not support ordering
34963502
(e.g. complex numbers).
34973503
"""
3498-
if self.base is not None:
3499-
raise ValueError("Cannot sort a view. Materialise it first with .to_table() or sort the parent.")
3504+
if self.base is not None and inplace:
3505+
raise ValueError(
3506+
"Cannot sort a view inplace (would modify shared column data). Use sort_by(inplace=False) to get a sorted copy."
3507+
)
35003508
if inplace and self._read_only:
35013509
raise ValueError("Table is read-only (opened with mode='r').")
35023510

@@ -3523,8 +3531,15 @@ def sort_by(
35233531
sorted_pos = live_pos[order]
35243532

35253533
if inplace:
3526-
for _col_name, arr in self._cols.items():
3527-
arr[:n] = arr[sorted_pos]
3534+
for col in self._schema.columns:
3535+
arr = self._cols[col.name]
3536+
if self._is_list_column(col):
3537+
new_arr = ListArray(spec=col.spec)
3538+
new_arr.extend((arr[int(pos)] for pos in sorted_pos), validate=False)
3539+
new_arr.flush()
3540+
self._cols[col.name] = new_arr
3541+
else:
3542+
arr[:n] = arr[sorted_pos]
35283543
self._valid_rows[:n] = True
35293544
self._valid_rows[n:] = False
35303545
self._n_rows = n
@@ -3538,7 +3553,7 @@ def sort_by(
35383553
col_name = col.name
35393554
arr = self._cols[col_name]
35403555
if self._is_list_column(col):
3541-
result._cols[col_name].extend(arr[int(pos)] for pos in sorted_pos)
3556+
result._cols[col_name].extend((arr[int(pos)] for pos in sorted_pos), validate=False)
35423557
result._cols[col_name].flush()
35433558
else:
35443559
result._cols[col_name][:n] = arr[sorted_pos]
@@ -3548,11 +3563,66 @@ def sort_by(
35483563
result._last_pos = n
35493564
return result
35503565

3551-
def _empty_copy(self) -> CTable:
3566+
def copy(self, compact: bool = True) -> CTable:
    """Return a new standalone in-memory copy of this table.

    Parameters
    ----------
    compact:
        If ``True`` (default), only live (non-deleted) rows are copied.
        The result is a dense table with no tombstones and no parent
        dependency — ideal for materialising a filtered view.
        If ``False``, all physical slots are copied including deleted gaps,
        preserving the tombstone state exactly.
    """
    # Snapshot the tombstone bitmap; live_pos holds the physical positions
    # of rows that have not been deleted.
    valid_np = self._valid_rows[:]
    live_pos = np.where(valid_np)[0]
    n_live = len(live_pos)

    if compact:
        n = n_live
    else:
        # High watermark: number of slots ever written.
        # List columns are written sequentially with no gaps — their length
        # is the exact high watermark. For scalar-only tables fall back to
        # the last live position + 1 (writes are always sequential so no
        # deleted slot can exist beyond the last live one).
        n = 0
        for col in self._schema.columns:
            if self._is_list_column(col):
                n = len(self._cols[col.name])
                break
        if n == 0:
            n = int(live_pos[-1]) + 1 if n_live > 0 else 0

    # Fresh empty table with the same schema, sized to hold n slots.
    result = self._empty_copy(capacity=n)

    for col in self._schema.columns:
        col_name = col.name
        arr = self._cols[col_name]
        if self._is_list_column(col):
            # List columns: copy item-by-item (live rows only when
            # compacting); validate=False since data was validated on write.
            src = (arr[int(pos)] for pos in live_pos) if compact else (arr[i] for i in range(n))
            result._cols[col_name].extend(src, validate=False)
            result._cols[col_name].flush()
        else:
            # Scalar columns: bulk slice assignment.
            result._cols[col_name][:n] = arr[live_pos] if compact else arr[:n]

    if compact:
        # Dense result: every copied slot is live.
        result._valid_rows[:n] = True
        result._n_rows = n
        result._last_pos = n - 1 if n > 0 else None
    else:
        # Preserve the tombstone pattern exactly; row count stays the
        # number of live rows, not the slot count.
        result._valid_rows[:n] = valid_np[:n]
        result._n_rows = n_live
        result._last_pos = None  # recomputed lazily on next append

    return result
3620+
3621+
def _empty_copy(self, capacity: int | None = None) -> CTable:
35523622
"""Return a new empty in-memory CTable with the same schema and capacity."""
35533623
from blosc2 import compute_chunks_blocks
35543624

3555-
capacity = max(self._n_rows, 1)
3625+
capacity = max(capacity if capacity is not None else self._n_rows, 1)
35563626
default_chunks, default_blocks = compute_chunks_blocks((capacity,))
35573627
mem_storage = InMemoryTableStorage()
35583628

@@ -4386,10 +4456,18 @@ def _try_index_where(self, expr_result: blosc2.LazyExpr) -> np.ndarray | None:
43864456
primary_col_name, primary_col_arr, _ = indexed_columns[0]
43874457

43884458
# Inject every usable table-owned descriptor so plan_query can combine them.
4459+
# In .b2z read mode all columns share the same urlpath, so _array_key()
4460+
# returns the same key for every column — causing _SIDECAR_HANDLE_CACHE
4461+
# collisions across queries. Clear stale handles before each injection so
4462+
# the upcoming query always loads the correct sidecar for this column.
4463+
from blosc2.indexing import _clear_cached_data
4464+
43894465
for _col_name, col_arr, descriptor in indexed_columns:
43904466
arr_key = _array_key(col_arr)
43914467
if _is_persistent_array(col_arr):
43924468
store = _PERSISTENT_INDEXES.get(arr_key) or _default_index_store()
4469+
if store["indexes"].get(descriptor["token"]) is not descriptor:
4470+
_clear_cached_data(col_arr, descriptor["token"])
43934471
store["indexes"][descriptor["token"]] = descriptor
43944472
_PERSISTENT_INDEXES[arr_key] = store
43954473
else:
@@ -4604,7 +4682,11 @@ def extend(self, data: list | CTable | Any, *, validate: bool | None = None) ->
46044682
raw_columns[name] = data._cols[name][: data._n_rows]
46054683
provided_names.add(name)
46064684
else:
4607-
if isinstance(data, np.ndarray) and data.dtype.names is not None:
4685+
if isinstance(data, dict):
4686+
provided_names = set(data) & set(current_col_names)
4687+
new_nrows = len(next(iter(data.values())))
4688+
raw_columns = {name: data[name] for name in provided_names}
4689+
elif isinstance(data, np.ndarray) and data.dtype.names is not None:
46084690
new_nrows = len(data)
46094691
raw_columns = {name: data[name] for name in data.dtype.names if name in current_col_names}
46104692
provided_names = set(raw_columns)
@@ -4635,7 +4717,7 @@ def extend(self, data: list | CTable | Any, *, validate: bool | None = None) ->
46354717
list_processed_cols[name] = list(raw_columns[name])
46364718
else:
46374719
target_dtype = self._cols[name].dtype
4638-
scalar_processed_cols[name] = blosc2.asarray(raw_columns[name], dtype=target_dtype)
4720+
scalar_processed_cols[name] = np.ascontiguousarray(raw_columns[name], dtype=target_dtype)
46394721

46404722
end_pos = start_pos + new_nrows
46414723

@@ -4650,7 +4732,7 @@ def extend(self, data: list | CTable | Any, *, validate: bool | None = None) ->
46504732
for name in current_col_names:
46514733
col_meta = self._schema.columns_by_name[name]
46524734
if self._is_list_column(col_meta):
4653-
self._cols[name].extend(list_processed_cols[name])
4735+
self._cols[name].extend(list_processed_cols[name], validate=do_validate)
46544736
else:
46554737
self._cols[name][start_pos:end_pos] = scalar_processed_cols[name][:]
46564738

0 commit comments

Comments
 (0)