forked from zilliztech/VectorDBBench
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbatch_cli.py
More file actions
230 lines (182 loc) · 7.57 KB
/
batch_cli.py
File metadata and controls
230 lines (182 loc) · 7.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import logging
from collections.abc import MutableMapping
from concurrent.futures import wait
from pathlib import Path
from typing import Annotated, Any, TypedDict
import click
from click.testing import CliRunner
from yaml import Loader, load
from .. import config
from ..cli.cli import (
cli,
click_parameter_decorators_from_typed_dict,
)
from ..models import TaskConfig
log = logging.getLogger(__name__)
def click_get_defaults_from_file(ctx, param, value): # noqa: ANN001, ARG001
if not value:
raise click.MissingParameter
path = Path(value)
input_file = path if path.exists() else Path(config.CONFIG_LOCAL_DIR, path)
try:
with input_file.open() as f:
_config: dict[str, list[dict[str, Any]]] = load(f.read(), Loader=Loader) # noqa: S506
ctx.default_map = _config
except Exception as e:
msg = f"Failed to load batch config file: {e}"
raise click.BadParameter(msg) from e
return value
class BatchCliTypedDict(TypedDict):
batch_config_file: Annotated[
bool,
click.option(
"--batch-config-file",
type=click.Path(),
callback=click_get_defaults_from_file,
is_eager=True,
expose_value=False,
help="Read batch configuration from yaml file",
),
]
def build_sub_cmd_args(batch_config: MutableMapping[str, Any] | None):
bool_options = {
"drop_old": True,
"load": True,
"search_serial": True,
"search_concurrent": True,
"dry_run": False,
"custom_dataset_use_shuffled": True,
"custom_dataset_with_gt": True,
}
def format_option(key: str, value: Any):
opt_name = key.replace("_", "-")
if key in bool_options:
return format_bool_option(opt_name, value, skip=False)
if key.startswith("skip_"):
raw_key = key[5:]
raw_opt = raw_key.replace("_", "-")
return format_bool_option(raw_opt, value, skip=True, raw_key=raw_key)
return [f"--{opt_name}", str(value)]
def format_bool_option(opt_name: str, value: Any, skip: bool = False, raw_key: str | None = None):
if isinstance(value, bool):
if skip:
if bool_options.get(raw_key, False):
return [f"--skip-{opt_name}"] if value else [f"--{opt_name}"]
return [f"--{opt_name}", str(value)]
if value:
return [f"--{opt_name}"]
if bool_options.get(opt_name.replace("-", "_"), False):
return [f"--skip-{opt_name}"]
return []
return [f"--{opt_name}", str(value)]
args_arr = []
for sub_cmd_key, sub_cmd_config_list in batch_config.items():
for sub_cmd_args in sub_cmd_config_list:
args = [sub_cmd_key]
for k, v in sub_cmd_args.items():
args.extend(format_option(k, v))
args_arr.append(args)
return args_arr
def build_task_from_config(cmd_name: str, config_dict: dict[str, Any]) -> TaskConfig | None:
collected_tasks = []
original_run = None
try:
from ..interface import benchmark_runner
original_run = benchmark_runner.run
def collect_task_wrapper(tasks: list[TaskConfig], task_label: str | None = None): # noqa: ARG001
collected_tasks.extend(tasks)
return True
benchmark_runner.run = collect_task_wrapper
# build CLI parameters
args = [cmd_name]
bool_options = {
"drop_old": True,
"load": True,
"search_serial": True,
"search_concurrent": True,
"dry_run": False,
"custom_dataset_use_shuffled": True,
"custom_dataset_with_gt": True,
}
def format_option(key: str, value: Any):
opt_name = key.replace("_", "-")
if key in bool_options:
return format_bool_option(opt_name, value, skip=False)
if key.startswith("skip_"):
raw_key = key[5:]
raw_opt = raw_key.replace("_", "-")
return format_bool_option(raw_opt, value, skip=True, raw_key=raw_key)
return [f"--{opt_name}", str(value)]
def format_bool_option(opt_name: str, value: Any, skip: bool = False, raw_key: str | None = None):
if isinstance(value, bool):
if skip:
if bool_options.get(raw_key, False):
return [f"--skip-{opt_name}"] if value else [f"--{opt_name}"]
return [f"--{opt_name}", str(value)]
if value:
return [f"--{opt_name}"]
if bool_options.get(opt_name.replace("-", "_"), False):
return [f"--skip-{opt_name}"]
return []
return [f"--{opt_name}", str(value)]
for k, v in config_dict.items():
args.extend(format_option(k, v))
# call CLI command (this will trigger collect_task_wrapper)
runner = CliRunner()
result = runner.invoke(cli, args, catch_exceptions=False)
if result.exception:
log.error(f"Failed to build task for {cmd_name}: {result.exception}")
return None
if collected_tasks:
return collected_tasks[0]
return None # noqa: TRY300
except Exception:
log.exception("Error building task from config")
return None
finally:
if original_run is not None:
from ..interface import benchmark_runner
benchmark_runner.run = original_run
@cli.command()
@click_parameter_decorators_from_typed_dict(BatchCliTypedDict)
def BatchCli():
ctx = click.get_current_context()
batch_config = ctx.default_map
from ..interface import benchmark_runner, global_result_future
# collect all tasks
all_tasks: list[TaskConfig] = []
task_labels: set[str] = set()
for cmd_name, cmd_config_list in batch_config.items():
for config_dict in cmd_config_list:
log.info(f"Building task for {cmd_name} with config: {config_dict.get('task_label', 'N/A')}")
# collect task_label from config
if "task_label" in config_dict:
task_labels.add(config_dict["task_label"])
# TaskConfig
task = build_task_from_config(cmd_name, config_dict)
if task:
all_tasks.append(task)
log.info(f"Successfully built task: {task.db.value} - {task.case_config.case_id.name}")
else:
log.warning(f"Failed to build task for {cmd_name}")
if not all_tasks:
log.error("No tasks were built from the batch config file")
return
if len(task_labels) == 1:
task_label = task_labels.pop()
log.info(f"Using shared task_label from config: {task_label}")
elif len(task_labels) > 1:
task_label = next(iter(task_labels))
log.warning(f"Multiple task_labels found in config, using the first one: {task_label}")
else:
from datetime import datetime
task_label = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
log.info(f"No task_label found in config, using generated one: {task_label}")
log.info(f"Running {len(all_tasks)} tasks with shared task_label: {task_label}")
benchmark_runner.run(all_tasks, task_label)
if global_result_future:
log.info("Waiting for all tasks to complete...")
wait([global_result_future])
log.info("All tasks completed successfully")
else:
log.warning("No global_result_future found, tasks may be running in background")