Skip to content

Commit 8a598d5

Browse files
feat: Add QPS-Latency tradeoff metrics for Streaming Tests
- Use HDR Histogram for memory-efficient latency tracking
- Collect p99, p95, and average latency at each concurrency level per stage
- Add Concurrent Search Performance UI with single-stage and multi-stage views
- Closes #638
1 parent 45d1ff5 commit 8a598d5

7 files changed

Lines changed: 432 additions & 12 deletions

File tree

install/requirements_py3.11.txt

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,4 +25,5 @@ pymilvus
2525
clickhouse_connect
2626
pyvespa
2727
mysql-connector-python
28-
packaging
28+
packaging
29+
hdrhistogram>=0.10.1

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@ dependencies = [
4141
"scikit-learn",
4242
"pymilvus", # with pandas, numpy, ujson
4343
"ujson",
44+
"hdrhistogram>=0.10.1",
4445
]
4546
dynamic = ["version"]
4647

vectordb_bench/backend/runner/mp_runner.py

Lines changed: 82 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
from multiprocessing.queues import Queue
99

1010
import numpy as np
11+
from hdrh.histogram import HdrHistogram
1112

1213
from vectordb_bench.backend.filter import Filter, non_filter
1314

@@ -18,6 +19,12 @@
1819
NUM_PER_BATCH = config.NUM_PER_BATCH
1920
log = logging.getLogger(__name__)
2021

22+
# HDR Histogram constants
23+
HDR_HISTOGRAM_MIN_US = 1
24+
HDR_HISTOGRAM_MAX_US = 60_000_000 # 60 seconds
25+
HDR_HISTOGRAM_SIGNIFICANT_DIGITS = 3 # ±0.1% accuracy
26+
US_TO_SECONDS = 1_000_000
27+
2128

