From b30b9e101c21bed43c999143679a284d815ca54e Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 10 Apr 2026 13:25:41 -0500 Subject: [PATCH 1/5] Add OpenSearch backend to cuvs-bench Signed-off-by: James Bourbeau --- .../cuvs_bench/backends/__init__.py | 3 + .../cuvs_bench/backends/opensearch.py | 978 ++++++++++++++++++ .../cuvs_bench/backends/search_spaces.py | 24 + .../config/algos/opensearch_faiss_hnsw.yaml | 37 + .../config/algos/opensearch_hnsw.yaml | 41 + .../cuvs_bench/orchestrator/__init__.py | 3 + .../cuvs_bench/orchestrator/orchestrator.py | 122 ++- .../cuvs_bench/tests/test_opensearch.py | 314 ++++++ python/cuvs_bench/pyproject.toml | 8 + 9 files changed, 1473 insertions(+), 57 deletions(-) create mode 100644 python/cuvs_bench/cuvs_bench/backends/opensearch.py create mode 100644 python/cuvs_bench/cuvs_bench/config/algos/opensearch_faiss_hnsw.yaml create mode 100644 python/cuvs_bench/cuvs_bench/config/algos/opensearch_hnsw.yaml create mode 100644 python/cuvs_bench/cuvs_bench/tests/test_opensearch.py diff --git a/python/cuvs_bench/cuvs_bench/backends/__init__.py b/python/cuvs_bench/cuvs_bench/backends/__init__.py index 3e9688dbe4..a09e6fd1fe 100644 --- a/python/cuvs_bench/cuvs_bench/backends/__init__.py +++ b/python/cuvs_bench/cuvs_bench/backends/__init__.py @@ -25,10 +25,12 @@ ) from .cpp_gbench import CppGoogleBenchmarkBackend +from .opensearch import OpenSearchBackend # Auto-register built-in backends _registry = get_registry() _registry.register("cpp_gbench", CppGoogleBenchmarkBackend) +_registry.register("opensearch", OpenSearchBackend) __all__ = [ # Base classes and data structures @@ -43,4 +45,5 @@ "get_backend", # Built-in backends "CppGoogleBenchmarkBackend", + "OpenSearchBackend", ] diff --git a/python/cuvs_bench/cuvs_bench/backends/opensearch.py b/python/cuvs_bench/cuvs_bench/backends/opensearch.py new file mode 100644 index 0000000000..192f72705c --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/backends/opensearch.py @@ -0,0 +1,978 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# + +""" +OpenSearch benchmark backend for cuvs-bench supporting nmslib (HNSW), faiss, +and lucene engines for approximate nearest-neighbor search. + +It also supports the remote index build service (OpenSearch 3.0+), +which offloads Faiss HNSW graph construction to a GPU-accelerated external service. +https://docs.opensearch.org/latest/vector-search/remote-index-build/ +""" + +import itertools +import os +import time +from typing import Any, Dict, List, Optional, Tuple + +import numpy as np +import yaml + +from .base import BenchmarkBackend, BuildResult, Dataset, SearchResult +from ..orchestrator.config_loaders import ( + ConfigLoader, + DatasetConfig, + BenchmarkConfig, + IndexConfig, +) + + +def _load_vectors( + path: str, subset_size: Optional[int] = None +) -> np.ndarray: + """Read a binary vector file (.fbin, .ibin, .u8bin, ...). + + The file format is: 4-byte uint32 n_rows, 4-byte uint32 n_cols, + followed by n_rows x n_cols elements of the matching dtype. 
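+
+    For example, a ``.fbin`` file holding 3 rows of 2 float32 values is
+    8 header bytes plus 3 * 2 * 4 = 24 data bytes, 32 bytes in total.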
+ """ + _DTYPE_FOR_EXT = { + ".fbin": np.float32, + ".f16bin": np.float16, + ".u8bin": np.uint8, + ".i8bin": np.int8, + ".ibin": np.int32, + } + + ext = os.path.splitext(path)[1].lower() + dtype = _DTYPE_FOR_EXT.get(ext, np.float32) + with open(path, "rb") as f: + n_rows = int(np.frombuffer(f.read(4), dtype=np.uint32)[0]) + n_cols = int(np.frombuffer(f.read(4), dtype=np.uint32)[0]) + if subset_size is not None: + n_rows = min(n_rows, subset_size) + raw = f.read(n_rows * n_cols * np.dtype(dtype).itemsize) + data = np.frombuffer(raw, dtype=dtype) + return data.reshape(n_rows, n_cols) + + +def _load_yaml(path: str) -> Any: + with open(path, "r") as fh: + return yaml.safe_load(fh) + + +def _get_dataset_conf(dataset: str, all_confs: list) -> dict: + for d in all_confs: + if dataset == d["name"]: + return d + raise ValueError( + f"Could not find a dataset configuration for {dataset!r} in datasets.yaml" + ) + + +def _gather_algo_configs( + config_path: str, algorithm_configuration: Optional[str] +) -> List[str]: + """Return file paths of opensearch-prefixed algo YAML files.""" + algos_dir = os.path.join(config_path, "algos") + files: List[str] = [ + os.path.join(algos_dir, f) + for f in os.listdir(algos_dir) + if f.startswith("opensearch") and f.endswith(".yaml") + ] + if algorithm_configuration: + if os.path.isdir(algorithm_configuration): + files += [ + os.path.join(algorithm_configuration, f) + for f in os.listdir(algorithm_configuration) + if f.startswith("opensearch") and f.endswith(".yaml") + ] + elif os.path.isfile(algorithm_configuration): + files.append(algorithm_configuration) + return files + + +class OpenSearchConfigLoader(ConfigLoader): + """ + Configuration loader for the OpenSearch backend. + + Reads the shared ``datasets.yaml`` and opensearch-prefixed algorithm YAML + files from the standard config directory. Expands the Cartesian product of + build-parameter lists and returns one :class:`BenchmarkConfig` per + build-parameter combination (with one :class:`IndexConfig` each carrying + the full list of search-parameter combinations). + + Registration + ------------ + from cuvs_bench.orchestrator import register_config_loader + register_config_loader("opensearch", OpenSearchConfigLoader) + """ + + _DEFAULT_CONFIG_PATH: str = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "../config" + ) + + @property + def backend_type(self) -> str: + return "opensearch" + + def load( + self, + dataset: str, + dataset_path: str, + algorithms: Optional[str] = None, + count: int = 10, + batch_size: int = 10000, + **kwargs, + ) -> Tuple[DatasetConfig, List[BenchmarkConfig]]: + """ + Load OpenSearch benchmark configurations. + + Parameters + ---------- + dataset : str + Dataset name; must appear in ``datasets.yaml``. + dataset_path : str + Root directory where dataset files live. + algorithms : Optional[str] + Comma-separated algorithm names to run (e.g. ``"opensearch_hnsw"``). + If *None*, all opensearch-prefixed algorithms are included. + count : int + Number of neighbors *k* (informational for this loader). + batch_size : int + Query batch size (informational for this loader). 
+ **kwargs + Recognized extra keys: + + - ``dataset_configuration`` – path to a custom ``datasets.yaml`` + - ``algorithm_configuration`` – path to an extra algo config dir/file + - ``groups`` – comma-separated group names to restrict to + - ``subset_size`` – limit dataset to first *N* vectors + - ``host`` – OpenSearch host (default: ``"localhost"``) + - ``port`` – OpenSearch port (default: ``9200``) + - ``username`` – HTTP auth username (default: ``"admin"``) + - ``password`` – HTTP auth password (default: ``"admin"``) + - ``use_ssl`` – whether to use HTTPS (default: ``False``) + - ``verify_certs`` – verify SSL certificates (default: ``False``) + - ``bulk_batch_size`` – vectors per bulk request (default: ``500``) + - ``remote_index_build`` – enable GPU remote index build (default: ``False``) + - ``remote_build_size_min`` – minimum segment size to trigger GPU build (default: ``"1kb"``) + - ``remote_build_s3_endpoint`` – S3 endpoint URL + - ``remote_build_s3_bucket`` – bucket name (default: ``"opensearch-vectors"``) + - ``remote_build_s3_prefix`` – key prefix for ``.faiss`` polling (default: ``"knn-indexes/"``) + - ``remote_build_s3_access_key`` – S3 access key + - ``remote_build_s3_secret_key`` – S3 secret key + - ``remote_build_timeout`` – GPU build timeout in seconds (default: ``600``) + + Returns + ------- + Tuple[DatasetConfig, List[BenchmarkConfig]] + """ + config_path = self._DEFAULT_CONFIG_PATH + + ds_yaml = kwargs.get("dataset_configuration") or os.path.join( + config_path, "datasets", "datasets.yaml" + ) + all_ds = _load_yaml(ds_yaml) + ds_conf = _get_dataset_conf(dataset, all_ds) + + def _resolve(rel: Optional[str]) -> Optional[str]: + if rel and not os.path.isabs(rel): + return os.path.join(dataset_path, rel) + return rel + + dataset_config = DatasetConfig( + name=ds_conf["name"], + base_file=_resolve(ds_conf.get("base_file")), + query_file=_resolve(ds_conf.get("query_file")), + groundtruth_neighbors_file=_resolve( + ds_conf.get("groundtruth_neighbors_file") + ), + distance=ds_conf.get("distance", "euclidean"), + dims=ds_conf.get("dims"), + subset_size=kwargs.get("subset_size"), + ) + + algo_files = _gather_algo_configs( + config_path, kwargs.get("algorithm_configuration") + ) + + allowed_algos = ( + [a.strip() for a in algorithms.split(",")] if algorithms else None + ) + allowed_groups = ( + [g.strip() for g in kwargs["groups"].split(",")] + if kwargs.get("groups") + else None + ) + + host = kwargs.get("host", "localhost") + port = kwargs.get("port", 9200) + + # Connection and remote-build kwargs forwarded verbatim to backend_config + _conn_keys = ( + "username", + "password", + "use_ssl", + "verify_certs", + "bulk_batch_size", + # Remote Index Build (OpenSearch 3.0+, faiss engine only) + "remote_index_build", + "remote_build_size_min", + "remote_build_s3_endpoint", + "remote_build_s3_bucket", + "remote_build_s3_prefix", + "remote_build_s3_access_key", + "remote_build_s3_secret_key", + "remote_build_timeout", + ) + conn_kwargs = {k: kwargs[k] for k in _conn_keys if k in kwargs} + + benchmark_configs: List[BenchmarkConfig] = [] + + for algo_file in algo_files: + algo_yaml = _load_yaml(algo_file) + algo_name = algo_yaml.get("name", "") + if allowed_algos and algo_name not in allowed_algos: + continue + + groups: Dict[str, Any] = algo_yaml.get("groups", {}) + if allowed_groups: + groups = {k: v for k, v in groups.items() if k in allowed_groups} + + for group_name, group_conf in groups.items(): + build_spec: Dict[str, List] = group_conf.get("build", {}) + search_spec: Dict[str, 
List] = group_conf.get("search", {}) + + build_keys = list(build_spec.keys()) + build_combos = ( + list(itertools.product(*build_spec.values())) + if build_spec + else [()] + ) + + search_keys = list(search_spec.keys()) + search_combos = ( + list(itertools.product(*search_spec.values())) + if search_spec + else [()] + ) + search_params_list = [ + dict(zip(search_keys, vals)) for vals in search_combos + ] + + for build_vals in build_combos: + build_param = dict(zip(build_keys, build_vals)) + + # Human-readable index label + prefix = ( + f"{algo_name}_{group_name}" + if group_name != "base" + else algo_name + ) + parts = [prefix] + [f"{k}{v}" for k, v in build_param.items()] + index_label = ".".join(parts) + + # OpenSearch index names must be lowercase with no dots + os_index_name = index_label.replace(".", "_").lower() + index_file = os.path.join( + dataset_path, dataset, "index", index_label + ) + + index_cfg = IndexConfig( + name=index_label, + algo=algo_name, + build_param=build_param, + search_params=search_params_list, + file=index_file, + ) + + engine = "nmslib" + if "faiss" in algo_name: + engine = "faiss" + elif "lucene" in algo_name: + engine = "lucene" + + backend_cfg: Dict[str, Any] = { + "name": index_label, + "host": host, + "port": port, + "index_name": os_index_name, + "engine": engine, + "algo": algo_name, + "requires_network": True, + **conn_kwargs, + } + + benchmark_configs.append( + BenchmarkConfig( + indexes=[index_cfg], + backend_config=backend_cfg, + ) + ) + + return dataset_config, benchmark_configs + + +# Mapping from cuvs-bench distance metric names to OpenSearch space_type +_DISTANCE_TO_SPACE_TYPE: Dict[str, str] = { + "euclidean": "l2", + "l2": "l2", + "inner_product": "innerproduct", + "innerproduct": "innerproduct", + "cosine": "cosinesimil", + "cosinesimil": "cosinesimil", + "angular": "cosinesimil", +} + + +class OpenSearchBackend(BenchmarkBackend): + """ + Benchmark backend for OpenSearch's k-NN plugin. + + Supports the nmslib (HNSW), faiss (HNSW / IVF), and lucene (HNSW) + engines. Vectors are bulk-indexed as ``knn_vector`` fields and retrieved + via the standard ``knn`` query type. + + Requires ``opensearch-py`` Python package. + + Parameters + ---------- + config : Dict[str, Any] + Backend configuration produced by :class:`OpenSearchConfigLoader`. + Recognized keys: + + Required: + - ``name`` – index label (e.g. ``"opensearch_hnsw.m16.ef_construction100"``) + - ``index_name`` – OpenSearch index name (lowercase, no dots) + - ``engine`` – ``"nmslib"``, ``"faiss"``, or ``"lucene"`` + - ``algo`` – algorithm name (e.g. ``"opensearch_hnsw"``) + + Optional: + - ``host`` – hostname (default: ``"localhost"``) + - ``port`` – port (default: ``9200``) + - ``username`` – HTTP basic auth user (default: ``"admin"``) + - ``password`` – HTTP basic auth password (default: ``"admin"``) + - ``use_ssl`` – use HTTPS (default: ``False``) + - ``verify_certs`` – verify SSL certs (default: ``False``) + - ``bulk_batch_size`` – vectors per bulk request (default: ``500``) + - ``requires_network`` – trigger network pre-flight check (default: ``True``) + - ``remote_index_build`` – set ``index.knn.remote_index_build.enabled=true`` + on the index at creation time, opting it into the GPU build path (default: ``False``). + - ``remote_build_size_min`` – minimum segment size to trigger GPU build, e.g. 
``"1kb"`` + (default: ``"1kb"`` when ``remote_index_build=True``) + - ``remote_build_s3_endpoint`` – S3 endpoint URL for build polling + - ``remote_build_s3_bucket`` – bucket name (default: ``"opensearch-vectors"``) + - ``remote_build_s3_prefix`` – key prefix to scan for ``.faiss`` files (default: ``"knn-indexes/"``) + - ``remote_build_s3_access_key`` – S3 access key + - ``remote_build_s3_secret_key`` – S3 secret key + - ``remote_build_timeout`` – seconds to wait for GPU build (default: ``600``) + + """ + + def __init__(self, config: Dict[str, Any]): + super().__init__(config) + self.__client = None + + @property + def algo(self) -> str: + return self.config.get("algo", "opensearch") + + @property + def _client(self): + if self.__client is None: + try: + from opensearchpy import OpenSearch + except ImportError as e: + raise ImportError( + "`opensearch-py` is required for the OpenSearch backend in cuvs-bench.\n\n" + "Install it with: `pip install opensearch-py`" + ) from e + + host = self.config.get("host", "localhost") + port = self.config.get("port", 9200) + username = self.config.get("username", "admin") + password = self.config.get("password", "admin") + use_ssl = self.config.get("use_ssl", False) + verify_certs = self.config.get("verify_certs", False) + + self.__client = OpenSearch( + hosts=[{"host": host, "port": port}], + http_auth=(username, password) if username else None, + use_ssl=use_ssl, + verify_certs=verify_certs, + ) + return self.__client + + def _check_network_available(self) -> bool: + try: + self._client.cluster.health(request_timeout=5) + return True + except ImportError: + return False + except Exception: + return False + + def initialize(self) -> None: + """Eagerly open the connection to OpenSearch.""" + _ = self._client + + def cleanup(self) -> None: + """Close the OpenSearch connection.""" + if self.__client is not None: + self.__client.close() + self.__client = None + + def _build_index_mapping( + self, + dims: int, + engine: str, + space_type: str, + build_param: Dict[str, Any], + remote_index_build: bool = False, + remote_build_size_min: Optional[str] = None, + ) -> Dict[str, Any]: + """ + Construct the OpenSearch index mapping dict for k-NN. + + The mapping enables the k-NN plugin and configures the vector field + with the chosen engine and method parameters. + + When ``remote_index_build=True``, ``index.knn.remote_index_build.enabled`` + is set to ``true`` in the index settings, opting qualifying segments into + the GPU build path. ``remote_build_size_min`` sets + ``index.knn.remote_index_build.size.min`` to control the minimum segment + size that triggers the GPU build; the default ``"1kb"`` ensures any + non-trivial segment is built remotely. The cluster-level infrastructure + is assumed to be pre-configured externally. + """ + m = build_param.get("m", 16) + ef_construction = build_param.get("ef_construction", 100) + + if engine in ("nmslib", "lucene"): + method_name = "hnsw" + method_params: Dict[str, Any] = { + "m": m, + "ef_construction": ef_construction, + } + elif engine == "faiss": + faiss_method = build_param.get("faiss_method", "hnsw") + method_name = faiss_method + if faiss_method == "hnsw": + method_params = { + "m": m, + "ef_construction": ef_construction, + } + elif faiss_method == "ivf": + method_params = {"nlist": build_param.get("nlist", 4)} + else: + raise ValueError( + f"Unsupported faiss_method {faiss_method!r}. " + "Use 'hnsw' or 'ivf'." + ) + else: + raise ValueError( + f"Unknown OpenSearch k-NN engine {engine!r}. 
" + "Use 'nmslib', 'faiss', or 'lucene'." + ) + + index_settings = { + "knn": True, + "number_of_shards": build_param.get("number_of_shards", 1), + "number_of_replicas": build_param.get("number_of_replicas", 0), + } + if remote_index_build: + if engine != "faiss": + raise ValueError( + "Remote Index Build only supports the faiss engine " + f"(got engine={engine!r}). " + "Use algorithms='opensearch_faiss_hnsw'." + ) + index_settings["knn.remote_index_build.enabled"] = True + index_settings["knn.remote_index_build.size.min"] = ( + remote_build_size_min or "1kb" + ) + + return { + "settings": {"index": index_settings}, + "mappings": { + "properties": { + "vector": { + "type": "knn_vector", + "dimension": dims, + "method": { + "name": method_name, + "space_type": space_type, + "engine": engine, + "parameters": method_params, + }, + } + } + }, + } + + def _bulk_index( + self, + index_name: str, + vectors: np.ndarray, + bulk_batch_size: int, + ) -> None: + """ + Bulk-index vectors into index_name using the helpers API. + + Vectors are stored under the ``"vector"`` field with their integer + row index as the document ``_id`` so they can be mapped back to + ground-truth neighbor lists. + """ + from opensearchpy.helpers import streaming_bulk + + def _doc_generator(): + for i, vec in enumerate(vectors): + yield { + "_index": index_name, + "_id": str(i), + "vector": vec.tolist(), + } + + total = vectors.shape[0] + indexed = 0 + for ok, info in streaming_bulk( + self._client, + _doc_generator(), + chunk_size=bulk_batch_size, + request_timeout=120, + ): + if not ok: + raise RuntimeError(f"Failed to index document: {info}") + indexed += 1 + if indexed % max(bulk_batch_size, 1000) == 0: + print(f" Indexed {indexed} / {total} vectors") + print(f" Indexed all {total} vectors") + + def _wait_for_remote_build( + self, + expected_count: int = 1, + timeout: int = 600, + poll_interval: int = 5, + ) -> None: + """ + Poll S3 for .faiss files confirming the GPU remote build completed. + + Raises ``TimeoutError`` if the expected number of files does not appear + within *timeout* seconds. + """ + try: + import boto3 + from botocore.config import Config as BotocoreConfig + except ImportError as exc: + raise ImportError( + "boto3 is required for remote index build polling. 
" + "Install it with: pip install boto3" + ) from exc + + s3_endpoint = self.config.get("remote_build_s3_endpoint") + s3_bucket = self.config.get("remote_build_s3_bucket", "opensearch-vectors") + s3_prefix = self.config.get("remote_build_s3_prefix", "knn-indexes/") + s3_access_key = self.config.get("remote_build_s3_access_key") + s3_secret_key = self.config.get("remote_build_s3_secret_key") + + s3 = boto3.client( + "s3", + endpoint_url=s3_endpoint, + aws_access_key_id=s3_access_key, + aws_secret_access_key=s3_secret_key, + region_name="us-east-1", + config=BotocoreConfig(signature_version="s3v4"), + ) + + deadline = time.time() + timeout + while time.time() < deadline: + try: + resp = s3.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix) + faiss_files = [ + obj["Key"] + for obj in resp.get("Contents", []) + if obj["Key"].endswith(".faiss") + ] + if len(faiss_files) >= expected_count: + return + except Exception as exc: + print(f" S3 poll error: {exc}") + time.sleep(poll_interval) + + raise TimeoutError( + f"GPU build not confirmed after {timeout}s: " + f"expected {expected_count} .faiss file(s) in s3://{s3_bucket}/{s3_prefix}" + ) + + def build( + self, + dataset: Dataset, + indexes: List[IndexConfig], + force: bool = False, + dry_run: bool = False, + ) -> BuildResult: + """ + Build an OpenSearch k-NN index from the dataset's base vectors. + + Creates an index with k-NN plugin mapping and bulk-indexes all base + vectors. If the index already exists and ``force=False`` the build + is skipped. + + When ``remote_index_build=True`` the build time includes the full GPU + build flow: ingest → force merge → wait for ``.faiss`` files in S3. + + Parameters + ---------- + dataset : Dataset + Must have either non-empty ``base_vectors`` or a valid + ``base_file`` path. + indexes : List[IndexConfig] + The first element provides the build parameters. + force : bool + Delete and recreate the index if it already exists. + dry_run : bool + Log what would happen without making any changes. 
+ + Returns + ------- + BuildResult + """ + skip = self._pre_flight_check() + if skip: + return BuildResult( + index_path="", + build_time_seconds=0.0, + index_size_bytes=0, + algorithm=self.algo, + build_params={}, + success=False, + error_message=f"pre-flight check failed: {skip}", + ) + + if not indexes: + return BuildResult( + index_path="", + build_time_seconds=0.0, + index_size_bytes=0, + algorithm=self.algo, + build_params={}, + success=False, + error_message="No indexes provided", + ) + + index_cfg = indexes[0] + build_param = index_cfg.build_param + index_name = self.config.get( + "index_name", index_cfg.name.replace(".", "_").lower() + ) + engine = self.config.get("engine", "nmslib") + bulk_batch_size = int(self.config.get("bulk_batch_size", 500)) + remote_index_build = bool(self.config.get("remote_index_build", False)) + remote_build_size_min = self.config.get("remote_build_size_min") + + if dry_run: + print(f"[dry_run] Would build OpenSearch index '{index_name}' " + f"(engine={engine}, remote_index_build={remote_index_build}, build_param={build_param})") + + return BuildResult( + index_path=index_name, + build_time_seconds=0.0, + index_size_bytes=0, + algorithm=self.algo, + build_params=build_param, + success=True, + ) + + + # Handle existing index + if self._client.indices.exists(index=index_name): + if force: + self._client.indices.delete(index=index_name) + else: + return BuildResult( + index_path=index_name, + build_time_seconds=0.0, + index_size_bytes=0, + algorithm=self.algo, + build_params=build_param, + success=True, + ) + + # Load base vectors (may be empty if only file path is provided) + base_vectors = dataset.base_vectors + subset_size = dataset.metadata.get("subset_size") + if base_vectors.size == 0 and dataset.base_file: + base_vectors = _load_vectors(dataset.base_file, subset_size) + + if base_vectors.size == 0: + return BuildResult( + index_path="", + build_time_seconds=0.0, + index_size_bytes=0, + algorithm=self.algo, + build_params=build_param, + success=False, + error_message=( + "No base vectors available. Provide dataset.base_vectors " + "or a valid dataset.base_file path." + ), + ) + + dims = base_vectors.shape[1] + space_type = _DISTANCE_TO_SPACE_TYPE.get( + dataset.distance_metric, "l2" + ) + + # Create index + mapping = self._build_index_mapping( + dims, engine, space_type, build_param, remote_index_build, remote_build_size_min + ) + self._client.indices.create(index=index_name, body=mapping) + + # Bulk index, then trigger and await the GPU build (if remote) + t0 = time.perf_counter() + self._bulk_index(index_name, base_vectors, bulk_batch_size) + self._client.indices.refresh(index=index_name, request_timeout=120) + if remote_index_build: + # Force-merge `index_name` to one segment, initiating the remote GPU build. 
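+                # (REST equivalent: POST /<index>/_forcemerge?max_num_segments=1.
+                # Merging to a single segment yields one remote build job and one
+                # .faiss artifact, matching the expected_count=1 passed to
+                # _wait_for_remote_build below.)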
+ self._client.indices.forcemerge( + index=index_name, max_num_segments=1, request_timeout=300 + ) + self._wait_for_remote_build( + expected_count=1, + timeout=int(self.config.get("remote_build_timeout", 600)), + ) + build_time = time.perf_counter() - t0 + + # Index size + stats = self._client.indices.stats(index=index_name) + index_size_bytes = int( + stats["_all"]["total"]["store"]["size_in_bytes"] + ) + + return BuildResult( + index_path=index_name, + build_time_seconds=build_time, + index_size_bytes=index_size_bytes, + algorithm=self.algo, + build_params=build_param, + metadata={ + "engine": engine, + "space_type": space_type, + "remote_index_build": remote_index_build, + }, + success=True, + ) + + def search( + self, + dataset: Dataset, + indexes: List[IndexConfig], + k: int, + batch_size: int = 10000, + mode: str = "latency", + force: bool = False, + search_threads: Optional[int] = None, + dry_run: bool = False, + ) -> SearchResult: + """ + Search the OpenSearch k-NN index for nearest neighbors. + + Iterates over every search-parameter combination defined in the index + config, updating the index-level ``ef_search`` setting between runs. + Metrics (recall, QPS, latency) are collected per parameter set and + stored in ``SearchResult.metadata["per_search_param_results"]``. + + The *neighbors* and *distances* arrays in the returned result reflect + the **last** search-parameter combination (highest ef_search by + convention), while *recall* and *queries_per_second* are the averages + across all parameter combinations. + + Parameters + ---------- + dataset : Dataset + Must have either non-empty ``query_vectors`` or a valid + ``query_file`` path. Ground truth is loaded from + ``groundtruth_neighbors_file`` if available. + indexes : List[IndexConfig] + The first element is used; its ``search_params`` list defines the + ef_search values to sweep. + k : int + Number of neighbors to retrieve per query. + batch_size : int + Unused; included for interface compatibility. + mode : str + ``"latency"`` or ``"throughput"``; informational for this backend. + force : bool + Unused; included for interface compatibility. + search_threads : Optional[int] + Unused; OpenSearch manages its own threading. + dry_run : bool + Log what would happen without making any OpenSearch calls. 
+ + Returns + ------- + SearchResult + """ + skip = self._pre_flight_check() + if skip: + return SearchResult( + neighbors=np.zeros((0, k), dtype=np.int64), + distances=np.zeros((0, k), dtype=np.float32), + search_time_ms=0.0, + queries_per_second=0.0, + recall=0.0, + algorithm=self.algo, + search_params=[], + success=False, + error_message=f"pre-flight check failed: {skip}", + ) + + if not indexes: + return SearchResult( + neighbors=np.zeros((0, k), dtype=np.int64), + distances=np.zeros((0, k), dtype=np.float32), + search_time_ms=0.0, + queries_per_second=0.0, + recall=0.0, + algorithm=self.algo, + search_params=[], + success=False, + error_message="No indexes provided", + ) + + index_cfg = indexes[0] + index_name = self.config.get( + "index_name", index_cfg.name.replace(".", "_").lower() + ) + engine = self.config.get("engine", "nmslib") + search_params_list = index_cfg.search_params or [{}] + + if dry_run: + print(f"[dry_run] Would search OpenSearch index '{index_name}' with {len(search_params_list)} param set(s) (k={k})") + + return SearchResult( + neighbors=np.zeros((0, k), dtype=np.int64), + distances=np.zeros((0, k), dtype=np.float32), + search_time_ms=0.0, + queries_per_second=0.0, + recall=0.0, + algorithm=self.algo, + search_params=search_params_list, + success=True, + ) + + # Load query vectors and ground truth from files if not already loaded + query_vectors = dataset.query_vectors + if query_vectors.size == 0 and dataset.query_file: + query_vectors = _load_vectors(dataset.query_file) + + if query_vectors.size == 0: + return SearchResult( + neighbors=np.zeros((0, k), dtype=np.int64), + distances=np.zeros((0, k), dtype=np.float32), + search_time_ms=0.0, + queries_per_second=0.0, + recall=0.0, + algorithm=self.algo, + search_params=search_params_list, + success=False, + error_message=( + "No query vectors available. Provide dataset.query_vectors " + "or a valid dataset.query_file path." 
+ ), + ) + + groundtruth = dataset.groundtruth_neighbors + if groundtruth is None and dataset.groundtruth_neighbors_file: + groundtruth = _load_vectors(dataset.groundtruth_neighbors_file) + + n_queries = query_vectors.shape[0] + + # Run search for each search-parameter combination + per_param_results: List[Dict[str, Any]] = [] + last_neighbors = np.full((n_queries, k), -1, dtype=np.int64) + last_distances = np.zeros((n_queries, k), dtype=np.float32) + + for sp in search_params_list: + ef_search = sp.get("ef_search", 100) + + if engine in ("nmslib", "faiss"): + self._client.indices.put_settings( + index=index_name, + body={"index.knn.algo_param.ef_search": ef_search}, + ) + + neighbors = np.full((n_queries, k), -1, dtype=np.int64) + distances = np.zeros((n_queries, k), dtype=np.float32) + + t0 = time.perf_counter() + for i, q_vec in enumerate(query_vectors): + body: Dict[str, Any] = { + "size": k, + "query": { + "knn": { + "vector": { + "vector": q_vec.tolist(), + "k": k, + } + } + }, + } + resp = self._client.search(index=index_name, body=body) + hits = resp["hits"]["hits"] + for j, hit in enumerate(hits[:k]): + neighbors[i, j] = int(hit["_id"]) + distances[i, j] = float(hit["_score"]) + + elapsed = time.perf_counter() - t0 + qps = n_queries / elapsed if elapsed > 0 else 0.0 + + recall = 0.0 + if groundtruth is not None: + gt_k = min(k, groundtruth.shape[1]) + gt = groundtruth[:, :gt_k] + n_correct = sum( + len(set(neighbors[i, :k].tolist()) & set(gt[i].tolist())) + for i in range(n_queries) + ) + recall = n_correct / (n_queries * gt_k) + + per_param_results.append( + { + "search_params": sp, + "search_time_ms": elapsed * 1000.0, + "queries_per_second": qps, + "recall": recall, + } + ) + last_neighbors = neighbors + last_distances = distances + + # Aggregate across all search-param combinations + avg_recall = float( + np.mean([r["recall"] for r in per_param_results]) + ) + avg_qps = float( + np.mean([r["queries_per_second"] for r in per_param_results]) + ) + total_search_time_ms = float( + sum(r["search_time_ms"] for r in per_param_results) + ) + + return SearchResult( + neighbors=last_neighbors, + distances=last_distances, + search_time_ms=total_search_time_ms, + queries_per_second=avg_qps, + recall=avg_recall, + algorithm=self.algo, + search_params=search_params_list, + metadata={ + "engine": engine, + "per_search_param_results": per_param_results, + }, + success=True, + ) diff --git a/python/cuvs_bench/cuvs_bench/backends/search_spaces.py b/python/cuvs_bench/cuvs_bench/backends/search_spaces.py index 5a1848294c..386bdd1166 100644 --- a/python/cuvs_bench/cuvs_bench/backends/search_spaces.py +++ b/python/cuvs_bench/cuvs_bench/backends/search_spaces.py @@ -158,6 +158,30 @@ "ef": {"type": "int", "min": 10, "max": 1000}, }, }, + # ========================================================================= + # OpenSearch HNSW (nmslib engine) + # ========================================================================= + "opensearch_hnsw": { + "build": { + "m": {"type": "int", "min": 4, "max": 64}, + "ef_construction": {"type": "int", "min": 32, "max": 1024}, + }, + "search": { + "ef_search": {"type": "int", "min": 10, "max": 2048}, + }, + }, + # ========================================================================= + # OpenSearch HNSW (faiss engine) + # ========================================================================= + "opensearch_faiss_hnsw": { + "build": { + "m": {"type": "int", "min": 4, "max": 64}, + "ef_construction": {"type": "int", "min": 32, "max": 1024}, + }, + "search": { 
+ "ef_search": {"type": "int", "min": 10, "max": 2048}, + }, + }, } diff --git a/python/cuvs_bench/cuvs_bench/config/algos/opensearch_faiss_hnsw.yaml b/python/cuvs_bench/cuvs_bench/config/algos/opensearch_faiss_hnsw.yaml new file mode 100644 index 0000000000..ea505fdd2b --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/config/algos/opensearch_faiss_hnsw.yaml @@ -0,0 +1,37 @@ +# OpenSearch k-NN benchmark configuration – faiss HNSW engine +# +# Uses OpenSearch's faiss engine with the HNSW method. Faiss HNSW typically +# offers faster build times and a smaller memory footprint than nmslib at +# comparable recall levels. +# +# Build parameters +# ---------------- +# m Number of bidirectional graph edges per node. +# ef_construction Candidate list size during construction. +# +# Search parameters +# ----------------- +# ef_search Candidate list size at query time (updated via index settings). + +name: opensearch_faiss_hnsw +groups: + base: + build: + m: [16, 32] + ef_construction: [100, 200, 512] + search: + ef_search: [50, 100, 200, 512] + + large: + build: + m: [32] + ef_construction: [200, 512] + search: + ef_search: [100, 200, 512, 1024] + + test: + build: + m: [16] + ef_construction: [100] + search: + ef_search: [50, 100] diff --git a/python/cuvs_bench/cuvs_bench/config/algos/opensearch_hnsw.yaml b/python/cuvs_bench/cuvs_bench/config/algos/opensearch_hnsw.yaml new file mode 100644 index 0000000000..35ec2647a9 --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/config/algos/opensearch_hnsw.yaml @@ -0,0 +1,41 @@ +# OpenSearch k-NN benchmark configuration – nmslib HNSW engine +# +# The nmslib engine implements HNSW and is the default k-NN backend in +# OpenSearch. Build parameters control graph connectivity and construction +# quality; search parameters control the ef_search sweep used at query time. +# +# Build parameters +# ---------------- +# m Number of bidirectional graph edges per node (higher = better +# recall, larger index, slower build). Analogous to M in hnswlib. +# ef_construction Size of the candidate list during graph construction (higher = +# better recall, slower build). +# +# Search parameters +# ----------------- +# ef_search Size of the candidate list at query time. Updated between +# search-parameter sweeps via index settings. Higher values +# improve recall at the cost of latency. 
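+#
+# For example, ef_search: [50, 100] yields two query-time sweeps; the backend
+# applies each value by updating the index setting
+# index.knn.algo_param.ef_search before issuing the queries.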
+ +name: opensearch_hnsw +groups: + base: + build: + m: [16, 32, 64] + ef_construction: [100, 200, 512] + search: + ef_search: [50, 100, 200, 512, 1024] + + large: + build: + m: [32, 64] + ef_construction: [200, 512] + search: + ef_search: [100, 200, 512, 1024] + + test: + build: + m: [16] + ef_construction: [100] + search: + ef_search: [50, 100] diff --git a/python/cuvs_bench/cuvs_bench/orchestrator/__init__.py b/python/cuvs_bench/cuvs_bench/orchestrator/__init__.py index c6ad2db7f1..cc170e7675 100644 --- a/python/cuvs_bench/cuvs_bench/orchestrator/__init__.py +++ b/python/cuvs_bench/cuvs_bench/orchestrator/__init__.py @@ -11,6 +11,7 @@ register_config_loader, get_config_loader, ) +from ..backends.opensearch import OpenSearchConfigLoader __all__ = [ # Main orchestrator @@ -21,6 +22,7 @@ "BenchmarkConfig", "DatasetConfig", "CppGBenchConfigLoader", + "OpenSearchConfigLoader", # Registry functions "get_backend_class", "list_backends", @@ -36,6 +38,7 @@ def _register_builtin_loaders(): """Register built-in config loaders.""" register_config_loader("cpp_gbench", CppGBenchConfigLoader) + register_config_loader("opensearch", OpenSearchConfigLoader) # Auto-register when module is imported diff --git a/python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py b/python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py index 091290979c..0e98babfa2 100644 --- a/python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py +++ b/python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py @@ -222,43 +222,47 @@ def _run_sweep( for config in benchmark_configs: # Create backend instance backend = self.backend_class(config.backend_config) - - if build: - # Pass ALL indexes at once - ONE C++ command builds all - build_result = backend.build( - dataset=bench_dataset, - indexes=config.indexes, - force=force, - dry_run=dry_run, - ) - results.append(build_result) - - if not build_result.success: - print( - f"Build failed for {config.index_name}: {build_result.error_message}" + try: + backend.initialize() + + if build: + # Pass ALL indexes at once - ONE C++ command builds all + build_result = backend.build( + dataset=bench_dataset, + indexes=config.indexes, + force=force, + dry_run=dry_run, ) - continue - - if search: - # Pass ALL indexes at once - ONE C++ command searches all - # Each index has its own search_params list - # Total benchmarks = sum(len(idx.search_params) for idx in indexes) - search_result = backend.search( - dataset=bench_dataset, - indexes=config.indexes, - k=count, - batch_size=batch_size, - mode=search_mode, - force=force, - search_threads=search_threads, - dry_run=dry_run, - ) - results.append(search_result) - - if not search_result.success: - print( - f"Search failed for {config.index_name}: {search_result.error_message}" + results.append(build_result) + + if not build_result.success: + print( + f"Build failed for {config.index_name}: {build_result.error_message}" + ) + continue + + if search: + # Pass ALL indexes at once - ONE C++ command searches all + # Each index has its own search_params list + # Total benchmarks = sum(len(idx.search_params) for idx in indexes) + search_result = backend.search( + dataset=bench_dataset, + indexes=config.indexes, + k=count, + batch_size=batch_size, + mode=search_mode, + force=force, + search_threads=search_threads, + dry_run=dry_run, ) + results.append(search_result) + + if not search_result.success: + print( + f"Search failed for {config.index_name}: {search_result.error_message}" + ) + finally: + backend.cleanup() return results @@ -550,32 +554,36 @@ def 
_run_trial( "append_results": append_results, } backend = self.backend_class(backend_config) + try: + backend.initialize() - result = None + result = None - if build: - result = backend.build( - dataset=bench_dataset, - indexes=config.indexes, - force=force, - dry_run=dry_run, - ) - if not result.success: - return result + if build: + result = backend.build( + dataset=bench_dataset, + indexes=config.indexes, + force=force, + dry_run=dry_run, + ) + if not result.success: + return result - if search: - result = backend.search( - dataset=bench_dataset, - indexes=config.indexes, - k=count, - batch_size=batch_size, - mode=search_mode, - force=force, - search_threads=search_threads, - dry_run=dry_run, - ) + if search: + result = backend.search( + dataset=bench_dataset, + indexes=config.indexes, + k=count, + batch_size=batch_size, + mode=search_mode, + force=force, + search_threads=search_threads, + dry_run=dry_run, + ) - return result + return result + finally: + backend.cleanup() def _create_dataset(self, dataset_config: DatasetConfig) -> Dataset: """ diff --git a/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py b/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py new file mode 100644 index 0000000000..f1ccc3882d --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py @@ -0,0 +1,314 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# +""" +Tests for the OpenSearch benchmark backend. +""" + +import os +from urllib.parse import urlparse + +import numpy as np +import pytest +import requests + +pytest.importorskip("opensearchpy") + +from cuvs_bench.backends.base import Dataset +from cuvs_bench.backends.opensearch import OpenSearchBackend, OpenSearchConfigLoader +from cuvs_bench.orchestrator.config_loaders import IndexConfig + + + +def _make_backend(config_overrides: dict = None) -> OpenSearchBackend: + """Backend with no network requirement so pre-flight passes without a server.""" + config = { + "name": "test_index", + "index_name": "test_index", + "engine": "faiss", + "algo": "opensearch_faiss_hnsw", + **(config_overrides or {}), + } + return OpenSearchBackend(config) + + +def _make_dataset(n_base: int = 10, n_queries: int = 5, dims: int = 4, k: int = 10) -> Dataset: + rng = np.random.default_rng(0) + base = rng.random((n_base, dims)).astype(np.float32) + queries = rng.random((n_queries, dims)).astype(np.float32) + dists = np.sum((queries[:, None, :] - base[None, :, :]) ** 2, axis=2) + groundtruth = np.argsort(dists, axis=1)[:, :k].astype(np.int32) + return Dataset( + name="test", + base_vectors=base, + query_vectors=queries, + groundtruth_neighbors=groundtruth, + ) + + +def _make_index_cfg(search_params: list = None) -> IndexConfig: + return IndexConfig( + name="test_index", + algo="opensearch_faiss_hnsw", + build_param={"m": 16, "ef_construction": 100}, + search_params=search_params or [{"ef_search": 50}, {"ef_search": 100}], + file="", + ) + + +def _cleanup_backend(backend: OpenSearchBackend, index_name: str) -> None: + """Delete the test index and close the client connection.""" + try: + if backend._client.indices.exists(index=index_name): + backend._client.indices.delete(index=index_name) + except Exception: + pass + backend.cleanup() + + + +@pytest.fixture(scope="session") +def opensearch_url(): + """Skip integration tests when no live OpenSearch node is reachable.""" + url = os.environ.get("OPENSEARCH_URL", "http://localhost:9200") + try: + requests.get(f"{url}/_cluster/health", 
timeout=2).raise_for_status() + except Exception: + pytest.skip("no OpenSearch node reachable") + return url + + +@pytest.fixture +def live_backend(opensearch_url): + """Backend connected to a live OpenSearch node; cleans up the test index on teardown.""" + parsed = urlparse(opensearch_url) + index_name = "cuvs_test_index" + backend = OpenSearchBackend({ + "name": index_name, + "index_name": index_name, + "engine": "faiss", + "algo": "opensearch_faiss_hnsw", + "host": parsed.hostname, + "port": parsed.port or 9200, + "use_ssl": parsed.scheme == "https", + "verify_certs": False, + "requires_network": True, + }) + yield backend + _cleanup_backend(backend, index_name) + + +@pytest.fixture +def config_dir(tmp_path): + """Config directory with a minimal dataset and algo definition.""" + (tmp_path / "datasets").mkdir() + (tmp_path / "datasets" / "datasets.yaml").write_text( + "- name: test-ds\n distance: euclidean\n dims: 4\n" + ) + (tmp_path / "algos").mkdir() + (tmp_path / "algos" / "opensearch_faiss_hnsw.yaml").write_text( + "name: opensearch_faiss_hnsw\n" + "groups:\n" + " test:\n" + " build:\n" + " m: [16, 32]\n" + " ef_construction: [100, 200]\n" + " search:\n" + " ef_search: [50, 100]\n" + ) + return tmp_path + + + +class TestOpenSearchConfigLoader: + def test_load_produces_correct_configs(self, config_dir): + loader = OpenSearchConfigLoader() + loader._DEFAULT_CONFIG_PATH = str(config_dir) + + dataset_config, benchmark_configs = loader.load( + dataset="test-ds", dataset_path="/data", groups="test" + ) + + assert dataset_config.name == "test-ds" + assert dataset_config.distance == "euclidean" + # m=[16,32] x ef_construction=[100,200] = 4 build combos + assert len(benchmark_configs) == 4 + bc = benchmark_configs[0] + assert bc.backend_config["engine"] == "faiss" + assert len(bc.indexes[0].search_params) == 2 # ef_search: [50, 100] + + def test_load_forwards_remote_build_kwargs(self, config_dir): + loader = OpenSearchConfigLoader() + loader._DEFAULT_CONFIG_PATH = str(config_dir) + + _, configs = loader.load( + dataset="test-ds", + dataset_path="/data", + remote_index_build=True, + remote_build_s3_endpoint="http://s3:9000", + remote_build_s3_bucket="my-bucket", + remote_build_s3_access_key="mykey", + remote_build_s3_secret_key="mysecret", + ) + + bc = configs[0].backend_config + assert bc["remote_index_build"] is True + assert bc["remote_build_s3_endpoint"] == "http://s3:9000" + assert bc["remote_build_s3_bucket"] == "my-bucket" + assert bc["remote_build_s3_access_key"] == "mykey" + assert bc["remote_build_s3_secret_key"] == "mysecret" + + + +class TestOpenSearchBackend: + def test_build_dry_run(self): + result = _make_backend().build(_make_dataset(), [_make_index_cfg()], dry_run=True) + assert result.success + assert result.index_path == "test_index" + + def test_search_dry_run(self): + result = _make_backend().search(_make_dataset(), [_make_index_cfg()], k=3, dry_run=True) + assert result.success + assert len(result.search_params) == 2 + + def test_search_fails_without_query_vectors(self): + dataset = Dataset( + name="empty", + base_vectors=np.empty((0, 4), dtype=np.float32), + query_vectors=np.empty((0, 4), dtype=np.float32), + ) + result = _make_backend().search(dataset, [_make_index_cfg()], k=3) + assert not result.success + assert "No query vectors" in result.error_message + + + +@pytest.fixture(scope="session") +def remote_build_env(opensearch_url): + """ + Skip remote index build tests if required env vars aren't set or services aren't reachable. 
+ + Required environment variables: + BUILDER_URL URL of the remote index builder service + S3_ENDPOINT S3-compatible object store endpoint URL + S3_BUCKET Bucket name used by OpenSearch for vector staging + S3_ACCESS_KEY S3 access key + S3_SECRET_KEY S3 secret key + """ + builder_url = os.environ.get("BUILDER_URL") + s3_endpoint = os.environ.get("S3_ENDPOINT") + s3_bucket = os.environ.get("S3_BUCKET") + s3_access_key = os.environ.get("S3_ACCESS_KEY") + s3_secret_key = os.environ.get("S3_SECRET_KEY") + + missing = [ + name for name, val in { + "BUILDER_URL": builder_url, "S3_ENDPOINT": s3_endpoint, + "S3_BUCKET": s3_bucket, "S3_ACCESS_KEY": s3_access_key, + "S3_SECRET_KEY": s3_secret_key, + }.items() if not val + ] + if missing: + pytest.skip(f"remote index build tests require env vars: {', '.join(missing)}") + + try: + requests.get(builder_url, timeout=2) + except requests.exceptions.ConnectionError: + pytest.skip(f"remote index builder not reachable at {builder_url}") + + try: + requests.get(s3_endpoint, timeout=2) + except requests.exceptions.ConnectionError: + pytest.skip(f"S3 endpoint not reachable at {s3_endpoint}") + + # Register the S3 snapshot repo and enable remote index build + session = requests.Session() + session.headers.update({"Content-Type": "application/json"}) + repo_name = "vector-repo" + + session.put( + f"{opensearch_url}/_snapshot/{repo_name}", + json={"type": "s3", "settings": {"bucket": s3_bucket, "base_path": "knn-indexes"}}, + ).raise_for_status() + + session.put( + f"{opensearch_url}/_cluster/settings", + json={"persistent": { + "knn.remote_index_build.enabled": True, + "knn.remote_index_build.repository": repo_name, + "knn.remote_index_build.service.endpoint": builder_url, + }}, + ).raise_for_status() + + return { + "s3_endpoint": s3_endpoint, + "s3_bucket": s3_bucket, + "s3_access_key": s3_access_key, + "s3_secret_key": s3_secret_key, + } + + +@pytest.fixture +def live_remote_build_backend(opensearch_url, remote_build_env): + """Backend with remote_index_build=True pointing at a configured remote build stack.""" + parsed = urlparse(opensearch_url) + index_name = "cuvs_test_remote_index" + backend = OpenSearchBackend({ + "name": index_name, + "index_name": index_name, + "engine": "faiss", + "algo": "opensearch_faiss_hnsw", + "host": parsed.hostname, + "port": parsed.port or 9200, + "use_ssl": parsed.scheme == "https", + "verify_certs": False, + "requires_network": True, + "remote_index_build": True, + "remote_build_s3_endpoint": remote_build_env["s3_endpoint"], + "remote_build_s3_bucket": remote_build_env["s3_bucket"], + "remote_build_s3_prefix": "knn-indexes/", + "remote_build_s3_access_key": remote_build_env["s3_access_key"], + "remote_build_s3_secret_key": remote_build_env["s3_secret_key"], + }) + yield backend + _cleanup_backend(backend, index_name) + + + +@pytest.mark.integration +class TestOpenSearchBackendIntegration: + def test_build_and_search(self, live_backend): + k = 3 + dataset = _make_dataset(n_base=100, n_queries=10, dims=4, k=k) + idx = _make_index_cfg(search_params=[{"ef_search": 50}]) + + build_result = live_backend.build(dataset, [idx], force=True) + assert build_result.success + assert build_result.build_time_seconds > 0 + assert build_result.index_size_bytes > 0 + + search_result = live_backend.search(dataset, [idx], k=k) + assert search_result.success + assert search_result.recall > 0 + assert search_result.queries_per_second > 0 + assert len(search_result.metadata["per_search_param_results"]) == 1 + + + +@pytest.mark.integration +class 
TestOpenSearchRemoteIndexBuildIntegration: + def test_remote_build_and_search(self, live_remote_build_backend): + k = 3 + dataset = _make_dataset(n_base=100, n_queries=10, dims=4, k=k) + idx = _make_index_cfg(search_params=[{"ef_search": 50}]) + + build_result = live_remote_build_backend.build(dataset, [idx], force=True) + assert build_result.success + assert build_result.build_time_seconds > 0 + assert build_result.metadata["remote_index_build"] is True + + search_result = live_remote_build_backend.search(dataset, [idx], k=k) + assert search_result.success + assert search_result.recall > 0 + assert search_result.queries_per_second > 0 diff --git a/python/cuvs_bench/pyproject.toml b/python/cuvs_bench/pyproject.toml index ed080c7b1f..bfd86cc35b 100644 --- a/python/cuvs_bench/pyproject.toml +++ b/python/cuvs_bench/pyproject.toml @@ -38,12 +38,20 @@ classifiers = [ "Programming Language :: Python :: 3.14", ] +[project.optional-dependencies] +opensearch = ["opensearch-py>=2.4.0"] + [project.urls] Homepage = "https://github.com/rapidsai/cuvs" [tool.setuptools.package-data] "*" = ["*.*", "VERSION"] +[tool.pytest.ini_options] +markers = [ + "integration: tests that require a live OpenSearch node (run with '-m integration')", +] + [tool.isort] line_length = 79 multi_line_output = 3 From a58363fa24aefba7572dbebd6c383fe5420420c0 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 10 Apr 2026 14:13:36 -0500 Subject: [PATCH 2/5] Lint Signed-off-by: James Bourbeau --- .../cuvs_bench/backends/opensearch.py | 42 +++--- .../cuvs_bench/tests/test_opensearch.py | 121 ++++++++++-------- 2 files changed, 95 insertions(+), 68 deletions(-) diff --git a/python/cuvs_bench/cuvs_bench/backends/opensearch.py b/python/cuvs_bench/cuvs_bench/backends/opensearch.py index 192f72705c..7f77945a3b 100644 --- a/python/cuvs_bench/cuvs_bench/backends/opensearch.py +++ b/python/cuvs_bench/cuvs_bench/backends/opensearch.py @@ -29,9 +29,7 @@ ) -def _load_vectors( - path: str, subset_size: Optional[int] = None -) -> np.ndarray: +def _load_vectors(path: str, subset_size: Optional[int] = None) -> np.ndarray: """Read a binary vector file (.fbin, .ibin, .u8bin, ...). 
The file format is: 4-byte uint32 n_rows, 4-byte uint32 n_cols, @@ -239,7 +237,9 @@ def _resolve(rel: Optional[str]) -> Optional[str]: groups: Dict[str, Any] = algo_yaml.get("groups", {}) if allowed_groups: - groups = {k: v for k, v in groups.items() if k in allowed_groups} + groups = { + k: v for k, v in groups.items() if k in allowed_groups + } for group_name, group_conf in groups.items(): build_spec: Dict[str, List] = group_conf.get("build", {}) @@ -271,7 +271,9 @@ def _resolve(rel: Optional[str]) -> Optional[str]: if group_name != "base" else algo_name ) - parts = [prefix] + [f"{k}{v}" for k, v in build_param.items()] + parts = [prefix] + [ + f"{k}{v}" for k, v in build_param.items() + ] index_label = ".".join(parts) # OpenSearch index names must be lowercase with no dots @@ -572,7 +574,9 @@ def _wait_for_remote_build( ) from exc s3_endpoint = self.config.get("remote_build_s3_endpoint") - s3_bucket = self.config.get("remote_build_s3_bucket", "opensearch-vectors") + s3_bucket = self.config.get( + "remote_build_s3_bucket", "opensearch-vectors" + ) s3_prefix = self.config.get("remote_build_s3_prefix", "knn-indexes/") s3_access_key = self.config.get("remote_build_s3_access_key") s3_secret_key = self.config.get("remote_build_s3_secret_key") @@ -673,8 +677,10 @@ def build( remote_build_size_min = self.config.get("remote_build_size_min") if dry_run: - print(f"[dry_run] Would build OpenSearch index '{index_name}' " - f"(engine={engine}, remote_index_build={remote_index_build}, build_param={build_param})") + print( + f"[dry_run] Would build OpenSearch index '{index_name}' " + f"(engine={engine}, remote_index_build={remote_index_build}, build_param={build_param})" + ) return BuildResult( index_path=index_name, @@ -685,7 +691,6 @@ def build( success=True, ) - # Handle existing index if self._client.indices.exists(index=index_name): if force: @@ -721,13 +726,16 @@ def build( ) dims = base_vectors.shape[1] - space_type = _DISTANCE_TO_SPACE_TYPE.get( - dataset.distance_metric, "l2" - ) + space_type = _DISTANCE_TO_SPACE_TYPE.get(dataset.distance_metric, "l2") # Create index mapping = self._build_index_mapping( - dims, engine, space_type, build_param, remote_index_build, remote_build_size_min + dims, + engine, + space_type, + build_param, + remote_index_build, + remote_build_size_min, ) self._client.indices.create(index=index_name, body=mapping) @@ -851,7 +859,9 @@ def search( search_params_list = index_cfg.search_params or [{}] if dry_run: - print(f"[dry_run] Would search OpenSearch index '{index_name}' with {len(search_params_list)} param set(s) (k={k})") + print( + f"[dry_run] Would search OpenSearch index '{index_name}' with {len(search_params_list)} param set(s) (k={k})" + ) return SearchResult( neighbors=np.zeros((0, k), dtype=np.int64), @@ -952,9 +962,7 @@ def search( last_distances = distances # Aggregate across all search-param combinations - avg_recall = float( - np.mean([r["recall"] for r in per_param_results]) - ) + avg_recall = float(np.mean([r["recall"] for r in per_param_results])) avg_qps = float( np.mean([r["queries_per_second"] for r in per_param_results]) ) diff --git a/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py b/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py index f1ccc3882d..9913317280 100644 --- a/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py +++ b/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py @@ -16,11 +16,13 @@ pytest.importorskip("opensearchpy") from cuvs_bench.backends.base import Dataset -from cuvs_bench.backends.opensearch import 
OpenSearchBackend, OpenSearchConfigLoader +from cuvs_bench.backends.opensearch import ( + OpenSearchBackend, + OpenSearchConfigLoader, +) from cuvs_bench.orchestrator.config_loaders import IndexConfig - def _make_backend(config_overrides: dict = None) -> OpenSearchBackend: """Backend with no network requirement so pre-flight passes without a server.""" config = { @@ -33,7 +35,9 @@ def _make_backend(config_overrides: dict = None) -> OpenSearchBackend: return OpenSearchBackend(config) -def _make_dataset(n_base: int = 10, n_queries: int = 5, dims: int = 4, k: int = 10) -> Dataset: +def _make_dataset( + n_base: int = 10, n_queries: int = 5, dims: int = 4, k: int = 10 +) -> Dataset: rng = np.random.default_rng(0) base = rng.random((n_base, dims)).astype(np.float32) queries = rng.random((n_queries, dims)).astype(np.float32) @@ -67,7 +71,6 @@ def _cleanup_backend(backend: OpenSearchBackend, index_name: str) -> None: backend.cleanup() - @pytest.fixture(scope="session") def opensearch_url(): """Skip integration tests when no live OpenSearch node is reachable.""" @@ -84,17 +87,19 @@ def live_backend(opensearch_url): """Backend connected to a live OpenSearch node; cleans up the test index on teardown.""" parsed = urlparse(opensearch_url) index_name = "cuvs_test_index" - backend = OpenSearchBackend({ - "name": index_name, - "index_name": index_name, - "engine": "faiss", - "algo": "opensearch_faiss_hnsw", - "host": parsed.hostname, - "port": parsed.port or 9200, - "use_ssl": parsed.scheme == "https", - "verify_certs": False, - "requires_network": True, - }) + backend = OpenSearchBackend( + { + "name": index_name, + "index_name": index_name, + "engine": "faiss", + "algo": "opensearch_faiss_hnsw", + "host": parsed.hostname, + "port": parsed.port or 9200, + "use_ssl": parsed.scheme == "https", + "verify_certs": False, + "requires_network": True, + } + ) yield backend _cleanup_backend(backend, index_name) @@ -120,7 +125,6 @@ def config_dir(tmp_path): return tmp_path - class TestOpenSearchConfigLoader: def test_load_produces_correct_configs(self, config_dir): loader = OpenSearchConfigLoader() @@ -160,15 +164,18 @@ def test_load_forwards_remote_build_kwargs(self, config_dir): assert bc["remote_build_s3_secret_key"] == "mysecret" - class TestOpenSearchBackend: def test_build_dry_run(self): - result = _make_backend().build(_make_dataset(), [_make_index_cfg()], dry_run=True) + result = _make_backend().build( + _make_dataset(), [_make_index_cfg()], dry_run=True + ) assert result.success assert result.index_path == "test_index" def test_search_dry_run(self): - result = _make_backend().search(_make_dataset(), [_make_index_cfg()], k=3, dry_run=True) + result = _make_backend().search( + _make_dataset(), [_make_index_cfg()], k=3, dry_run=True + ) assert result.success assert len(result.search_params) == 2 @@ -183,7 +190,6 @@ def test_search_fails_without_query_vectors(self): assert "No query vectors" in result.error_message - @pytest.fixture(scope="session") def remote_build_env(opensearch_url): """ @@ -203,14 +209,20 @@ def remote_build_env(opensearch_url): s3_secret_key = os.environ.get("S3_SECRET_KEY") missing = [ - name for name, val in { - "BUILDER_URL": builder_url, "S3_ENDPOINT": s3_endpoint, - "S3_BUCKET": s3_bucket, "S3_ACCESS_KEY": s3_access_key, + name + for name, val in { + "BUILDER_URL": builder_url, + "S3_ENDPOINT": s3_endpoint, + "S3_BUCKET": s3_bucket, + "S3_ACCESS_KEY": s3_access_key, "S3_SECRET_KEY": s3_secret_key, - }.items() if not val + }.items() + if not val ] if missing: - pytest.skip(f"remote 
index build tests require env vars: {', '.join(missing)}") + pytest.skip( + f"remote index build tests require env vars: {', '.join(missing)}" + ) try: requests.get(builder_url, timeout=2) @@ -229,16 +241,21 @@ def remote_build_env(opensearch_url): session.put( f"{opensearch_url}/_snapshot/{repo_name}", - json={"type": "s3", "settings": {"bucket": s3_bucket, "base_path": "knn-indexes"}}, + json={ + "type": "s3", + "settings": {"bucket": s3_bucket, "base_path": "knn-indexes"}, + }, ).raise_for_status() session.put( f"{opensearch_url}/_cluster/settings", - json={"persistent": { - "knn.remote_index_build.enabled": True, - "knn.remote_index_build.repository": repo_name, - "knn.remote_index_build.service.endpoint": builder_url, - }}, + json={ + "persistent": { + "knn.remote_index_build.enabled": True, + "knn.remote_index_build.repository": repo_name, + "knn.remote_index_build.service.endpoint": builder_url, + } + }, ).raise_for_status() return { @@ -254,28 +271,29 @@ def live_remote_build_backend(opensearch_url, remote_build_env): """Backend with remote_index_build=True pointing at a configured remote build stack.""" parsed = urlparse(opensearch_url) index_name = "cuvs_test_remote_index" - backend = OpenSearchBackend({ - "name": index_name, - "index_name": index_name, - "engine": "faiss", - "algo": "opensearch_faiss_hnsw", - "host": parsed.hostname, - "port": parsed.port or 9200, - "use_ssl": parsed.scheme == "https", - "verify_certs": False, - "requires_network": True, - "remote_index_build": True, - "remote_build_s3_endpoint": remote_build_env["s3_endpoint"], - "remote_build_s3_bucket": remote_build_env["s3_bucket"], - "remote_build_s3_prefix": "knn-indexes/", - "remote_build_s3_access_key": remote_build_env["s3_access_key"], - "remote_build_s3_secret_key": remote_build_env["s3_secret_key"], - }) + backend = OpenSearchBackend( + { + "name": index_name, + "index_name": index_name, + "engine": "faiss", + "algo": "opensearch_faiss_hnsw", + "host": parsed.hostname, + "port": parsed.port or 9200, + "use_ssl": parsed.scheme == "https", + "verify_certs": False, + "requires_network": True, + "remote_index_build": True, + "remote_build_s3_endpoint": remote_build_env["s3_endpoint"], + "remote_build_s3_bucket": remote_build_env["s3_bucket"], + "remote_build_s3_prefix": "knn-indexes/", + "remote_build_s3_access_key": remote_build_env["s3_access_key"], + "remote_build_s3_secret_key": remote_build_env["s3_secret_key"], + } + ) yield backend _cleanup_backend(backend, index_name) - @pytest.mark.integration class TestOpenSearchBackendIntegration: def test_build_and_search(self, live_backend): @@ -295,7 +313,6 @@ def test_build_and_search(self, live_backend): assert len(search_result.metadata["per_search_param_results"]) == 1 - @pytest.mark.integration class TestOpenSearchRemoteIndexBuildIntegration: def test_remote_build_and_search(self, live_remote_build_backend): @@ -303,7 +320,9 @@ def test_remote_build_and_search(self, live_remote_build_backend): dataset = _make_dataset(n_base=100, n_queries=10, dims=4, k=k) idx = _make_index_cfg(search_params=[{"ef_search": 50}]) - build_result = live_remote_build_backend.build(dataset, [idx], force=True) + build_result = live_remote_build_backend.build( + dataset, [idx], force=True + ) assert build_result.success assert build_result.build_time_seconds > 0 assert build_result.metadata["remote_index_build"] is True From ed7c1d0baebaab2f8e2c16e1782546b7e9921dec Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 13 Apr 2026 11:43:22 -0500 Subject: [PATCH 3/5] Updates 
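
Remove the nmslib engine from the OpenSearch backend (faiss and lucene
remain, with lucene as the new default), delete the opensearch_hnsw
algo config and search space, and make the loader's config directory
injectable so tests no longer patch a private attribute. For example
(the path shown is illustrative):

    loader = OpenSearchConfigLoader(config_path="/path/to/config")
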
Signed-off-by: James Bourbeau --- .../cuvs_bench/backends/opensearch.py | 61 ++++++++------ .../cuvs_bench/backends/search_spaces.py | 12 --- .../config/algos/opensearch_faiss_hnsw.yaml | 4 +- .../config/algos/opensearch_hnsw.yaml | 41 ---------- .../cuvs_bench/tests/test_opensearch.py | 81 ++++++++++++------- 5 files changed, 88 insertions(+), 111 deletions(-) delete mode 100644 python/cuvs_bench/cuvs_bench/config/algos/opensearch_hnsw.yaml diff --git a/python/cuvs_bench/cuvs_bench/backends/opensearch.py b/python/cuvs_bench/cuvs_bench/backends/opensearch.py index 7f77945a3b..2e6083f294 100644 --- a/python/cuvs_bench/cuvs_bench/backends/opensearch.py +++ b/python/cuvs_bench/cuvs_bench/backends/opensearch.py @@ -4,8 +4,8 @@ # """ -OpenSearch benchmark backend for cuvs-bench supporting nmslib (HNSW), faiss, -and lucene engines for approximate nearest-neighbor search. +OpenSearch benchmark backend for cuvs-bench supporting faiss and lucene +engines for approximate nearest-neighbor search. It also supports the remote index build service (OpenSearch 3.0+), which offloads Faiss HNSW graph construction to a GPU-accelerated external service. @@ -15,7 +15,7 @@ import itertools import os import time -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple, Union import numpy as np import yaml @@ -107,9 +107,23 @@ class OpenSearchConfigLoader(ConfigLoader): register_config_loader("opensearch", OpenSearchConfigLoader) """ - _DEFAULT_CONFIG_PATH: str = os.path.join( - os.path.dirname(os.path.realpath(__file__)), "../config" - ) + def __init__(self, config_path: Optional[Union[str, os.PathLike]] = None): + """ + Initialize the config loader. + + Parameters + ---------- + config_path : Optional[Union[str, os.PathLike]] + Path to the config directory. If None, uses the default path + bundled with cuvs-bench. + """ + self.config_path = ( + os.fspath(config_path) + if config_path is not None + else os.path.join( + os.path.dirname(os.path.realpath(__file__)), "../config" + ) + ) @property def backend_type(self) -> str: @@ -134,7 +148,7 @@ def load( dataset_path : str Root directory where dataset files live. algorithms : Optional[str] - Comma-separated algorithm names to run (e.g. ``"opensearch_hnsw"``). + Comma-separated algorithm names to run (e.g. ``"opensearch_faiss_hnsw"``). If *None*, all opensearch-prefixed algorithms are included. count : int Number of neighbors *k* (informational for this loader). @@ -167,10 +181,8 @@ def load( ------- Tuple[DatasetConfig, List[BenchmarkConfig]] """ - config_path = self._DEFAULT_CONFIG_PATH - ds_yaml = kwargs.get("dataset_configuration") or os.path.join( - config_path, "datasets", "datasets.yaml" + self.config_path, "datasets", "datasets.yaml" ) all_ds = _load_yaml(ds_yaml) ds_conf = _get_dataset_conf(dataset, all_ds) @@ -193,7 +205,7 @@ def _resolve(rel: Optional[str]) -> Optional[str]: ) algo_files = _gather_algo_configs( - config_path, kwargs.get("algorithm_configuration") + self.config_path, kwargs.get("algorithm_configuration") ) allowed_algos = ( @@ -290,11 +302,9 @@ def _resolve(rel: Optional[str]) -> Optional[str]: file=index_file, ) - engine = "nmslib" + engine = "lucene" if "faiss" in algo_name: engine = "faiss" - elif "lucene" in algo_name: - engine = "lucene" backend_cfg: Dict[str, Any] = { "name": index_label, @@ -333,8 +343,7 @@ class OpenSearchBackend(BenchmarkBackend): """ Benchmark backend for OpenSearch's k-NN plugin. - Supports the nmslib (HNSW), faiss (HNSW / IVF), and lucene (HNSW) - engines. 
Vectors are bulk-indexed as ``knn_vector`` fields and retrieved + Supports the faiss (HNSW / IVF) and lucene (HNSW) engines. Vectors are bulk-indexed as ``knn_vector`` fields and retrieved via the standard ``knn`` query type. Requires ``opensearch-py`` Python package. @@ -346,10 +355,10 @@ class OpenSearchBackend(BenchmarkBackend): Recognized keys: Required: - - ``name`` – index label (e.g. ``"opensearch_hnsw.m16.ef_construction100"``) + - ``name`` – index label (e.g. ``"opensearch_faiss_hnsw.m16.ef_construction100"``) - ``index_name`` – OpenSearch index name (lowercase, no dots) - - ``engine`` – ``"nmslib"``, ``"faiss"``, or ``"lucene"`` - - ``algo`` – algorithm name (e.g. ``"opensearch_hnsw"``) + - ``engine`` – ``"faiss"`` or ``"lucene"`` + - ``algo`` – algorithm name (e.g. ``"opensearch_faiss_hnsw"``) Optional: - ``host`` – hostname (default: ``"localhost"``) @@ -443,16 +452,16 @@ def _build_index_mapping( When ``remote_index_build=True``, ``index.knn.remote_index_build.enabled`` is set to ``true`` in the index settings, opting qualifying segments into - the GPU build path. ``remote_build_size_min`` sets + the GPU build path. ``remote_build_size_min`` sets ``index.knn.remote_index_build.size.min`` to control the minimum segment size that triggers the GPU build; the default ``"1kb"`` ensures any - non-trivial segment is built remotely. The cluster-level infrastructure + non-trivial segment is built remotely. The cluster-level infrastructure is assumed to be pre-configured externally. """ m = build_param.get("m", 16) ef_construction = build_param.get("ef_construction", 100) - if engine in ("nmslib", "lucene"): + if engine == "lucene": method_name = "hnsw" method_params: Dict[str, Any] = { "m": m, @@ -476,7 +485,7 @@ def _build_index_mapping( else: raise ValueError( f"Unknown OpenSearch k-NN engine {engine!r}. " - "Use 'nmslib', 'faiss', or 'lucene'." + "Use 'faiss' or 'lucene'." 
) index_settings = { @@ -671,7 +680,7 @@ def build( index_name = self.config.get( "index_name", index_cfg.name.replace(".", "_").lower() ) - engine = self.config.get("engine", "nmslib") + engine = self.config.get("engine", "lucene") bulk_batch_size = int(self.config.get("bulk_batch_size", 500)) remote_index_build = bool(self.config.get("remote_index_build", False)) remote_build_size_min = self.config.get("remote_build_size_min") @@ -855,7 +864,7 @@ def search( index_name = self.config.get( "index_name", index_cfg.name.replace(".", "_").lower() ) - engine = self.config.get("engine", "nmslib") + engine = self.config.get("engine", "lucene") search_params_list = index_cfg.search_params or [{}] if dry_run: @@ -909,7 +918,7 @@ def search( for sp in search_params_list: ef_search = sp.get("ef_search", 100) - if engine in ("nmslib", "faiss"): + if engine == "faiss": self._client.indices.put_settings( index=index_name, body={"index.knn.algo_param.ef_search": ef_search}, diff --git a/python/cuvs_bench/cuvs_bench/backends/search_spaces.py b/python/cuvs_bench/cuvs_bench/backends/search_spaces.py index 386bdd1166..bca48d792e 100644 --- a/python/cuvs_bench/cuvs_bench/backends/search_spaces.py +++ b/python/cuvs_bench/cuvs_bench/backends/search_spaces.py @@ -159,18 +159,6 @@ }, }, # ========================================================================= - # OpenSearch HNSW (nmslib engine) - # ========================================================================= - "opensearch_hnsw": { - "build": { - "m": {"type": "int", "min": 4, "max": 64}, - "ef_construction": {"type": "int", "min": 32, "max": 1024}, - }, - "search": { - "ef_search": {"type": "int", "min": 10, "max": 2048}, - }, - }, - # ========================================================================= # OpenSearch HNSW (faiss engine) # ========================================================================= "opensearch_faiss_hnsw": { diff --git a/python/cuvs_bench/cuvs_bench/config/algos/opensearch_faiss_hnsw.yaml b/python/cuvs_bench/cuvs_bench/config/algos/opensearch_faiss_hnsw.yaml index ea505fdd2b..f5e1f83cd6 100644 --- a/python/cuvs_bench/cuvs_bench/config/algos/opensearch_faiss_hnsw.yaml +++ b/python/cuvs_bench/cuvs_bench/config/algos/opensearch_faiss_hnsw.yaml @@ -1,8 +1,6 @@ # OpenSearch k-NN benchmark configuration – faiss HNSW engine # -# Uses OpenSearch's faiss engine with the HNSW method. Faiss HNSW typically -# offers faster build times and a smaller memory footprint than nmslib at -# comparable recall levels. +# Uses OpenSearch's faiss engine with the HNSW method. # # Build parameters # ---------------- diff --git a/python/cuvs_bench/cuvs_bench/config/algos/opensearch_hnsw.yaml b/python/cuvs_bench/cuvs_bench/config/algos/opensearch_hnsw.yaml deleted file mode 100644 index 35ec2647a9..0000000000 --- a/python/cuvs_bench/cuvs_bench/config/algos/opensearch_hnsw.yaml +++ /dev/null @@ -1,41 +0,0 @@ -# OpenSearch k-NN benchmark configuration – nmslib HNSW engine -# -# The nmslib engine implements HNSW and is the default k-NN backend in -# OpenSearch. Build parameters control graph connectivity and construction -# quality; search parameters control the ef_search sweep used at query time. -# -# Build parameters -# ---------------- -# m Number of bidirectional graph edges per node (higher = better -# recall, larger index, slower build). Analogous to M in hnswlib. -# ef_construction Size of the candidate list during graph construction (higher = -# better recall, slower build). 
-# -# Search parameters -# ----------------- -# ef_search Size of the candidate list at query time. Updated between -# search-parameter sweeps via index settings. Higher values -# improve recall at the cost of latency. - -name: opensearch_hnsw -groups: - base: - build: - m: [16, 32, 64] - ef_construction: [100, 200, 512] - search: - ef_search: [50, 100, 200, 512, 1024] - - large: - build: - m: [32, 64] - ef_construction: [200, 512] - search: - ef_search: [100, 200, 512, 1024] - - test: - build: - m: [16] - ef_construction: [100] - search: - ef_search: [50, 100] diff --git a/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py b/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py index 9913317280..7ce4f342ee 100644 --- a/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py +++ b/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py @@ -39,8 +39,8 @@ def _make_dataset( n_base: int = 10, n_queries: int = 5, dims: int = 4, k: int = 10 ) -> Dataset: rng = np.random.default_rng(0) - base = rng.random((n_base, dims)).astype(np.float32) - queries = rng.random((n_queries, dims)).astype(np.float32) + base = rng.random((n_base, dims), dtype=np.float32) + queries = rng.random((n_queries, dims), dtype=np.float32) dists = np.sum((queries[:, None, :] - base[None, :, :]) ** 2, axis=2) groundtruth = np.argsort(dists, axis=1)[:, :k].astype(np.int32) return Dataset( @@ -74,11 +74,11 @@ def _cleanup_backend(backend: OpenSearchBackend, index_name: str) -> None: @pytest.fixture(scope="session") def opensearch_url(): """Skip integration tests when no live OpenSearch node is reachable.""" - url = os.environ.get("OPENSEARCH_URL", "http://localhost:9200") + url = os.environ.get("OPENSEARCH_URL", "http://localhost:9200").rstrip("/") try: requests.get(f"{url}/_cluster/health", timeout=2).raise_for_status() except Exception: - pytest.skip("no OpenSearch node reachable") + pytest.skip("No OpenSearch node reachable") return url @@ -100,8 +100,10 @@ def live_backend(opensearch_url): "requires_network": True, } ) - yield backend - _cleanup_backend(backend, index_name) + try: + yield backend + finally: + _cleanup_backend(backend, index_name) @pytest.fixture @@ -109,27 +111,31 @@ def config_dir(tmp_path): """Config directory with a minimal dataset and algo definition.""" (tmp_path / "datasets").mkdir() (tmp_path / "datasets" / "datasets.yaml").write_text( - "- name: test-ds\n distance: euclidean\n dims: 4\n" + """\ +- name: test-ds + distance: euclidean + dims: 4 +""" ) (tmp_path / "algos").mkdir() (tmp_path / "algos" / "opensearch_faiss_hnsw.yaml").write_text( - "name: opensearch_faiss_hnsw\n" - "groups:\n" - " test:\n" - " build:\n" - " m: [16, 32]\n" - " ef_construction: [100, 200]\n" - " search:\n" - " ef_search: [50, 100]\n" + """\ +name: opensearch_faiss_hnsw +groups: + test: + build: + m: [16, 32] + ef_construction: [100, 200] + search: + ef_search: [50, 100] +""" ) return tmp_path class TestOpenSearchConfigLoader: def test_load_produces_correct_configs(self, config_dir): - loader = OpenSearchConfigLoader() - loader._DEFAULT_CONFIG_PATH = str(config_dir) - + loader = OpenSearchConfigLoader(config_path=config_dir) dataset_config, benchmark_configs = loader.load( dataset="test-ds", dataset_path="/data", groups="test" ) @@ -143,9 +149,7 @@ def test_load_produces_correct_configs(self, config_dir): assert len(bc.indexes[0].search_params) == 2 # ef_search: [50, 100] def test_load_forwards_remote_build_kwargs(self, config_dir): - loader = OpenSearchConfigLoader() - loader._DEFAULT_CONFIG_PATH = str(config_dir) - + 
loader = OpenSearchConfigLoader(config_path=config_dir) _, configs = loader.load( dataset="test-ds", dataset_path="/data", @@ -166,11 +170,12 @@ def test_load_forwards_remote_build_kwargs(self, config_dir): class TestOpenSearchBackend: def test_build_dry_run(self): - result = _make_backend().build( + backend = _make_backend() + result = backend.build( _make_dataset(), [_make_index_cfg()], dry_run=True ) assert result.success - assert result.index_path == "test_index" + assert result.index_path == backend.config["index_name"] def test_search_dry_run(self): result = _make_backend().search( @@ -179,6 +184,17 @@ def test_search_dry_run(self): assert result.success assert len(result.search_params) == 2 + def test_remote_build_requires_faiss_engine(self): + backend = _make_backend({"engine": "lucene"}) + with pytest.raises(ValueError, match="faiss engine"): + backend._build_index_mapping( + dims=4, + engine="lucene", + space_type="l2", + build_param={}, + remote_index_build=True, + ) + def test_search_fails_without_query_vectors(self): dataset = Dataset( name="empty", @@ -207,6 +223,7 @@ def remote_build_env(opensearch_url): s3_bucket = os.environ.get("S3_BUCKET") s3_access_key = os.environ.get("S3_ACCESS_KEY") s3_secret_key = os.environ.get("S3_SECRET_KEY") + s3_prefix = "knn-indexes/" missing = [ name @@ -221,13 +238,13 @@ def remote_build_env(opensearch_url): ] if missing: pytest.skip( - f"remote index build tests require env vars: {', '.join(missing)}" + f"Remote index build tests require environment variables: {', '.join(missing)}" ) try: requests.get(builder_url, timeout=2) except requests.exceptions.ConnectionError: - pytest.skip(f"remote index builder not reachable at {builder_url}") + pytest.skip(f"Remote index builder not reachable at {builder_url}") try: requests.get(s3_endpoint, timeout=2) @@ -243,7 +260,10 @@ def remote_build_env(opensearch_url): f"{opensearch_url}/_snapshot/{repo_name}", json={ "type": "s3", - "settings": {"bucket": s3_bucket, "base_path": "knn-indexes"}, + "settings": { + "bucket": s3_bucket, + "base_path": s3_prefix.rstrip("/"), + }, }, ).raise_for_status() @@ -261,6 +281,7 @@ def remote_build_env(opensearch_url): return { "s3_endpoint": s3_endpoint, "s3_bucket": s3_bucket, + "s3_prefix": s3_prefix, "s3_access_key": s3_access_key, "s3_secret_key": s3_secret_key, } @@ -285,13 +306,15 @@ def live_remote_build_backend(opensearch_url, remote_build_env): "remote_index_build": True, "remote_build_s3_endpoint": remote_build_env["s3_endpoint"], "remote_build_s3_bucket": remote_build_env["s3_bucket"], - "remote_build_s3_prefix": "knn-indexes/", + "remote_build_s3_prefix": remote_build_env["s3_prefix"], "remote_build_s3_access_key": remote_build_env["s3_access_key"], "remote_build_s3_secret_key": remote_build_env["s3_secret_key"], } ) - yield backend - _cleanup_backend(backend, index_name) + try: + yield backend + finally: + _cleanup_backend(backend, index_name) @pytest.mark.integration From 964a0df5a68b91565eb00a53533c5fe04b2a011d Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 20 Apr 2026 14:40:15 -0500 Subject: [PATCH 4/5] Updates Signed-off-by: James Bourbeau --- .../cuvs_bench/backends/opensearch.py | 126 +++++++++++++----- .../cuvs_bench/tests/test_opensearch.py | 27 ++-- 2 files changed, 111 insertions(+), 42 deletions(-) diff --git a/python/cuvs_bench/cuvs_bench/backends/opensearch.py b/python/cuvs_bench/cuvs_bench/backends/opensearch.py index 2e6083f294..2349ab5057 100644 --- a/python/cuvs_bench/cuvs_bench/backends/opensearch.py +++ 
b/python/cuvs_bench/cuvs_bench/backends/opensearch.py @@ -235,6 +235,7 @@ def _resolve(rel: Optional[str]) -> Optional[str]: "remote_build_s3_prefix", "remote_build_s3_access_key", "remote_build_s3_secret_key", + "remote_build_s3_session_token", "remote_build_timeout", ) conn_kwargs = {k: kwargs[k] for k in _conn_keys if k in kwargs} @@ -557,22 +558,13 @@ def _doc_generator(): if not ok: raise RuntimeError(f"Failed to index document: {info}") indexed += 1 - if indexed % max(bulk_batch_size, 1000) == 0: - print(f" Indexed {indexed} / {total} vectors") + milestone = max(total // 10, 1) + if indexed % milestone == 0: + print(f" Indexed {indexed} / {total} vectors ({100 * indexed // total}%)") print(f" Indexed all {total} vectors") - def _wait_for_remote_build( - self, - expected_count: int = 1, - timeout: int = 600, - poll_interval: int = 5, - ) -> None: - """ - Poll S3 for .faiss files confirming the GPU remote build completed. - - Raises ``TimeoutError`` if the expected number of files does not appear - within *timeout* seconds. - """ + def _make_s3_client(self): + """Create a boto3 S3 client from config. Returns (client, bucket, prefix).""" try: import boto3 from botocore.config import Config as BotocoreConfig @@ -582,27 +574,85 @@ def _wait_for_remote_build( "Install it with: pip install boto3" ) from exc - s3_endpoint = self.config.get("remote_build_s3_endpoint") - s3_bucket = self.config.get( - "remote_build_s3_bucket", "opensearch-vectors" - ) - s3_prefix = self.config.get("remote_build_s3_prefix", "knn-indexes/") - s3_access_key = self.config.get("remote_build_s3_access_key") - s3_secret_key = self.config.get("remote_build_s3_secret_key") - s3 = boto3.client( "s3", - endpoint_url=s3_endpoint, - aws_access_key_id=s3_access_key, - aws_secret_access_key=s3_secret_key, + endpoint_url=self.config.get("remote_build_s3_endpoint"), + aws_access_key_id=self.config.get("remote_build_s3_access_key"), + aws_secret_access_key=self.config.get("remote_build_s3_secret_key"), + aws_session_token=self.config.get("remote_build_s3_session_token"), region_name="us-east-1", config=BotocoreConfig(signature_version="s3v4"), ) + bucket = self.config.get("remote_build_s3_bucket", "opensearch-vectors") + prefix = self.config.get("remote_build_s3_prefix", "knn-indexes/") + return s3, bucket, prefix + + def _count_faiss_files(self) -> int: + """Return the number of .faiss files currently in S3 under the configured prefix.""" + s3, bucket, prefix = self._make_s3_client() + resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix) + return sum( + 1 for obj in resp.get("Contents", []) if obj["Key"].endswith(".faiss") + ) + + def _count_index_segments(self, index_name: str) -> int: + """Return the number of Lucene segments currently in the index.""" + resp = self._client.indices.segments(index=index_name) + total = 0 + for shard_group in resp.get("indices", {}).get(index_name, {}).get("shards", {}).values(): + for shard in shard_group: + total += len(shard.get("segments", {})) + return total + + def _wait_for_ingestion_builds( + self, + index_name: str, + timeout: int = 600, + poll_interval: int = 5, + ) -> None: + """ + Wait for all GPU builds triggered during ingestion to complete. + + After bulk indexing, OpenSearch may have flushed several segments and + dispatched a GPU build for each one. This method counts segments in + the index, then polls S3 until that many ``.faiss`` files exist — + confirming every ingestion-time GPU build has finished. 
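+
+        For example, if ingestion flushed three segments, this blocks until
+        at least three ``.faiss`` objects are listed under the configured
+        prefix.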
+ + Must be called *before* force-merge so the pre-merge `.faiss` count + is stable. + """ + segment_count = self._count_index_segments(index_name) + if segment_count == 0: + return + print( + f" Waiting for {segment_count} ingestion-time GPU build(s) to complete..." + ) + self._wait_for_remote_build( + expected_count=segment_count, + timeout=timeout, + poll_interval=poll_interval, + ) + print(f" All {segment_count} ingestion-time GPU build(s) confirmed in S3.") + + def _wait_for_remote_build( + self, + expected_count: int = 1, + timeout: int = 600, + poll_interval: int = 5, + ) -> None: + """ + Poll S3 until at least *expected_count* .faiss files exist under the + configured prefix. + + Raises ``TimeoutError`` if the expected number of files does not appear + within *timeout* seconds. + """ + s3, bucket, prefix = self._make_s3_client() deadline = time.time() + timeout while time.time() < deadline: try: - resp = s3.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix) + resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix) faiss_files = [ obj["Key"] for obj in resp.get("Contents", []) @@ -616,7 +666,7 @@ def _wait_for_remote_build( raise TimeoutError( f"GPU build not confirmed after {timeout}s: " - f"expected {expected_count} .faiss file(s) in s3://{s3_bucket}/{s3_prefix}" + f"expected {expected_count} .faiss file(s) in s3://{bucket}/{prefix}" ) def build( @@ -753,13 +803,23 @@ def build( self._bulk_index(index_name, base_vectors, bulk_batch_size) self._client.indices.refresh(index=index_name, request_timeout=120) if remote_index_build: - # Force-merge `index_name` to one segment, initiating the remote GPU build. - self._client.indices.forcemerge( - index=index_name, max_num_segments=1, request_timeout=300 - ) + remote_timeout = int(self.config.get("remote_build_timeout", 600)) + # Wait for every GPU build dispatched during ingestion to finish. + # Segments flushed mid-ingest each trigger their own GPU build, so + # all of those must complete before we proceed. + self._wait_for_ingestion_builds(index_name, timeout=remote_timeout) + # Snapshot .faiss count now that ingestion builds are done, then + # force-merge to one segment to trigger the final GPU build. 
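+            # The final wait below then expects pre_merge_count + 1 files:
+            # every ingestion-time .faiss plus the one produced by the
+            # force-merged segment.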
+ pre_merge_count = self._count_faiss_files() + + self._client.indices.forcemerge( + index=index_name, max_num_segments=1, request_timeout=300 + ) + + if remote_index_build: self._wait_for_remote_build( - expected_count=1, - timeout=int(self.config.get("remote_build_timeout", 600)), + expected_count=pre_merge_count + 1, + timeout=remote_timeout, ) build_time = time.perf_counter() - t0 diff --git a/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py b/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py index 7ce4f342ee..2bc28557bd 100644 --- a/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py +++ b/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py @@ -213,23 +213,27 @@ def remote_build_env(opensearch_url): Required environment variables: BUILDER_URL URL of the remote index builder service - S3_ENDPOINT S3-compatible object store endpoint URL S3_BUCKET Bucket name used by OpenSearch for vector staging - S3_ACCESS_KEY S3 access key - S3_SECRET_KEY S3 secret key + S3_ACCESS_KEY S3 access key ID + S3_SECRET_KEY S3 secret access key + + Optional environment variables: + S3_ENDPOINT Custom S3 endpoint URL (omit to use real AWS S3) + S3_SESSION_TOKEN STS session token (required for temporary credentials) """ builder_url = os.environ.get("BUILDER_URL") - s3_endpoint = os.environ.get("S3_ENDPOINT") + # S3_ENDPOINT is optional — omit it (or leave unset) to use real AWS S3. + s3_endpoint = os.environ.get("S3_ENDPOINT") or None s3_bucket = os.environ.get("S3_BUCKET") s3_access_key = os.environ.get("S3_ACCESS_KEY") s3_secret_key = os.environ.get("S3_SECRET_KEY") + s3_session_token = os.environ.get("S3_SESSION_TOKEN") or None s3_prefix = "knn-indexes/" missing = [ name for name, val in { "BUILDER_URL": builder_url, - "S3_ENDPOINT": s3_endpoint, "S3_BUCKET": s3_bucket, "S3_ACCESS_KEY": s3_access_key, "S3_SECRET_KEY": s3_secret_key, @@ -246,10 +250,11 @@ def remote_build_env(opensearch_url): except requests.exceptions.ConnectionError: pytest.skip(f"Remote index builder not reachable at {builder_url}") - try: - requests.get(s3_endpoint, timeout=2) - except requests.exceptions.ConnectionError: - pytest.skip(f"S3 endpoint not reachable at {s3_endpoint}") + if s3_endpoint is not None: + try: + requests.get(s3_endpoint, timeout=2) + except requests.exceptions.ConnectionError: + pytest.skip(f"S3 endpoint not reachable at {s3_endpoint}") # Register the S3 snapshot repo and enable remote index build session = requests.Session() @@ -284,6 +289,7 @@ def remote_build_env(opensearch_url): "s3_prefix": s3_prefix, "s3_access_key": s3_access_key, "s3_secret_key": s3_secret_key, + "s3_session_token": s3_session_token, } @@ -309,6 +315,9 @@ def live_remote_build_backend(opensearch_url, remote_build_env): "remote_build_s3_prefix": remote_build_env["s3_prefix"], "remote_build_s3_access_key": remote_build_env["s3_access_key"], "remote_build_s3_secret_key": remote_build_env["s3_secret_key"], + "remote_build_s3_session_token": remote_build_env[ + "s3_session_token" + ], } ) try: From 887b2a3abcb63c873b6cdd40b31beddf03c6e9c4 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 20 Apr 2026 14:48:05 -0500 Subject: [PATCH 5/5] Lint Signed-off-by: James Bourbeau --- .../cuvs_bench/backends/opensearch.py | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/python/cuvs_bench/cuvs_bench/backends/opensearch.py b/python/cuvs_bench/cuvs_bench/backends/opensearch.py index 2349ab5057..b83359d2a0 100644 --- a/python/cuvs_bench/cuvs_bench/backends/opensearch.py +++ 
b/python/cuvs_bench/cuvs_bench/backends/opensearch.py @@ -560,7 +560,9 @@ def _doc_generator(): indexed += 1 milestone = max(total // 10, 1) if indexed % milestone == 0: - print(f" Indexed {indexed} / {total} vectors ({100 * indexed // total}%)") + print( + f" Indexed {indexed} / {total} vectors ({100 * indexed // total}%)" + ) print(f" Indexed all {total} vectors") def _make_s3_client(self): @@ -578,12 +580,16 @@ def _make_s3_client(self): "s3", endpoint_url=self.config.get("remote_build_s3_endpoint"), aws_access_key_id=self.config.get("remote_build_s3_access_key"), - aws_secret_access_key=self.config.get("remote_build_s3_secret_key"), + aws_secret_access_key=self.config.get( + "remote_build_s3_secret_key" + ), aws_session_token=self.config.get("remote_build_s3_session_token"), region_name="us-east-1", config=BotocoreConfig(signature_version="s3v4"), ) - bucket = self.config.get("remote_build_s3_bucket", "opensearch-vectors") + bucket = self.config.get( + "remote_build_s3_bucket", "opensearch-vectors" + ) prefix = self.config.get("remote_build_s3_prefix", "knn-indexes/") return s3, bucket, prefix @@ -592,14 +598,21 @@ def _count_faiss_files(self) -> int: s3, bucket, prefix = self._make_s3_client() resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix) return sum( - 1 for obj in resp.get("Contents", []) if obj["Key"].endswith(".faiss") + 1 + for obj in resp.get("Contents", []) + if obj["Key"].endswith(".faiss") ) def _count_index_segments(self, index_name: str) -> int: """Return the number of Lucene segments currently in the index.""" resp = self._client.indices.segments(index=index_name) total = 0 - for shard_group in resp.get("indices", {}).get(index_name, {}).get("shards", {}).values(): + for shard_group in ( + resp.get("indices", {}) + .get(index_name, {}) + .get("shards", {}) + .values() + ): for shard in shard_group: total += len(shard.get("segments", {})) return total @@ -632,7 +645,9 @@ def _wait_for_ingestion_builds( timeout=timeout, poll_interval=poll_interval, ) - print(f" All {segment_count} ingestion-time GPU build(s) confirmed in S3.") + print( + f" All {segment_count} ingestion-time GPU build(s) confirmed in S3." + ) def _wait_for_remote_build( self,