From e7786f28703da36b6718258ffb66319be5a2afe2 Mon Sep 17 00:00:00 2001 From: Denise2004 <3087753261@qq.com> Date: Tue, 2 Dec 2025 08:21:50 +0000 Subject: [PATCH] batch cli --- README.md | 122 ++++++++ .../backend/clients/elastic_cloud/cli.py | 288 ++++++++++++++++++ vectordb_bench/cli/batch_cli.py | 145 +++++++-- vectordb_bench/cli/cli.py | 30 +- vectordb_bench/cli/vectordbbench.py | 10 + 5 files changed, 572 insertions(+), 23 deletions(-) create mode 100644 vectordb_bench/backend/clients/elastic_cloud/cli.py diff --git a/README.md b/README.md index e0e1630f3..bd7568da1 100644 --- a/README.md +++ b/README.md @@ -219,6 +219,96 @@ Options: --quantization-type TEXT which type of quantization to use valid values [fp32, fp16, bq] --help Show this message and exit. ``` +### Run Elastic Cloud from command line + +Elastic Cloud supports multiple index types: HNSW, HNSW_INT8, HNSW_INT4, and HNSW_BBQ. + +**Example: Run HNSW index test** + +```shell +vectordbbench elasticcloudhnsw --db-label elastic-cloud-test \ +--cloud-id --password '' \ +--m 16 --ef-construction 100 --num-candidates 100 \ +--case-type Performance768D1M --number-of-shards 1 \ +--number-of-replicas 0 --refresh-interval 30s +``` + +**Example: Run HNSW_INT8 index test** + +```shell +vectordbbench elasticcloudhnswint8 --db-label elastic-cloud-int8 \ +--cloud-id --password '' \ +--m 16 --ef-construction 200 --num-candidates 200 \ +--case-type Performance1536D50K --element-type float +``` + +**Example: Run HNSW_INT4 index test** + +```shell +vectordbbench elasticcloudhnswint4 --db-label elastic-cloud-int4 \ +--cloud-id --password '' \ +--m 16 --ef-construction 200 --num-candidates 200 \ +--case-type Performance768D10M --use-rescore --oversample-ratio 2.0 +``` + +**Example: Run HNSW_BBQ index test** + +```shell +vectordbbench elasticcloudhnswbbq --db-label elastic-cloud-bbq \ +--cloud-id --password '' \ +--m 16 --ef-construction 200 --num-candidates 200 \ +--case-type Performance1536D5M --use-routing --use-force-merge +``` + +**Example: Run Label Filter Performance test** + +```shell +vectordbbench elasticcloudhnsw --db-label elastic-cloud-label-filter \ +--cloud-id --password '' \ +--case-type LabelFilterPerformanceCase \ +--dataset-with-size-type "Medium OpenAI (1536dim, 500K)" \ +--label-percentage 0.001 \ +--m 16 --ef-construction 128 --num-candidates 100 \ +--num-concurrency 1,5 --number-of-shards 1 +``` + +To list all options for Elastic Cloud, execute `vectordbbench elasticcloudhnsw --help`. The following are Elastic Cloud-specific command-line options: + +```text +$ vectordbbench elasticcloudhnsw --help +Usage: vectordbbench elasticcloudhnsw [OPTIONS] + +Options: + # Connection + --cloud-id TEXT Elastic Cloud ID [required] + --password TEXT Elastic Cloud password [required] + + # HNSW Index Parameters + --m INTEGER HNSW M parameter [default: 16] + --ef-construction INTEGER HNSW efConstruction parameter [default: 100] + --num-candidates INTEGER Number of candidates for search [default: 100] + --element-type [float|byte] Element type for vectors (float: 4 bytes, byte: 1 byte) [default: float] + + # Index Configuration + --number-of-shards INTEGER Number of shards [default: 1] + --number-of-replicas INTEGER Number of replicas [default: 0] + --refresh-interval TEXT Index refresh interval [default: 30s] + --merge-max-thread-count INTEGER + Maximum thread count for merge [default: 8] + --use-force-merge BOOLEAN Whether to use force merge [default: True] + --use-routing BOOLEAN Whether to use routing [default: False] + --use-rescore BOOLEAN Whether to use rescore [default: False] + --oversample-ratio FLOAT Oversample ratio for rescore [default: 2.0] + + # Common Options + --case-type [CapacityDim128|CapacityDim960|Performance768D100M|...] + Case type + --db-label TEXT Db label, default: date in ISO format + --k INTEGER K value for number of nearest neighbors to search [default: 100] + --num-concurrency TEXT Comma-separated list of concurrency values [default: 1,5,10,20,30,40,60,80] + --help Show this message and exit. +``` + ### Run OceanBase from command line Execute tests for the index types: HNSW, HNSW_SQ, or HNSW_BQ. @@ -392,10 +482,23 @@ milvushnsw: ef_search: 128 drop_old: False load: False +elasticcloudhnsw: + db_label: elastic-cloud-hnsw + cloud_id: + password: + case_type: Performance768D1M + m: 16 + ef_construction: 100 + num_candidates: 100 + number_of_shards: 1 + number_of_replicas: 0 + refresh_interval: 30s + element_type: float ``` > Notes: > - Options passed on the command line will override the configuration file* > - Parameter names use an _ not - +> - For `LabelFilterPerformanceCase` and `NewIntFilterPerformanceCase`, you must specify `dataset_with_size_type` in addition to `case_type` #### Using a batch configuration file. @@ -430,10 +533,29 @@ milvushnsw: ef_search: 128 drop_old: False load: False +elasticcloudhnsw: + - db_label: elastic-cloud-hnsw-test-1 + cloud_id: + password: + case_type: Performance768D1M + m: 16 + ef_construction: 100 + num_candidates: 100 + - db_label: elastic-cloud-label-filter-0.1 + cloud_id: + password: + case_type: LabelFilterPerformanceCase + dataset_with_size_type: "Medium OpenAI (1536dim, 500K)" + label_percentage: 0.001 + m: 16 + ef_construction: 128 + num_candidates: 100 + num_concurrency: "1,5" ``` > Notes: > - Options can only be passed through configuration files > - Parameter names use an _ not - +> - For `LabelFilterPerformanceCase` and `NewIntFilterPerformanceCase`, you must specify `dataset_with_size_type` in addition to `case_type` How to use? ```shell diff --git a/vectordb_bench/backend/clients/elastic_cloud/cli.py b/vectordb_bench/backend/clients/elastic_cloud/cli.py new file mode 100644 index 000000000..55e521d8f --- /dev/null +++ b/vectordb_bench/backend/clients/elastic_cloud/cli.py @@ -0,0 +1,288 @@ +from typing import Annotated, TypedDict, Unpack + +import click +from pydantic import SecretStr + +from vectordb_bench.backend.clients import DB +from vectordb_bench.cli.cli import ( + CommonTypedDict, + cli, + click_parameter_decorators_from_typed_dict, + run, +) + +DBTYPE = DB.ElasticCloud + + +class ElasticCloudTypedDict(TypedDict): + cloud_id: Annotated[ + str, + click.option("--cloud-id", type=str, help="Elastic Cloud ID", required=True), + ] + password: Annotated[ + str, + click.option("--password", type=str, help="Elastic Cloud password", required=True), + ] + number_of_shards: Annotated[ + int, + click.option( + "--number-of-shards", + type=int, + help="Number of shards", + required=False, + default=1, + show_default=True, + ), + ] + number_of_replicas: Annotated[ + int, + click.option( + "--number-of-replicas", + type=int, + help="Number of replicas", + required=False, + default=0, + show_default=True, + ), + ] + refresh_interval: Annotated[ + str, + click.option( + "--refresh-interval", + type=str, + help="Index refresh interval", + required=False, + default="30s", + show_default=True, + ), + ] + merge_max_thread_count: Annotated[ + int, + click.option( + "--merge-max-thread-count", + type=int, + help="Maximum thread count for merge", + required=False, + default=8, + show_default=True, + ), + ] + use_force_merge: Annotated[ + bool, + click.option( + "--use-force-merge", + type=bool, + help="Whether to use force merge", + required=False, + default=True, + show_default=True, + ), + ] + use_routing: Annotated[ + bool, + click.option( + "--use-routing", + type=bool, + help="Whether to use routing", + required=False, + default=False, + show_default=True, + ), + ] + use_rescore: Annotated[ + bool, + click.option( + "--use-rescore", + type=bool, + help="Whether to use rescore", + required=False, + default=False, + show_default=True, + ), + ] + oversample_ratio: Annotated[ + float, + click.option( + "--oversample-ratio", + type=float, + help="Oversample ratio for rescore", + required=False, + default=2.0, + show_default=True, + ), + ] + + +class ElasticCloudHNSWTypedDict(CommonTypedDict, ElasticCloudTypedDict): + m: Annotated[ + int, + click.option( + "--m", + type=int, + help="HNSW M parameter", + required=False, + default=16, + show_default=True, + ), + ] + ef_construction: Annotated[ + int, + click.option( + "--ef-construction", + type=int, + help="HNSW efConstruction parameter", + required=False, + default=100, + show_default=True, + ), + ] + num_candidates: Annotated[ + int, + click.option( + "--num-candidates", + type=int, + help="Number of candidates for search", + required=False, + default=100, + show_default=True, + ), + ] + element_type: Annotated[ + str, + click.option( + "--element-type", + type=click.Choice(["float", "byte"], case_sensitive=False), + help="Element type for vectors (float: 4 bytes, byte: 1 byte)", + required=False, + default="float", + show_default=True, + ), + ] + + +@cli.command() +@click_parameter_decorators_from_typed_dict(ElasticCloudHNSWTypedDict) +def ElasticCloudHNSW(**parameters: Unpack[ElasticCloudHNSWTypedDict]): + from ..api import IndexType + from .config import ElasticCloudConfig, ElasticCloudIndexConfig, ESElementType + + run( + db=DBTYPE, + db_config=ElasticCloudConfig( + db_label=parameters["db_label"], + cloud_id=SecretStr(parameters["cloud_id"]), + password=SecretStr(parameters["password"]), + ), + db_case_config=ElasticCloudIndexConfig( + index=IndexType.ES_HNSW, + M=parameters["m"], + efConstruction=parameters["ef_construction"], + num_candidates=parameters["num_candidates"], + element_type=ESElementType(parameters["element_type"]), + number_of_shards=parameters["number_of_shards"], + number_of_replicas=parameters["number_of_replicas"], + refresh_interval=parameters["refresh_interval"], + merge_max_thread_count=parameters["merge_max_thread_count"], + use_force_merge=parameters["use_force_merge"], + use_routing=parameters["use_routing"], + use_rescore=parameters["use_rescore"], + oversample_ratio=parameters["oversample_ratio"], + ), + **parameters, + ) + + +@cli.command() +@click_parameter_decorators_from_typed_dict(ElasticCloudHNSWTypedDict) +def ElasticCloudHNSWInt8(**parameters: Unpack[ElasticCloudHNSWTypedDict]): + from ..api import IndexType + from .config import ElasticCloudConfig, ElasticCloudIndexConfig, ESElementType + + run( + db=DBTYPE, + db_config=ElasticCloudConfig( + db_label=parameters["db_label"], + cloud_id=SecretStr(parameters["cloud_id"]), + password=SecretStr(parameters["password"]), + ), + db_case_config=ElasticCloudIndexConfig( + index=IndexType.ES_HNSW_INT8, + M=parameters["m"], + efConstruction=parameters["ef_construction"], + num_candidates=parameters["num_candidates"], + element_type=ESElementType(parameters["element_type"]), + number_of_shards=parameters["number_of_shards"], + number_of_replicas=parameters["number_of_replicas"], + refresh_interval=parameters["refresh_interval"], + merge_max_thread_count=parameters["merge_max_thread_count"], + use_force_merge=parameters["use_force_merge"], + use_routing=parameters["use_routing"], + use_rescore=parameters["use_rescore"], + oversample_ratio=parameters["oversample_ratio"], + ), + **parameters, + ) + + +@cli.command() +@click_parameter_decorators_from_typed_dict(ElasticCloudHNSWTypedDict) +def ElasticCloudHNSWInt4(**parameters: Unpack[ElasticCloudHNSWTypedDict]): + from ..api import IndexType + from .config import ElasticCloudConfig, ElasticCloudIndexConfig, ESElementType + + run( + db=DBTYPE, + db_config=ElasticCloudConfig( + db_label=parameters["db_label"], + cloud_id=SecretStr(parameters["cloud_id"]), + password=SecretStr(parameters["password"]), + ), + db_case_config=ElasticCloudIndexConfig( + index=IndexType.ES_HNSW_INT4, + M=parameters["m"], + efConstruction=parameters["ef_construction"], + num_candidates=parameters["num_candidates"], + element_type=ESElementType(parameters["element_type"]), + number_of_shards=parameters["number_of_shards"], + number_of_replicas=parameters["number_of_replicas"], + refresh_interval=parameters["refresh_interval"], + merge_max_thread_count=parameters["merge_max_thread_count"], + use_force_merge=parameters["use_force_merge"], + use_routing=parameters["use_routing"], + use_rescore=parameters["use_rescore"], + oversample_ratio=parameters["oversample_ratio"], + ), + **parameters, + ) + + +@cli.command() +@click_parameter_decorators_from_typed_dict(ElasticCloudHNSWTypedDict) +def ElasticCloudHNSWBBQ(**parameters: Unpack[ElasticCloudHNSWTypedDict]): + from ..api import IndexType + from .config import ElasticCloudConfig, ElasticCloudIndexConfig, ESElementType + + run( + db=DBTYPE, + db_config=ElasticCloudConfig( + db_label=parameters["db_label"], + cloud_id=SecretStr(parameters["cloud_id"]), + password=SecretStr(parameters["password"]), + ), + db_case_config=ElasticCloudIndexConfig( + index=IndexType.ES_HNSW_BBQ, + M=parameters["m"], + efConstruction=parameters["ef_construction"], + num_candidates=parameters["num_candidates"], + element_type=ESElementType(parameters["element_type"]), + number_of_shards=parameters["number_of_shards"], + number_of_replicas=parameters["number_of_replicas"], + refresh_interval=parameters["refresh_interval"], + merge_max_thread_count=parameters["merge_max_thread_count"], + use_force_merge=parameters["use_force_merge"], + use_routing=parameters["use_routing"], + use_rescore=parameters["use_rescore"], + oversample_ratio=parameters["oversample_ratio"], + ), + **parameters, + ) diff --git a/vectordb_bench/cli/batch_cli.py b/vectordb_bench/cli/batch_cli.py index 5ac2b1cf1..9a1869e27 100644 --- a/vectordb_bench/cli/batch_cli.py +++ b/vectordb_bench/cli/batch_cli.py @@ -1,5 +1,4 @@ import logging -import time from collections.abc import MutableMapping from concurrent.futures import wait from pathlib import Path @@ -14,6 +13,7 @@ cli, click_parameter_decorators_from_typed_dict, ) +from ..models import TaskConfig log = logging.getLogger(__name__) @@ -95,27 +95,136 @@ def format_bool_option(opt_name: str, value: Any, skip: bool = False, raw_key: s return args_arr -@cli.command() -@click_parameter_decorators_from_typed_dict(BatchCliTypedDict) -def BatchCli(): - ctx = click.get_current_context() - batch_config = ctx.default_map +def build_task_from_config(cmd_name: str, config_dict: dict[str, Any]) -> TaskConfig | None: - runner = CliRunner() + collected_tasks = [] + original_run = None - args_arr = build_sub_cmd_args(batch_config) + try: + from ..interface import benchmark_runner + + original_run = benchmark_runner.run + + def collect_task_wrapper(tasks: list[TaskConfig], task_label: str | None = None): # noqa: ARG001 + collected_tasks.extend(tasks) + return True + + benchmark_runner.run = collect_task_wrapper + + # build CLI parameters + args = [cmd_name] + bool_options = { + "drop_old": True, + "load": True, + "search_serial": True, + "search_concurrent": True, + "dry_run": False, + "custom_dataset_use_shuffled": True, + "custom_dataset_with_gt": True, + } + + def format_option(key: str, value: Any): + opt_name = key.replace("_", "-") + + if key in bool_options: + return format_bool_option(opt_name, value, skip=False) + + if key.startswith("skip_"): + raw_key = key[5:] + raw_opt = raw_key.replace("_", "-") + return format_bool_option(raw_opt, value, skip=True, raw_key=raw_key) + + return [f"--{opt_name}", str(value)] + + def format_bool_option(opt_name: str, value: Any, skip: bool = False, raw_key: str | None = None): + if isinstance(value, bool): + if skip: + if bool_options.get(raw_key, False): + return [f"--skip-{opt_name}"] if value else [f"--{opt_name}"] + return [f"--{opt_name}", str(value)] + if value: + return [f"--{opt_name}"] + if bool_options.get(opt_name.replace("-", "_"), False): + return [f"--skip-{opt_name}"] + return [] + return [f"--{opt_name}", str(value)] + + for k, v in config_dict.items(): + args.extend(format_option(k, v)) + + # call CLI command (this will trigger collect_task_wrapper) + runner = CliRunner() + result = runner.invoke(cli, args, catch_exceptions=False) - for args in args_arr: - log.info(f"got batch config: {' '.join(args)}") + if result.exception: + log.error(f"Failed to build task for {cmd_name}: {result.exception}") + return None - for args in args_arr: - result = runner.invoke(cli, args) - time.sleep(5) + if collected_tasks: + return collected_tasks[0] + return None # noqa: TRY300 - from ..interface import global_result_future + except Exception: + log.exception("Error building task from config") + return None + finally: + if original_run is not None: + from ..interface import benchmark_runner - if global_result_future: - wait([global_result_future]) + benchmark_runner.run = original_run - if result.exception: - log.exception(f"failed to run sub command: {args[0]}", exc_info=result.exception) + +@cli.command() +@click_parameter_decorators_from_typed_dict(BatchCliTypedDict) +def BatchCli(): + ctx = click.get_current_context() + batch_config = ctx.default_map + + from ..interface import benchmark_runner, global_result_future + + # collect all tasks + all_tasks: list[TaskConfig] = [] + task_labels: set[str] = set() + + for cmd_name, cmd_config_list in batch_config.items(): + for config_dict in cmd_config_list: + log.info(f"Building task for {cmd_name} with config: {config_dict.get('task_label', 'N/A')}") + + # collect task_label from config + if "task_label" in config_dict: + task_labels.add(config_dict["task_label"]) + + # TaskConfig + task = build_task_from_config(cmd_name, config_dict) + if task: + all_tasks.append(task) + log.info(f"Successfully built task: {task.db.value} - {task.case_config.case_id.name}") + else: + log.warning(f"Failed to build task for {cmd_name}") + + if not all_tasks: + log.error("No tasks were built from the batch config file") + return + + if len(task_labels) == 1: + task_label = task_labels.pop() + log.info(f"Using shared task_label from config: {task_label}") + elif len(task_labels) > 1: + task_label = next(iter(task_labels)) + log.warning(f"Multiple task_labels found in config, using the first one: {task_label}") + else: + from datetime import datetime + + task_label = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + log.info(f"No task_label found in config, using generated one: {task_label}") + + log.info(f"Running {len(all_tasks)} tasks with shared task_label: {task_label}") + + benchmark_runner.run(all_tasks, task_label) + + if global_result_future: + log.info("Waiting for all tasks to complete...") + wait([global_result_future]) + log.info("All tasks completed successfully") + else: + log.warning("No global_result_future found, tasks may be running in background") diff --git a/vectordb_bench/cli/cli.py b/vectordb_bench/cli/cli.py index 12bb4be9b..d89069e79 100644 --- a/vectordb_bench/cli/cli.py +++ b/vectordb_bench/cli/cli.py @@ -608,22 +608,24 @@ class OceanBaseIVFTypedDict(TypedDict): def cli(): ... -def run( +def build_task( db: DB, db_config: DBConfig, db_case_config: DBCaseConfig, **parameters: Unpack[CommonTypedDict], -): - """Builds a single VectorDBBench Task and runs it, awaiting the task until finished. +) -> TaskConfig: + """Builds a single VectorDBBench Task without running it. Args: db (DB) db_config (DBConfig) db_case_config (DBCaseConfig) **parameters: expects keys from CommonTypedDict - """ - task = TaskConfig( + Returns: + TaskConfig: The created task configuration + """ + return TaskConfig( db=db, db_config=db_config, db_case_config=db_case_config, @@ -644,6 +646,24 @@ def run( parameters["search_concurrent"], ), ) + + +def run( + db: DB, + db_config: DBConfig, + db_case_config: DBCaseConfig, + **parameters: Unpack[CommonTypedDict], +): + """Builds a single VectorDBBench Task and runs it, awaiting the task until finished. + + Args: + db (DB) + db_config (DBConfig) + db_case_config (DBCaseConfig) + **parameters: expects keys from CommonTypedDict + """ + + task = build_task(db, db_config, db_case_config, **parameters) task_label = parameters["task_label"] log.info(f"Task:\n{pformat(task)}\n") diff --git a/vectordb_bench/cli/vectordbbench.py b/vectordb_bench/cli/vectordbbench.py index 1c8df6c19..8924f5007 100644 --- a/vectordb_bench/cli/vectordbbench.py +++ b/vectordb_bench/cli/vectordbbench.py @@ -4,6 +4,12 @@ from ..backend.clients.clickhouse.cli import Clickhouse from ..backend.clients.cockroachdb.cli import CockroachDB as CockroachDBCli from ..backend.clients.doris.cli import Doris +from ..backend.clients.elastic_cloud.cli import ( + ElasticCloudHNSW, + ElasticCloudHNSWBBQ, + ElasticCloudHNSWInt4, + ElasticCloudHNSWInt8, +) from ..backend.clients.hologres.cli import HologresHGraph from ..backend.clients.lancedb.cli import LanceDB from ..backend.clients.mariadb.cli import MariaDBHNSW @@ -53,6 +59,10 @@ cli.add_command(HologresHGraph) cli.add_command(QdrantCloud) cli.add_command(QdrantLocal) +cli.add_command(ElasticCloudHNSW) +cli.add_command(ElasticCloudHNSWInt8) +cli.add_command(ElasticCloudHNSWInt4) +cli.add_command(ElasticCloudHNSWBBQ) cli.add_command(BatchCli) cli.add_command(S3Vectors) cli.add_command(TencentElasticsearch)