2229
class MultiProcessingSearchRunner:
2330
"""multiprocessing search runner
@@ -182,6 +189,31 @@ def run(self) -> float:
182189
def stop(self) -> None:
    """Stop hook for the runner; currently does nothing and returns None."""
    return None
184191

192+
def _aggregate_latency_stats(self, res: list) -> tuple[float, float, float]:
193+
"""Aggregate latency stats from worker processes.
194+
195+
Returns:
196+
tuple: (p99, p95, avg) latencies in seconds
197+
"""
198+
latency_stats_list = [r[2] for r in res if r[2] and r[2].get("count", 0) > 0]
199+
200+
if not latency_stats_list:
201+
return 0, 0, 0
202+
203+
total_query_count = sum(stats["count"] for stats in latency_stats_list)
204+
205+
if total_query_count == 0:
206+
return 0, 0, 0
207+
208+
# Use max for conservative percentile estimate
209+
latency_p99 = max(stats["p99"] for stats in latency_stats_list)
210+
latency_p95 = max(stats["p95"] for stats in latency_stats_list)
211+
212+
# Weighted average
213+
latency_avg = sum(stats["avg"] * stats["count"] for stats in latency_stats_list) / total_query_count
214+
215+
return latency_p99, latency_p95, latency_avg
216+
185217
def run_by_dur(self, duration: int) -> tuple[float, float]:
186218
"""
187219
Returns:
@@ -190,13 +222,23 @@ def run_by_dur(self, duration: int) -> tuple[float, float]:
190222
"""
191223
return self._run_by_dur(duration)
192224

193-
def _run_by_dur(self, duration: int) -> tuple[float, float]:
225+
def _run_by_dur(self, duration: int) -> tuple[float, float, list, list, list, list, list]:
194226
"""
195227
Returns:
196228
float: largest qps
197229
float: failed rate
230+
list: concurrency numbers
231+
list: qps values at each concurrency
232+
list: p99 latencies at each concurrency
233+
list: p95 latencies at each concurrency
234+
list: avg latencies at each concurrency
198235
"""
199236
max_qps = 0
237+
conc_num_list = []
238+
conc_qps_list = []
239+
conc_latency_p99_list = []
240+
conc_latency_p95_list = []
241+
conc_latency_avg_list = []
200242
try:
201243
for conc in self.concurrencies:
202244
with mp.Manager() as m:
@@ -226,9 +268,19 @@ def _run_by_dur(self, duration: int) -> tuple[float, float]:
226268
cost = time.perf_counter() - start
227269

228270
qps = round(all_success_count / cost, 4)
271+
272+
latency_p99, latency_p95, latency_avg = self._aggregate_latency_stats(res)
273+
274+
conc_num_list.append(conc)
275+
conc_qps_list.append(qps)
276+
conc_latency_p99_list.append(latency_p99)
277+
conc_latency_p95_list.append(latency_p95)
278+
conc_latency_avg_list.append(latency_avg)
279+
229280
log.info(
230281
f"End search in concurrency {conc}: dur={cost}s, failed_rate={failed_rate}, "
231-
f"all_success_count={all_success_count}, all_failed_count={all_failed_count}, qps={qps}",
282+
f"all_success_count={all_success_count}, all_failed_count={all_failed_count}, qps={qps}, "
283+
f"p99={latency_p99:.4f}s, p95={latency_p95:.4f}s, avg={latency_avg:.4f}s",
232284
)
233285
if qps > max_qps:
234286
max_qps = qps
@@ -246,13 +298,24 @@ def _run_by_dur(self, duration: int) -> tuple[float, float]:
246298
finally:
247299
self.stop()
248300

249-
return max_qps, failed_rate
301+
return (
302+
max_qps,
303+
failed_rate,
304+
conc_num_list,
305+
conc_qps_list,
306+
conc_latency_p99_list,
307+
conc_latency_p95_list,
308+
conc_latency_avg_list,
309+
)
250310

251-
def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> tuple[int, int]:
311+
def search_by_dur(
312+
self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition
313+
) -> tuple[int, int, dict]:
252314
"""
253315
Returns:
254316
int: successful requests count
255317
int: failed requests count
318+
dict: latency statistics with p99, p95, avg, count (computed via HDR Histogram)
256319
"""
257320
# sync all process
258321
q.put(1)
@@ -263,6 +326,9 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
263326
self.db.prepare_filter(self.filters)
264327
num, idx = len(test_data), random.randint(0, len(test_data) - 1)
265328

329+
# Memory-efficient latency tracking
330+
histogram = HdrHistogram(HDR_HISTOGRAM_MIN_US, HDR_HISTOGRAM_MAX_US, HDR_HISTOGRAM_SIGNIFICANT_DIGITS)
331+
266332
start_time = time.perf_counter()
267333
success_count = 0
268334
failed_cnt = 0
@@ -271,6 +337,8 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
271337
try:
272338
self.db.search_embedding(test_data[idx], self.k)
273339
success_count += 1
340+
latency_us = int((time.perf_counter() - s) * US_TO_SECONDS)
341+
histogram.record_value(min(latency_us, HDR_HISTOGRAM_MAX_US))
274342
except Exception as e:
275343
failed_cnt += 1
276344
# reduce log
@@ -284,8 +352,7 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
284352

285353
if success_count % 500 == 0:
286354
log.debug(
287-
f"({mp.current_process().name:16}) search_count: {success_count}, "
288-
f"latest_latency={time.perf_counter()-s}",
355+
f"({mp.current_process().name:16}) search_count: {success_count}",
289356
)
290357

291358
total_dur = round(time.perf_counter() - start_time, 4)
@@ -295,4 +362,12 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
295362
f"qps (successful) in this process: {round(success_count / total_dur, 4):3}",
296363
)
297364

298-
return success_count, failed_cnt
365+
# Pre-computed stats to avoid large data transfer
366+
latency_stats = {
367+
"p99": histogram.get_value_at_percentile(99) / US_TO_SECONDS,
368+
"p95": histogram.get_value_at_percentile(95) / US_TO_SECONDS,
369+
"avg": histogram.get_mean_value() / US_TO_SECONDS,
370+
"count": histogram.get_total_count(),
371+
}
372+
373+
return success_count, failed_cnt, latency_stats

vectordb_bench/backend/runner/read_write_runner.py

Lines changed: 59 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -106,10 +106,33 @@ def run_search(self, perc: int):
106106
log.info(
107107
f"Search after wirte - Conc search start, dur for each conc={self.read_dur_after_write}",
108108
)
109-
max_qps, conc_failed_rate = self.run_by_dur(self.read_dur_after_write)
109+
result = self.run_by_dur(self.read_dur_after_write)
110+
max_qps = result[0]
111+
conc_failed_rate = result[1]
112+
conc_num_list = result[2]
113+
conc_qps_list = result[3]
114+
conc_latency_p99_list = result[4]
115+
conc_latency_p95_list = result[5]
116+
conc_latency_avg_list = result[6]
110117
log.info(f"Search after wirte - Conc search finished, max_qps={max_qps}")
111118

112-
return [(perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate)]
119+
return [
120+
(
121+
perc,
122+
test_time,
123+
max_qps,
124+
recall,
125+
ndcg,
126+
p99_latency,
127+
p95_latency,
128+
conc_failed_rate,
129+
conc_num_list,
130+
conc_qps_list,
131+
conc_latency_p99_list,
132+
conc_latency_p95_list,
133+
conc_latency_avg_list,
134+
)
135+
]
113136

114137
def run_read_write(self) -> Metric:
115138
"""
@@ -160,6 +183,13 @@ def run_read_write(self) -> Metric:
160183
m.st_serial_latency_p95_list = [d[6] for d in r]
161184
m.st_conc_failed_rate_list = [d[7] for d in r]
162185

