Skip to content

Commit 7d2ba50

Browse files
committed
Iter_sorted() added to CTable
1 parent 9d542ec commit 7d2ba50

1 file changed

Lines changed: 103 additions & 16 deletions

File tree

src/blosc2/ctable.py

Lines changed: 103 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1674,6 +1674,79 @@ def __iter__(self):
16741674
for i in range(self.nrows):
16751675
yield _Row(self, i)
16761676

1677+
def iter_sorted(
    self,
    cols: str | list[str],
    ascending: bool | list[bool] = True,
    *,
    start: int | None = None,
    stop: int | None = None,
    step: int | None = None,
    batch_size: int = 4096,
):
    """Iterate rows in sorted order without materializing a sorted table copy.

    For a single sort key, the positions returned by
    ``_sorted_positions_from_full_index`` are used when they cover every
    live row; otherwise the order is computed with ``np.lexsort`` over the
    live physical positions. Yields :class:`_Row` objects in the same way
    as ``__iter__``.

    The (optionally sliced) sorted positions are stored as a compressed
    ``blosc2.NDArray``; ``batch_size`` positions are decompressed at a
    time during iteration.

    Parameters
    ----------
    cols:
        Column name or list of column names to sort by.
    ascending:
        Sort direction. A single bool applies to all keys; a list must
        have the same length as *cols*.
    start, stop, step:
        Optional slice applied to the sorted sequence before iteration.
        E.g. ``stop=10`` yields only the first 10 rows of the sorted
        order; ``step=2`` yields every other row in sorted order.
    batch_size:
        Number of positions decompressed per iteration step. Larger
        values reduce decompression overhead; smaller values use less
        transient RAM. Must be at least 1. Default is 4096.

    Yields
    ------
    _Row
        One row view per selected position, in sorted order.

    Raises
    ------
    ValueError
        If *batch_size* is smaller than 1. Because this is a generator,
        the error surfaces when iteration starts, not at call time.
    """
    # A non-positive batch size would either make ``range`` raise an opaque
    # error (0) or silently produce no rows at all (negative).
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    cols, ascending = self._normalise_sort_keys(cols, ascending)

    valid_np = self._valid_rows[:]
    live_pos = np.where(valid_np)[0]
    n = len(live_pos)
    if n == 0:
        return

    # Fast path: a single sort key may be served by pre-sorted positions.
    # They are only trusted when they account for exactly the live rows.
    sorted_pos = None
    if len(cols) == 1:
        sorted_pos = self._sorted_positions_from_full_index(cols[0], ascending[0])
        if sorted_pos is not None and len(sorted_pos) != n:
            sorted_pos = None

    if sorted_pos is None:
        order = np.lexsort(self._build_lex_keys(cols, ascending, live_pos, n))
        sorted_pos = live_pos[order]

    # Slicing with all-None bounds is a no-op view, so no guard is needed.
    sorted_pos = sorted_pos[start:stop:step]
    if len(sorted_pos) == 0:
        # Nothing selected: skip the compression and lookup-table setup.
        return

    # Compress positions into an NDArray to reduce RAM usage for large tables.
    # A stepped slice is a strided view, so make it contiguous first.
    sorted_pos_nd = blosc2.asarray(np.ascontiguousarray(sorted_pos, dtype=np.int64))
    del sorted_pos

    # physical -> logical index mapping; only entries at live positions are
    # initialised, which is sufficient because every sorted position is live.
    phys_to_logical = np.empty(valid_np.shape[0], dtype=np.intp)
    phys_to_logical[live_pos] = np.arange(n, dtype=np.intp)

    total = len(sorted_pos_nd)
    for i in range(0, total, batch_size):
        chunk = sorted_pos_nd[i : i + batch_size]
        # Translate the whole batch at once and convert to Python ints in
        # bulk instead of one NumPy scalar lookup per row.
        for logical in phys_to_logical[chunk].tolist():
            yield _Row(self, logical)
1749+
16771750
# ------------------------------------------------------------------
16781751
# Open existing table (classmethod)
16791752
# ------------------------------------------------------------------
@@ -3369,27 +3442,23 @@ def _normalise_sort_keys(
33693442
return cols, ascending
33703443

33713444
def _sorted_positions_from_full_index(self, name: str, ascending: bool) -> np.ndarray | None:
3372-
"""Return live physical positions from a matching FULL index, if available."""
3373-
from blosc2.indexing import ordered_indices
3445+
"""Return live physical positions from a matching FULL index, if available.
33743446
3447+
Reads the pre-sorted positions sidecar directly rather than going through
3448+
the ordered_indices query machinery, which is optimised for selective range
3449+
queries and is much slower for full-table streaming.
3450+
"""
33753451
root = self._root_table
33763452
catalog = root._storage.load_index_catalog()
33773453
descriptor = None
3378-
target_arr = None
33793454

33803455
if name in root._cols:
33813456
descriptor = catalog.get(name)
3382-
if (
3383-
descriptor is not None
3384-
and descriptor.get("kind") == "full"
3385-
and not descriptor.get("stale", False)
3386-
):
3387-
target_arr = root._cols[name]
3388-
else:
3457+
if descriptor is None or descriptor.get("kind") != "full" or descriptor.get("stale", False):
33893458
descriptor = None
33903459
elif name in root._computed_cols:
33913460
cc = root._computed_cols[name]
3392-
for lookup_key, candidate in catalog.items():
3461+
for _lookup_key, candidate in catalog.items():
33933462
target = candidate.get("target") or {}
33943463
if (
33953464
target.get("source") == "expression"
@@ -3399,14 +3468,32 @@ def _sorted_positions_from_full_index(self, name: str, ascending: bool) -> np.nd
33993468
and list(target.get("dependencies", [])) == list(cc["col_deps"])
34003469
):
34013470
descriptor = candidate
3402-
target_arr = root._index_target_array(lookup_key, candidate)
34033471
break
3404-
if descriptor is None or target_arr is None:
3472+
if descriptor is None:
34053473
return None
34063474

3407-
positions = ordered_indices(target_arr, require_full=True)
3408-
if positions is None:
3409-
return None
3475+
positions_path = descriptor.get("full", {}).get("positions_path")
3476+
3477+
# Read pre-sorted positions directly — bypasses the ordered_indices query
3478+
# machinery which is built for selective range queries and is ~70x slower
3479+
# for full-table streaming.
3480+
if positions_path is not None:
3481+
# Persistent table: positions live in a sidecar .b2nd file.
3482+
positions_nd = blosc2.open(positions_path, mode="r")
3483+
else:
3484+
# In-memory table: positions live in the sidecar handle cache.
3485+
from blosc2.indexing import _SIDECAR_HANDLE_CACHE, _sidecar_handle_cache_key
3486+
3487+
target_arr = root._cols.get(name)
3488+
if target_arr is None:
3489+
return None
3490+
token = descriptor["token"]
3491+
cache_key = _sidecar_handle_cache_key(target_arr, token, "full", "positions")
3492+
positions_nd = _SIDECAR_HANDLE_CACHE.get(cache_key)
3493+
if positions_nd is None:
3494+
return None
3495+
3496+
positions = np.asarray(positions_nd[:], dtype=np.int64)
34103497
valid = root._valid_rows[:]
34113498
positions = np.asarray(positions, dtype=np.int64)
34123499
positions = positions[(positions >= 0) & (positions < len(valid))]

0 commit comments

Comments
 (0)