Skip to content

Commit 6dddb5a

Browse files
feat: Add QPS-Latency tradeoff metrics for Streaming Tests
- Use HDR Histogram for memory-efficient latency tracking
- Collect p99, p95, avg latency at each concurrency level per stage
- Add Concurrent Search Performance UI with single-stage and multi-stage views
- Closes #638
1 parent 45d1ff5 commit 6dddb5a

7 files changed

Lines changed: 417 additions & 12 deletions

File tree

install/requirements_py3.11.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ pymilvus
2525
clickhouse_connect
2626
pyvespa
2727
mysql-connector-python
28-
packaging
28+
packaging
29+
hdrhistogram>=0.10.1

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ dependencies = [
4141
"scikit-learn",
4242
"pymilvus", # with pandas, numpy, ujson
4343
"ujson",
44+
"hdrhistogram>=0.10.1",
4445
]
4546
dynamic = ["version"]
4647

vectordb_bench/backend/runner/mp_runner.py

Lines changed: 85 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from multiprocessing.queues import Queue
99

1010
import numpy as np
11+
from hdrh.histogram import HdrHistogram
1112

1213
from vectordb_bench.backend.filter import Filter, non_filter
1314

@@ -18,6 +19,12 @@
1819
NUM_PER_BATCH = config.NUM_PER_BATCH
1920
log = logging.getLogger(__name__)
2021

22+
# HDR Histogram constants
23+
HDR_HISTOGRAM_MIN_US = 1
24+
HDR_HISTOGRAM_MAX_US = 60_000_000 # 60 seconds
25+
HDR_HISTOGRAM_SIGNIFICANT_DIGITS = 3 # ±0.1% accuracy
26+
US_TO_SECONDS = 1_000_000
27+
2128

2229
class MultiProcessingSearchRunner:
2330
"""multiprocessing search runner
@@ -190,13 +197,23 @@ def run_by_dur(self, duration: int) -> tuple[float, float]:
190197
"""
191198
return self._run_by_dur(duration)
192199

193-
def _run_by_dur(self, duration: int) -> tuple[float, float]:
200+
def _run_by_dur(self, duration: int) -> tuple[float, float, list, list, list, list, list]:
194201
"""
195202
Returns:
196203
float: largest qps
197204
float: failed rate
205+
list: concurrency numbers
206+
list: qps values at each concurrency
207+
list: p99 latencies at each concurrency
208+
list: p95 latencies at each concurrency
209+
list: avg latencies at each concurrency
198210
"""
199211
max_qps = 0
212+
conc_num_list = []
213+
conc_qps_list = []
214+
conc_latency_p99_list = []
215+
conc_latency_p95_list = []
216+
conc_latency_avg_list = []
200217
try:
201218
for conc in self.concurrencies:
202219
with mp.Manager() as m:
@@ -226,9 +243,45 @@ def _run_by_dur(self, duration: int) -> tuple[float, float]:
226243
cost = time.perf_counter() - start
227244

228245
qps = round(all_success_count / cost, 4)
246+
247+
# Aggregate latency stats from worker processes
248+
latency_stats_list = [
249+
r[2] for r in res
250+
if r[2] and r[2].get('count', 0) > 0
251+
]
252+
253+
if latency_stats_list:
254+
total_query_count = sum(stats['count'] for stats in latency_stats_list)
255+
256+
if total_query_count > 0:
257+
# Use max for conservative percentile estimate
258+
latency_p99 = max(stats['p99'] for stats in latency_stats_list)
259+
latency_p95 = max(stats['p95'] for stats in latency_stats_list)
260+
261+
# Weighted average
262+
latency_avg = sum(
263+
stats['avg'] * stats['count']
264+
for stats in latency_stats_list
265+
) / total_query_count
266+
else:
267+
latency_p99 = 0
268+
latency_p95 = 0
269+
latency_avg = 0
270+
else:
271+
latency_p99 = 0
272+
latency_p95 = 0
273+
latency_avg = 0
274+
275+
conc_num_list.append(conc)
276+
conc_qps_list.append(qps)
277+
conc_latency_p99_list.append(latency_p99)
278+
conc_latency_p95_list.append(latency_p95)
279+
conc_latency_avg_list.append(latency_avg)
280+
229281
log.info(
230282
f"End search in concurrency {conc}: dur={cost}s, failed_rate={failed_rate}, "
231-
f"all_success_count={all_success_count}, all_failed_count={all_failed_count}, qps={qps}",
283+
f"all_success_count={all_success_count}, all_failed_count={all_failed_count}, qps={qps}, "
284+
f"p99={latency_p99:.4f}s, p95={latency_p95:.4f}s, avg={latency_avg:.4f}s",
232285
)
233286
if qps > max_qps:
234287
max_qps = qps
@@ -246,13 +299,22 @@ def _run_by_dur(self, duration: int) -> tuple[float, float]:
246299
finally:
247300
self.stop()
248301

249-
return max_qps, failed_rate
302+
return (
303+
max_qps,
304+
failed_rate,
305+
conc_num_list,
306+
conc_qps_list,
307+
conc_latency_p99_list,
308+
conc_latency_p95_list,
309+
conc_latency_avg_list,
310+
)
250311

251-
def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> tuple[int, int]:
312+
def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> tuple[int, int, dict]:
252313
"""
253314
Returns:
254315
int: successful requests count
255316
int: failed requests count
317+
dict: latency statistics with p99, p95, avg, count (computed via HDR Histogram)
256318
"""
257319
# sync all process
258320
q.put(1)
@@ -263,6 +325,13 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
263325
self.db.prepare_filter(self.filters)
264326
num, idx = len(test_data), random.randint(0, len(test_data) - 1)
265327