186+
# Extract concurrent latency data
187+
m.st_conc_num_list_list = [d[8] for d in r]
188+
m.st_conc_qps_list_list = [d[9] for d in r]
189+
m.st_conc_latency_p99_list_list = [d[10] for d in r]
190+
m.st_conc_latency_p95_list_list = [d[11] for d in r]
191+
m.st_conc_latency_avg_list_list = [d[12] for d in r]
192+
163193
except Exception as e:
164194
log.warning(f"Read and write error: {e}")
165195
executor.shutdown(wait=True, cancel_futures=True)
@@ -226,6 +256,8 @@ def wait_next_target(start: int, target_batch: int) -> bool:
226256
log.info(f"Insert {perc}% done, total batch={total_batch}")
227257
test_time = round(time.perf_counter(), 4)
228258
max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate = 0, 0, 0, 0, 0, 0
259+
conc_num_list, conc_qps_list = [], []
260+
conc_latency_p99_list, conc_latency_p95_list, conc_latency_avg_list = [], [], []
229261
try:
230262
log.info(f"[{target_batch}/{total_batch}] Serial search - {perc}% start")
231263
res, ssearch_dur = self.serial_search_runner.run()
@@ -246,12 +278,35 @@ def wait_next_target(start: int, target_batch: int) -> bool:
246278
f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, "
247279
f"dur={each_conc_search_dur:.4f}"
248280
)
249-
max_qps, conc_failed_rate = self.run_by_dur(each_conc_search_dur)
281+
conc_result = self.run_by_dur(each_conc_search_dur)
282+
max_qps = conc_result[0]
283+
conc_failed_rate = conc_result[1]
284+
conc_num_list = conc_result[2]
285+
conc_qps_list = conc_result[3]
286+
conc_latency_p99_list = conc_result[4]
287+
conc_latency_p95_list = conc_result[5]
288+
conc_latency_avg_list = conc_result[6]
250289
else:
251290
log.warning(f"Skip concurrent tests, each_conc_search_dur={each_conc_search_dur} less than 10s.")
252291
except Exception as e:
253292
log.warning(f"Streaming Search Failed at stage={stage}. Exception: {e}")
254-
result.append((perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate))
293+
result.append(
294+
(
295+
perc,
296+
test_time,
297+
max_qps,
298+
recall,
299+
ndcg,
300+
p99_latency,
301+
p95_latency,
302+
conc_failed_rate,
303+
conc_num_list,
304+
conc_qps_list,
305+
conc_latency_p99_list,
306+
conc_latency_p95_list,
307+
conc_latency_avg_list,
308+
)
309+
)
255310
start_batch = target_batch
256311

257312
# Drain the queue

0 commit comments

Comments
 (0)