Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion install/requirements_py3.11.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ pymilvus
clickhouse_connect
pyvespa
mysql-connector-python
packaging
packaging
hdrhistogram>=0.10.1
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies = [
"scikit-learn",
"pymilvus", # with pandas, numpy, ujson
"ujson",
"hdrhistogram>=0.10.1",
]
dynamic = ["version"]

Expand Down
89 changes: 82 additions & 7 deletions vectordb_bench/backend/runner/mp_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from multiprocessing.queues import Queue

import numpy as np
from hdrh.histogram import HdrHistogram

from vectordb_bench.backend.filter import Filter, non_filter

Expand All @@ -18,6 +19,12 @@
NUM_PER_BATCH = config.NUM_PER_BATCH
log = logging.getLogger(__name__)

# HDR Histogram constants
# The three HDR_HISTOGRAM_* values are passed, in this order, to the HdrHistogram
# constructor in search_by_dur; latencies are recorded in microseconds.
HDR_HISTOGRAM_MIN_US = 1
HDR_HISTOGRAM_MAX_US = 60_000_000 # 60 seconds; recorded latencies are clamped to this value
HDR_HISTOGRAM_SIGNIFICANT_DIGITS = 3 # ±0.1% accuracy
# Number of microseconds per second: seconds are multiplied by it before recording,
# and histogram values are divided by it to report seconds.
US_TO_SECONDS = 1_000_000


class MultiProcessingSearchRunner:
"""multiprocessing search runner
Expand Down Expand Up @@ -182,6 +189,31 @@ def run(self) -> float:
# Cleanup hook invoked from the `finally` clause of _run_by_dur; currently a no-op.
def stop(self) -> None:
pass

def _aggregate_latency_stats(self, res: list) -> tuple[float, float, float]:
    """Aggregate per-worker latency stats into one (p99, p95, avg) triple.

    Args:
        res: one result per worker process. The third element of each result
            is expected to be the latency-stats dict built by
            ``search_by_dur`` (keys ``p99``, ``p95``, ``avg``, ``count``;
            latencies in seconds). Results without a third element, with an
            empty/None stats dict, or with ``count`` <= 0 are ignored.

    Returns:
        tuple: (p99, p95, avg) latencies in seconds; ``(0.0, 0.0, 0.0)`` when
        no worker recorded a successful query.

    Note:
        Workers ship pre-computed percentiles rather than raw samples, so the
        pooled percentile cannot be reconstructed here. The maximum across
        workers is reported instead: it is an upper bound on the pooled
        percentile (conservative), not the exact value.
    """
    worker_stats = [
        r[2]
        for r in res
        # len() guard: tolerate legacy (success, failed) results with no stats.
        if len(r) > 2 and r[2] and r[2].get("count", 0) > 0
    ]

    if not worker_stats:
        return 0.0, 0.0, 0.0

    # Every retained entry has count > 0, so the total is strictly positive
    # and the division below is safe.
    total_query_count = sum(stats["count"] for stats in worker_stats)

    # Use max for a conservative percentile estimate.
    latency_p99 = max(stats["p99"] for stats in worker_stats)
    latency_p95 = max(stats["p95"] for stats in worker_stats)

    # Average weighted by the number of queries each worker completed.
    latency_avg = sum(stats["avg"] * stats["count"] for stats in worker_stats) / total_query_count

    return latency_p99, latency_p95, latency_avg

def run_by_dur(self, duration: int) -> tuple[float, float]:
"""
Returns:
Expand All @@ -190,13 +222,23 @@ def run_by_dur(self, duration: int) -> tuple[float, float]:
"""
return self._run_by_dur(duration)

def _run_by_dur(self, duration: int) -> tuple[float, float]:
def _run_by_dur(self, duration: int) -> tuple[float, float, list, list, list, list, list]:
"""
Returns:
float: largest qps
float: failed rate
list: concurrency numbers
list: qps values at each concurrency
list: p99 latencies at each concurrency
list: p95 latencies at each concurrency
list: avg latencies at each concurrency
"""
max_qps = 0
conc_num_list = []
conc_qps_list = []
conc_latency_p99_list = []
conc_latency_p95_list = []
conc_latency_avg_list = []
try:
for conc in self.concurrencies:
with mp.Manager() as m:
Expand Down Expand Up @@ -226,9 +268,19 @@ def _run_by_dur(self, duration: int) -> tuple[float, float]:
cost = time.perf_counter() - start

qps = round(all_success_count / cost, 4)

latency_p99, latency_p95, latency_avg = self._aggregate_latency_stats(res)

conc_num_list.append(conc)
conc_qps_list.append(qps)
conc_latency_p99_list.append(latency_p99)
conc_latency_p95_list.append(latency_p95)
conc_latency_avg_list.append(latency_avg)

log.info(
f"End search in concurrency {conc}: dur={cost}s, failed_rate={failed_rate}, "
f"all_success_count={all_success_count}, all_failed_count={all_failed_count}, qps={qps}",
f"all_success_count={all_success_count}, all_failed_count={all_failed_count}, qps={qps}, "
f"p99={latency_p99:.4f}s, p95={latency_p95:.4f}s, avg={latency_avg:.4f}s",
)
if qps > max_qps:
max_qps = qps
Expand All @@ -246,13 +298,24 @@ def _run_by_dur(self, duration: int) -> tuple[float, float]:
finally:
self.stop()

return max_qps, failed_rate
return (
max_qps,
failed_rate,
conc_num_list,
conc_qps_list,
conc_latency_p99_list,
conc_latency_p95_list,
conc_latency_avg_list,
)

def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> tuple[int, int]:
def search_by_dur(
self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition
) -> tuple[int, int, dict]:
"""
Returns:
int: successful requests count
int: failed requests count
dict: latency statistics with p99, p95, avg, count (computed via HDR Histogram)
"""
# sync all process
q.put(1)
Expand All @@ -263,6 +326,9 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
self.db.prepare_filter(self.filters)
num, idx = len(test_data), random.randint(0, len(test_data) - 1)

# Memory-efficient latency tracking
histogram = HdrHistogram(HDR_HISTOGRAM_MIN_US, HDR_HISTOGRAM_MAX_US, HDR_HISTOGRAM_SIGNIFICANT_DIGITS)

start_time = time.perf_counter()
success_count = 0
failed_cnt = 0
Expand All @@ -271,6 +337,8 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
try:
self.db.search_embedding(test_data[idx], self.k)
success_count += 1
latency_us = int((time.perf_counter() - s) * US_TO_SECONDS)
histogram.record_value(min(latency_us, HDR_HISTOGRAM_MAX_US))
except Exception as e:
failed_cnt += 1
# reduce log
Expand All @@ -284,8 +352,7 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con

if success_count % 500 == 0:
log.debug(
f"({mp.current_process().name:16}) search_count: {success_count}, "
f"latest_latency={time.perf_counter()-s}",
f"({mp.current_process().name:16}) search_count: {success_count}",
)

total_dur = round(time.perf_counter() - start_time, 4)
Expand All @@ -295,4 +362,12 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
f"qps (successful) in this process: {round(success_count / total_dur, 4):3}",
)

return success_count, failed_cnt
# Pre-computed stats to avoid large data transfer
latency_stats = {
"p99": histogram.get_value_at_percentile(99) / US_TO_SECONDS,
"p95": histogram.get_value_at_percentile(95) / US_TO_SECONDS,
"avg": histogram.get_mean_value() / US_TO_SECONDS,
"count": histogram.get_total_count(),
}

return success_count, failed_cnt, latency_stats
63 changes: 59 additions & 4 deletions vectordb_bench/backend/runner/read_write_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,33 @@ def run_search(self, perc: int):
log.info(
f"Search after wirte - Conc search start, dur for each conc={self.read_dur_after_write}",
)
max_qps, conc_failed_rate = self.run_by_dur(self.read_dur_after_write)
result = self.run_by_dur(self.read_dur_after_write)
max_qps = result[0]
conc_failed_rate = result[1]
conc_num_list = result[2]
conc_qps_list = result[3]
conc_latency_p99_list = result[4]
conc_latency_p95_list = result[5]
conc_latency_avg_list = result[6]
log.info(f"Search after wirte - Conc search finished, max_qps={max_qps}")

return [(perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate)]
return [
(
perc,
test_time,
max_qps,
recall,
ndcg,
p99_latency,
p95_latency,
conc_failed_rate,
conc_num_list,
conc_qps_list,
conc_latency_p99_list,
conc_latency_p95_list,
conc_latency_avg_list,
)
]

def run_read_write(self) -> Metric:
"""
Expand Down Expand Up @@ -160,6 +183,13 @@ def run_read_write(self) -> Metric:
m.st_serial_latency_p95_list = [d[6] for d in r]
m.st_conc_failed_rate_list = [d[7] for d in r]

