Skip to content

Commit e6731ae

Browse files
committed
Preliminary implementation of OPSI indexes
1 parent 59e41cb commit e6731ae

6 files changed

Lines changed: 1838 additions & 30 deletions

File tree

bench/indexing/index_query_bench.py

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,16 @@
2424

2525
SIZES = (1_000_000, 2_000_000, 5_000_000, 10_000_000)
2626
DEFAULT_REPEATS = 3
27-
KINDS = ("summary", "bucket", "partial", "full")
27+
KINDS = ("summary", "bucket", "partial", "full", "opsi")
2828
DEFAULT_KIND = "bucket"
2929
DISTS = ("sorted", "block-shuffled", "permuted", "random")
3030
RNG_SEED = 0
3131
DEFAULT_OPLEVEL = 5
3232
FULL_QUERY_MODES = ("auto", "selective-ooc", "whole-load")
3333
DATASET_LAYOUT_VERSION = "payload-ramp-v1"
3434
BUILD_MODES = ("auto", "memory", "ooc")
35+
FULL_INDEX_METHODS = ("global-sort", "opsi")
36+
FULL_INDEX_METHODS = ("global-sort", "opsi")
3537

3638
COLD_COLUMNS = [
3739
("rows", lambda result: f"{result['size']:,}"),
@@ -443,19 +445,33 @@ def _condition_expr(lo: object, hi: object, dtype: np.dtype, *, query_single_val
443445
return f"(id >= {lo_literal}) & (id <= {hi_literal})"
444446

445447

448+
def _index_kind_method(kind: str) -> tuple[str, str | None]:
449+
if kind == "opsi":
450+
return "full", "opsi"
451+
if kind == "full":
452+
return "full", "global-sort"
453+
return kind, None
454+
455+
446456
def _valid_index_descriptor(arr: blosc2.NDArray, kind: str, optlevel: int, build: str) -> dict | None:
457+
actual_kind, method = _index_kind_method(kind)
447458
for descriptor in arr.indexes:
448459
if descriptor.get("version") != blosc2_indexing.INDEX_FORMAT_VERSION:
449460
continue
450461
expected_ooc = build != "memory"
451-
if (
462+
if not (
452463
descriptor.get("field") == "id"
453-
and descriptor.get("kind") == kind
464+
and descriptor.get("kind") == actual_kind
454465
and int(descriptor.get("optlevel", -1)) == int(optlevel)
455466
and bool(descriptor.get("ooc", False)) is bool(expected_ooc)
456467
and not descriptor.get("stale", False)
457468
):
458-
return descriptor
469+
continue
470+
if method is not None:
471+
build_method = descriptor.get("full", {}).get("build_method", "global-sort")
472+
if build_method != method:
473+
continue
474+
return descriptor
459475
return None
460476

461477

@@ -482,6 +498,7 @@ def _open_or_build_indexed_array(
482498
clevel: int | None,
483499
nthreads: int | None,
484500
no_mmap: bool,
501+
opsi_max_cycles: int,
485502
) -> tuple[blosc2.NDArray, float]:
486503
if path.exists():
487504
arr = blosc2.open(path, mode="a")
@@ -493,12 +510,17 @@ def _open_or_build_indexed_array(
493510

494511
arr = build_persistent_array(size, dist, id_dtype, path, chunks, blocks)
495512
build_start = time.perf_counter()
513+
actual_kind, method = _index_kind_method(kind)
496514
kwargs = {
497515
"field": "id",
498-
"kind": blosc2.IndexKind[kind.upper()],
516+
"kind": blosc2.IndexKind[actual_kind.upper()],
499517
"optlevel": optlevel,
500518
"build": build,
501519
}
520+
if method is not None:
521+
kwargs["method"] = method
522+
if method == "opsi":
523+
kwargs["opsi_max_cycles"] = opsi_max_cycles
502524
cparams = {}
503525
if codec is not None:
504526
cparams["codec"] = codec
@@ -531,6 +553,7 @@ def benchmark_size(
531553
kinds: tuple[str, ...],
532554
repeats: int,
533555
no_mmap: bool,
556+
opsi_max_cycles: int,
534557
cold_row_callback=None,
535558
) -> list[dict]:
536559
arr = _open_or_build_persistent_array(
@@ -574,6 +597,7 @@ def benchmark_size(
574597
clevel,
575598
nthreads,
576599
no_mmap,
600+
opsi_max_cycles,
577601
)
578602
idx_cond = blosc2.lazyexpr(condition_str, idx_arr.fields)
579603
idx_expr = idx_cond.where(idx_arr)
@@ -725,12 +749,28 @@ def parse_args() -> argparse.Namespace:
725749
default="auto",
726750
help="Index builder policy: auto, memory, or ooc. Default: auto.",
727751
)
752+
parser.add_argument(
753+
"--method",
754+
choices=FULL_INDEX_METHODS,
755+
default="global-sort",
756+
help=(
757+
"Full-index build method. Use with --kind full: global-sort selects the current full "
758+
"builder, opsi selects the OPSI builder. --kind opsi is a shorthand for "
759+
"--kind full --method opsi. Default: global-sort."
760+
),
761+
)
728762
parser.add_argument(
729763
"--full-query-mode",
730764
choices=FULL_QUERY_MODES,
731765
default="auto",
732766
help="How full exact queries should run during the benchmark: auto, selective-ooc, or whole-load.",
733767
)
768+
parser.add_argument(
769+
"--opsi-max-cycles",
770+
type=int,
771+
default=3,
772+
help="Maximum OPSI cycles before falling back to global-sort. Default: 3.",
773+
)
734774
parser.add_argument(
735775
"--codec",
736776
type=str,
@@ -776,9 +816,18 @@ def main() -> None:
776816
raise SystemExit("--clevel must be >= 0")
777817
if args.nthreads is not None and args.nthreads <= 0:
778818
raise SystemExit("--nthreads must be a positive integer")
819+
if args.opsi_max_cycles < 0:
820+
raise SystemExit("--opsi-max-cycles must be >= 0")
821+
if args.method == "opsi" and args.kind != "full":
822+
raise SystemExit("--method opsi requires --kind full; alternatively use --kind opsi")
823+
if args.kind == "opsi" and args.method != "global-sort":
824+
raise SystemExit("--kind opsi already selects the OPSI method; do not also pass --method")
779825
sizes = (args.size,) if args.size is not None else SIZES
780826
dists = DISTS if args.dist == "all" else (args.dist,)
781-
kinds = KINDS if args.kind == "all" else (args.kind,)
827+
if args.kind == "full" and args.method == "opsi":
828+
kinds = ("opsi",)
829+
else:
830+
kinds = KINDS if args.kind == "all" else (args.kind,)
782831

783832
if args.outdir is None:
784833
with tempfile.TemporaryDirectory() as tmpdir:
@@ -801,6 +850,7 @@ def main() -> None:
801850
args.clevel,
802851
args.nthreads,
803852
args.no_mmap,
853+
args.opsi_max_cycles,
804854
)
805855
else:
806856
args.outdir.mkdir(parents=True, exist_ok=True)
@@ -823,6 +873,7 @@ def main() -> None:
823873
args.clevel,
824874
args.nthreads,
825875
args.no_mmap,
876+
args.opsi_max_cycles,
826877
)
827878

828879

@@ -845,6 +896,7 @@ def run_benchmarks(
845896
clevel: int | None,
846897
nthreads: int | None,
847898
no_mmap: bool,
899+
opsi_max_cycles: int,
848900
) -> None:
849901
all_results = []
850902

@@ -894,6 +946,7 @@ def cold_progress_callback(row: dict) -> None:
894946
kinds,
895947
repeats,
896948
no_mmap,
949+
opsi_max_cycles,
897950
cold_row_callback=cold_progress_callback,
898951
)
899952
all_results.extend(size_results)

0 commit comments

Comments
 (0)