328+
# Memory-efficient latency tracking
329+
histogram = HdrHistogram(
330+
HDR_HISTOGRAM_MIN_US,
331+
HDR_HISTOGRAM_MAX_US,
332+
HDR_HISTOGRAM_SIGNIFICANT_DIGITS
333+
)
334+
266335
start_time = time.perf_counter()
267336
success_count = 0
268337
failed_cnt = 0
@@ -271,6 +340,8 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
271340
try:
272341
self.db.search_embedding(test_data[idx], self.k)
273342
success_count += 1
343+
latency_us = int((time.perf_counter() - s) * US_TO_SECONDS)
344+
histogram.record_value(min(latency_us, HDR_HISTOGRAM_MAX_US))
274345
except Exception as e:
275346
failed_cnt += 1
276347
# reduce log
@@ -284,8 +355,7 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
284355

285356
if success_count % 500 == 0:
286357
log.debug(
287-
f"({mp.current_process().name:16}) search_count: {success_count}, "
288-
f"latest_latency={time.perf_counter()-s}",
358+
f"({mp.current_process().name:16}) search_count: {success_count}",
289359
)
290360

291361
total_dur = round(time.perf_counter() - start_time, 4)
@@ -295,4 +365,12 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
295365
f"qps (successful) in this process: {round(success_count / total_dur, 4):3}",
296366
)
297367

298-
return success_count, failed_cnt
368+
# Pre-computed stats to avoid large data transfer
369+
latency_stats = {
370+
'p99': histogram.get_value_at_percentile(99) / US_TO_SECONDS,
371+
'p95': histogram.get_value_at_percentile(95) / US_TO_SECONDS,
372+
'avg': histogram.get_mean_value() / US_TO_SECONDS,
373+
'count': histogram.get_total_count(),
374+
}
375+
376+
return success_count, failed_cnt, latency_stats

vectordb_bench/backend/runner/read_write_runner.py

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,20 @@ def run_search(self, perc: int):
106106
log.info(
107107
f"Search after write - Conc search start, dur for each conc={self.read_dur_after_write}",
108108
)
109-
max_qps, conc_failed_rate = self.run_by_dur(self.read_dur_after_write)
109+
result = self.run_by_dur(self.read_dur_after_write)
110+
max_qps = result[0]
111+
conc_failed_rate = result[1]
112+
conc_num_list = result[2]
113+
conc_qps_list = result[3]
114+
conc_latency_p99_list = result[4]
115+
conc_latency_p95_list = result[5]
116+
conc_latency_avg_list = result[6]
110117
log.info(f"Search after write - Conc search finished, max_qps={max_qps}")
111118

112-
return [(perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate)]
119+
return [(
120+
perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate,
121+
conc_num_list, conc_qps_list, conc_latency_p99_list, conc_latency_p95_list, conc_latency_avg_list
122+
)]
113123

114124
def run_read_write(self) -> Metric:
115125
"""
@@ -159,6 +169,13 @@ def run_read_write(self) -> Metric:
159169
m.st_serial_latency_p99_list = [d[5] for d in r]
160170
m.st_serial_latency_p95_list = [d[6] for d in r]
161171
m.st_conc_failed_rate_list = [d[7] for d in r]
172+
173+
# Extract concurrent latency data
174+
m.st_conc_num_list_list = [d[8] for d in r]
175+
m.st_conc_qps_list_list = [d[9] for d in r]
176+
m.st_conc_latency_p99_list_list = [d[10] for d in r]
177+
m.st_conc_latency_p95_list_list = [d[11] for d in r]
178+
m.st_conc_latency_avg_list_list = [d[12] for d in r]
162179

163180
except Exception as e:
164181
log.warning(f"Read and write error: {e}")
@@ -226,6 +243,8 @@ def wait_next_target(start: int, target_batch: int) -> bool:
226243
log.info(f"Insert {perc}% done, total batch={total_batch}")
227244
test_time = round(time.perf_counter(), 4)
228245
max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate = 0, 0, 0, 0, 0, 0
246+
conc_num_list, conc_qps_list = [], []
247+
conc_latency_p99_list, conc_latency_p95_list, conc_latency_avg_list = [], [], []
229248
try:
230249
log.info(f"[{target_batch}/{total_batch}] Serial search - {perc}% start")
231250
res, ssearch_dur = self.serial_search_runner.run()
@@ -246,12 +265,22 @@ def wait_next_target(start: int, target_batch: int) -> bool:
246265
f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, "
247266
f"dur={each_conc_search_dur:.4f}"
248267
)
249-
max_qps, conc_failed_rate = self.run_by_dur(each_conc_search_dur)
268+
conc_result = self.run_by_dur(each_conc_search_dur)
269+
max_qps = conc_result[0]
270+
conc_failed_rate = conc_result[1]
271+
conc_num_list = conc_result[2]
272+
conc_qps_list = conc_result[3]
273+
conc_latency_p99_list = conc_result[4]
274+
conc_latency_p95_list = conc_result[5]
275+
conc_latency_avg_list = conc_result[6]
250276
else:
251277
log.warning(f"Skip concurrent tests, each_conc_search_dur={each_conc_search_dur} less than 10s.")
252278
except Exception as e:
253279
log.warning(f"Streaming Search Failed at stage={stage}. Exception: {e}")
254-
result.append((perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate))
280+
result.append((
281+
perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate,
282+
conc_num_list, conc_qps_list, conc_latency_p99_list, conc_latency_p95_list, conc_latency_avg_list
283+
))
255284
start_batch = target_batch
256285

257286
# Drain the queue

0 commit comments

Comments (0)