# Extract concurrent latency data
m.st_conc_num_list_list = [d[8] for d in r]
m.st_conc_qps_list_list = [d[9] for d in r]
m.st_conc_latency_p99_list_list = [d[10] for d in r]
m.st_conc_latency_p95_list_list = [d[11] for d in r]
m.st_conc_latency_avg_list_list = [d[12] for d in r]

except Exception as e:
log.warning(f"Read and write error: {e}")
executor.shutdown(wait=True, cancel_futures=True)
Expand Down Expand Up @@ -226,6 +256,8 @@ def wait_next_target(start: int, target_batch: int) -> bool:
log.info(f"Insert {perc}% done, total batch={total_batch}")
test_time = round(time.perf_counter(), 4)
max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate = 0, 0, 0, 0, 0, 0
conc_num_list, conc_qps_list = [], []
conc_latency_p99_list, conc_latency_p95_list, conc_latency_avg_list = [], [], []
try:
log.info(f"[{target_batch}/{total_batch}] Serial search - {perc}% start")
res, ssearch_dur = self.serial_search_runner.run()
Expand All @@ -246,12 +278,35 @@ def wait_next_target(start: int, target_batch: int) -> bool:
f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, "
f"dur={each_conc_search_dur:.4f}"
)
max_qps, conc_failed_rate = self.run_by_dur(each_conc_search_dur)
conc_result = self.run_by_dur(each_conc_search_dur)
max_qps = conc_result[0]
conc_failed_rate = conc_result[1]
conc_num_list = conc_result[2]
conc_qps_list = conc_result[3]
conc_latency_p99_list = conc_result[4]
conc_latency_p95_list = conc_result[5]
conc_latency_avg_list = conc_result[6]
else:
log.warning(f"Skip concurrent tests, each_conc_search_dur={each_conc_search_dur} less than 10s.")
except Exception as e:
log.warning(f"Streaming Search Failed at stage={stage}. Exception: {e}")
result.append((perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate))
result.append(
(
perc,
test_time,
max_qps,
recall,
ndcg,
p99_latency,
p95_latency,
conc_failed_rate,
conc_num_list,
conc_qps_list,
conc_latency_p99_list,
conc_latency_p95_list,
conc_latency_avg_list,
)
)
start_batch = target_batch

# Drain the queue
Expand Down
Loading