diff --git a/install/requirements_py3.11.txt b/install/requirements_py3.11.txt
index 10a0c2e29..c6b5d1cc3 100644
--- a/install/requirements_py3.11.txt
+++ b/install/requirements_py3.11.txt
@@ -25,4 +25,5 @@ pymilvus
clickhouse_connect
pyvespa
mysql-connector-python
-packaging
\ No newline at end of file
+packaging
+hdrhistogram>=0.10.1
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index c4563b26e..3eb4d0b0f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -41,6 +41,7 @@ dependencies = [
"scikit-learn",
"pymilvus", # with pandas, numpy, ujson
"ujson",
+ "hdrhistogram>=0.10.1",
]
dynamic = ["version"]
diff --git a/vectordb_bench/backend/runner/mp_runner.py b/vectordb_bench/backend/runner/mp_runner.py
index 320eb86e2..9133e407a 100644
--- a/vectordb_bench/backend/runner/mp_runner.py
+++ b/vectordb_bench/backend/runner/mp_runner.py
@@ -8,6 +8,7 @@
from multiprocessing.queues import Queue
import numpy as np
+from hdrh.histogram import HdrHistogram
from vectordb_bench.backend.filter import Filter, non_filter
@@ -18,6 +19,12 @@
NUM_PER_BATCH = config.NUM_PER_BATCH
log = logging.getLogger(__name__)
+# HDR Histogram constants
+HDR_HISTOGRAM_MIN_US = 1
+HDR_HISTOGRAM_MAX_US = 60_000_000 # 60 seconds
+HDR_HISTOGRAM_SIGNIFICANT_DIGITS = 3 # ±0.1% accuracy
+US_TO_SECONDS = 1_000_000
+
class MultiProcessingSearchRunner:
"""multiprocessing search runner
@@ -182,6 +189,31 @@ def run(self) -> float:
def stop(self) -> None:
pass
+ def _aggregate_latency_stats(self, res: list) -> tuple[float, float, float]:
+ """Aggregate latency stats from worker processes.
+
+ Returns:
+ tuple: (p99, p95, avg) latencies in seconds
+ """
+ latency_stats_list = [r[2] for r in res if r[2] and r[2].get("count", 0) > 0]
+
+ if not latency_stats_list:
+ return 0, 0, 0
+
+ total_query_count = sum(stats["count"] for stats in latency_stats_list)
+
+ if total_query_count == 0:
+ return 0, 0, 0
+
+ # Use max for conservative percentile estimate
+ latency_p99 = max(stats["p99"] for stats in latency_stats_list)
+ latency_p95 = max(stats["p95"] for stats in latency_stats_list)
+
+ # Weighted average
+ latency_avg = sum(stats["avg"] * stats["count"] for stats in latency_stats_list) / total_query_count
+
+ return latency_p99, latency_p95, latency_avg
+
def run_by_dur(self, duration: int) -> tuple[float, float]:
"""
Returns:
@@ -190,13 +222,23 @@ def run_by_dur(self, duration: int) -> tuple[float, float]:
"""
return self._run_by_dur(duration)
- def _run_by_dur(self, duration: int) -> tuple[float, float]:
+ def _run_by_dur(self, duration: int) -> tuple[float, float, list, list, list, list, list]:
"""
Returns:
float: largest qps
float: failed rate
+ list: concurrency numbers
+ list: qps values at each concurrency
+ list: p99 latencies at each concurrency
+ list: p95 latencies at each concurrency
+ list: avg latencies at each concurrency
"""
max_qps = 0
+ conc_num_list = []
+ conc_qps_list = []
+ conc_latency_p99_list = []
+ conc_latency_p95_list = []
+ conc_latency_avg_list = []
try:
for conc in self.concurrencies:
with mp.Manager() as m:
@@ -226,9 +268,19 @@ def _run_by_dur(self, duration: int) -> tuple[float, float]:
cost = time.perf_counter() - start
qps = round(all_success_count / cost, 4)
+
+ latency_p99, latency_p95, latency_avg = self._aggregate_latency_stats(res)
+
+ conc_num_list.append(conc)
+ conc_qps_list.append(qps)
+ conc_latency_p99_list.append(latency_p99)
+ conc_latency_p95_list.append(latency_p95)
+ conc_latency_avg_list.append(latency_avg)
+
log.info(
f"End search in concurrency {conc}: dur={cost}s, failed_rate={failed_rate}, "
- f"all_success_count={all_success_count}, all_failed_count={all_failed_count}, qps={qps}",
+ f"all_success_count={all_success_count}, all_failed_count={all_failed_count}, qps={qps}, "
+ f"p99={latency_p99:.4f}s, p95={latency_p95:.4f}s, avg={latency_avg:.4f}s",
)
if qps > max_qps:
max_qps = qps
@@ -246,13 +298,24 @@ def _run_by_dur(self, duration: int) -> tuple[float, float]:
finally:
self.stop()
- return max_qps, failed_rate
+ return (
+ max_qps,
+ failed_rate,
+ conc_num_list,
+ conc_qps_list,
+ conc_latency_p99_list,
+ conc_latency_p95_list,
+ conc_latency_avg_list,
+ )
- def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> tuple[int, int]:
+ def search_by_dur(
+ self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition
+ ) -> tuple[int, int, dict]:
"""
Returns:
int: successful requests count
int: failed requests count
+ dict: latency statistics with p99, p95, avg, count (computed via HDR Histogram)
"""
# sync all process
q.put(1)
@@ -263,6 +326,9 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
self.db.prepare_filter(self.filters)
num, idx = len(test_data), random.randint(0, len(test_data) - 1)
+ # Memory-efficient latency tracking
+ histogram = HdrHistogram(HDR_HISTOGRAM_MIN_US, HDR_HISTOGRAM_MAX_US, HDR_HISTOGRAM_SIGNIFICANT_DIGITS)
+
start_time = time.perf_counter()
success_count = 0
failed_cnt = 0
@@ -271,6 +337,8 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
try:
self.db.search_embedding(test_data[idx], self.k)
success_count += 1
+ latency_us = int((time.perf_counter() - s) * US_TO_SECONDS)
+ histogram.record_value(min(latency_us, HDR_HISTOGRAM_MAX_US))
except Exception as e:
failed_cnt += 1
# reduce log
@@ -284,8 +352,7 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
if success_count % 500 == 0:
log.debug(
- f"({mp.current_process().name:16}) search_count: {success_count}, "
- f"latest_latency={time.perf_counter()-s}",
+ f"({mp.current_process().name:16}) search_count: {success_count}",
)
total_dur = round(time.perf_counter() - start_time, 4)
@@ -295,4 +362,12 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
f"qps (successful) in this process: {round(success_count / total_dur, 4):3}",
)
- return success_count, failed_cnt
+ # Pre-computed stats to avoid large data transfer
+ latency_stats = {
+ "p99": histogram.get_value_at_percentile(99) / US_TO_SECONDS,
+ "p95": histogram.get_value_at_percentile(95) / US_TO_SECONDS,
+ "avg": histogram.get_mean_value() / US_TO_SECONDS,
+ "count": histogram.get_total_count(),
+ }
+
+ return success_count, failed_cnt, latency_stats
diff --git a/vectordb_bench/backend/runner/read_write_runner.py b/vectordb_bench/backend/runner/read_write_runner.py
index e6b5c8d9b..d3d1df2fa 100644
--- a/vectordb_bench/backend/runner/read_write_runner.py
+++ b/vectordb_bench/backend/runner/read_write_runner.py
@@ -106,10 +106,33 @@ def run_search(self, perc: int):
log.info(
f"Search after wirte - Conc search start, dur for each conc={self.read_dur_after_write}",
)
- max_qps, conc_failed_rate = self.run_by_dur(self.read_dur_after_write)
+ result = self.run_by_dur(self.read_dur_after_write)
+ max_qps = result[0]
+ conc_failed_rate = result[1]
+ conc_num_list = result[2]
+ conc_qps_list = result[3]
+ conc_latency_p99_list = result[4]
+ conc_latency_p95_list = result[5]
+ conc_latency_avg_list = result[6]
log.info(f"Search after wirte - Conc search finished, max_qps={max_qps}")
- return [(perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate)]
+ return [
+ (
+ perc,
+ test_time,
+ max_qps,
+ recall,
+ ndcg,
+ p99_latency,
+ p95_latency,
+ conc_failed_rate,
+ conc_num_list,
+ conc_qps_list,
+ conc_latency_p99_list,
+ conc_latency_p95_list,
+ conc_latency_avg_list,
+ )
+ ]
def run_read_write(self) -> Metric:
"""
@@ -160,6 +183,13 @@ def run_read_write(self) -> Metric:
m.st_serial_latency_p95_list = [d[6] for d in r]
m.st_conc_failed_rate_list = [d[7] for d in r]
+ # Extract concurrent latency data
+ m.st_conc_num_list_list = [d[8] for d in r]
+ m.st_conc_qps_list_list = [d[9] for d in r]
+ m.st_conc_latency_p99_list_list = [d[10] for d in r]
+ m.st_conc_latency_p95_list_list = [d[11] for d in r]
+ m.st_conc_latency_avg_list_list = [d[12] for d in r]
+
except Exception as e:
log.warning(f"Read and write error: {e}")
executor.shutdown(wait=True, cancel_futures=True)
@@ -226,6 +256,8 @@ def wait_next_target(start: int, target_batch: int) -> bool:
log.info(f"Insert {perc}% done, total batch={total_batch}")
test_time = round(time.perf_counter(), 4)
max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate = 0, 0, 0, 0, 0, 0
+ conc_num_list, conc_qps_list = [], []
+ conc_latency_p99_list, conc_latency_p95_list, conc_latency_avg_list = [], [], []
try:
log.info(f"[{target_batch}/{total_batch}] Serial search - {perc}% start")
res, ssearch_dur = self.serial_search_runner.run()
@@ -246,12 +278,35 @@ def wait_next_target(start: int, target_batch: int) -> bool:
f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, "
f"dur={each_conc_search_dur:.4f}"
)
- max_qps, conc_failed_rate = self.run_by_dur(each_conc_search_dur)
+ conc_result = self.run_by_dur(each_conc_search_dur)
+ max_qps = conc_result[0]
+ conc_failed_rate = conc_result[1]
+ conc_num_list = conc_result[2]
+ conc_qps_list = conc_result[3]
+ conc_latency_p99_list = conc_result[4]
+ conc_latency_p95_list = conc_result[5]
+ conc_latency_avg_list = conc_result[6]
else:
log.warning(f"Skip concurrent tests, each_conc_search_dur={each_conc_search_dur} less than 10s.")
except Exception as e:
log.warning(f"Streaming Search Failed at stage={stage}. Exception: {e}")
- result.append((perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate))
+ result.append(
+ (
+ perc,
+ test_time,
+ max_qps,
+ recall,
+ ndcg,
+ p99_latency,
+ p95_latency,
+ conc_failed_rate,
+ conc_num_list,
+ conc_qps_list,
+ conc_latency_p99_list,
+ conc_latency_p95_list,
+ conc_latency_avg_list,
+ )
+ )
start_batch = target_batch
# Drain the queue
diff --git a/vectordb_bench/frontend/components/streaming/concurrent_detail.py b/vectordb_bench/frontend/components/streaming/concurrent_detail.py
new file mode 100644
index 000000000..0580bad14
--- /dev/null
+++ b/vectordb_bench/frontend/components/streaming/concurrent_detail.py
@@ -0,0 +1,273 @@
+import plotly.graph_objects as go
+import streamlit as st
+
+
+def drawConcurrentPerformanceSection(container, case_data, case_name: str):
+ """Main section for concurrent performance detail in streaming tests"""
+
+ # Check if data exists
+ if not case_data.get("st_conc_qps_list_list") or len(case_data["st_conc_qps_list_list"]) == 0:
+ container.info(
+ "Concurrent latency detail not available for this test. "
+ "Re-run the test with updated code to collect this data."
+ )
+ return
+
+ container.markdown("---")
+ container.subheader("Concurrent Search Performance")
+ container.markdown(
+ "Detailed Latency → QPS relationship at each stage. " "Displays how latency and QPS vary with concurrency."
+ )
+
+ # View mode selector
+ view_mode = container.radio(
+ "View Mode", options=["Single Stage", "Compare Stages"], horizontal=True, key=f"{case_name}-view-mode"
+ )
+
+ if view_mode == "Single Stage":
+ drawSingleStageView(container, case_data, case_name)
+ else:
+ drawCompareStagesView(container, case_data, case_name)
+
+
+def drawSingleStageView(container, case_data, case_name: str):
+ """Show detailed Latency→QPS for one selected stage"""
+
+ stages = case_data["st_search_stage_list"]
+
+ # Find the last stage with data for default selection
+ default_stage_idx = len(stages) - 1
+ for i in range(len(stages) - 1, -1, -1):
+ if case_data["st_conc_qps_list_list"][i]:
+ default_stage_idx = i
+ break
+
+ # Stage selector (show all stages)
+ stage = container.selectbox(
+ "Select Stage",
+ options=stages,
+ index=default_stage_idx,
+ format_func=lambda x: f"{x}% data loaded",
+ key=f"{case_name}-stage-selector",
+ )
+
+ stage_idx = stages.index(stage)
+
+ # Check if this stage has concurrent data
+ if not case_data["st_conc_qps_list_list"][stage_idx]:
+ container.warning(
+ f"No concurrent search data for {stage}% stage.\n\n"
+ f"**Reason:** Concurrent tests were skipped because there wasn't enough time "
+ f"between stages (< 10s per concurrency level).\n\n"
+ f"**Tip:** Use a larger dataset or slower insert rate to get data for all stages."
+ )
+ return
+
+ # Latency metric selector
+ latency_metric = container.radio(
+ "Latency Metric", options=["P99", "P95", "Average"], horizontal=True, key=f"{case_name}-latency-metric"
+ )
+
+ # Get data for selected stage
+ qps_values = case_data["st_conc_qps_list_list"][stage_idx]
+ conc_nums = case_data["st_conc_num_list_list"][stage_idx]
+
+ # Get latency based on selection
+ if latency_metric == "P99":
+ latencies_sec = case_data["st_conc_latency_p99_list_list"][stage_idx]
+ elif latency_metric == "P95":
+ latencies_sec = case_data["st_conc_latency_p95_list_list"][stage_idx]
+ else:
+ latencies_sec = case_data["st_conc_latency_avg_list_list"][stage_idx]
+
+ latencies_ms = [l * 1000 for l in latencies_sec] # Convert to ms
+
+ # Draw chart
+ drawQPSLatencyChart(container, qps_values, latencies_ms, stage, latency_metric, case_name)
+
+ # Draw table
+ drawMetricsTable(
+ container,
+ qps_values,
+ conc_nums,
+ case_data["st_conc_latency_p99_list_list"][stage_idx],
+ case_data["st_conc_latency_p95_list_list"][stage_idx],
+ case_data["st_conc_latency_avg_list_list"][stage_idx],
+ stage,
+ )
+
+
+def drawQPSLatencyChart(container, qps_values, latencies_ms, stage, metric_name, case_name):
+ """Draw Latency vs QPS scatter plot"""
+
+ fig = go.Figure()
+
+ fig.add_trace(
+ go.Scatter(
+ x=latencies_ms,
+ y=qps_values,
+ mode="lines+markers+text",
+ text=[f"{qps:.1f}" for qps in qps_values],
+ textposition="top center",
+ marker=dict(size=12, color="#1f77b4"),
+ line=dict(width=2, color="#1f77b4"),
+            hovertemplate=("QPS: %{y:.1f}<br>" f"{metric_name} Latency: %{{x:.1f}}ms<br>" "<extra></extra>"),
+ )
+ )
+
+ fig.update_layout(
+ title=f"Latency vs QPS at Stage {stage}%",
+ xaxis_title=f"Latency {metric_name} (ms)",
+ yaxis_title="Queries Per Second (QPS)",
+ height=500,
+ hovermode="closest",
+ showlegend=False,
+ )
+
+ container.plotly_chart(fig, use_container_width=True, key=f"{case_name}-chart-{stage}")
+
+
+def drawMetricsTable(container, qps_values, conc_nums, p99_list, p95_list, avg_list, stage):
+ """Draw detailed metrics table"""
+
+ container.markdown(f"**Detailed Metrics at Stage {stage}%**")
+
+ # Build table data
+ table_data = []
+ for i in range(len(qps_values)):
+ table_data.append(
+ {
+ "Concurrency": conc_nums[i],
+ "QPS": f"{qps_values[i]:.2f}",
+ "P99 (ms)": f"{p99_list[i] * 1000:.1f}",
+ "P95 (ms)": f"{p95_list[i] * 1000:.1f}",
+ "Avg (ms)": f"{avg_list[i] * 1000:.1f}",
+ }
+ )
+
+ container.table(table_data)
+
+
+def drawCompareStagesView(container, case_data, case_name: str):
+ """Show QPS→Latency curves for multiple stages"""
+
+ stages = case_data["st_search_stage_list"]
+
+ # Find stages with data for default selection
+ stages_with_data = [stage for i, stage in enumerate(stages) if case_data["st_conc_qps_list_list"][i]]
+
+ # Stage multi-selector (show all stages, but default to ones with data)
+ default_stages = []
+ if stages_with_data:
+ default_stages = [stages_with_data[0], stages_with_data[-1]] if len(stages_with_data) >= 2 else stages_with_data
+
+ selected_stages = container.multiselect(
+ "Select stages to compare",
+ options=stages,
+ default=default_stages,
+ format_func=lambda x: f"{x}%",
+ key=f"{case_name}-compare-stages",
+ help="Note: Some stages may have no concurrent data if test duration was too short",
+ )
+
+ if not selected_stages:
+ container.warning("Please select at least one stage to display.")
+ return
+
+ # Latency metric selector
+ latency_metric = container.radio(
+ "Latency Metric", options=["P99", "P95", "Average"], horizontal=True, key=f"{case_name}-compare-metric"
+ )
+
+ # Draw comparison chart
+ drawComparisonChart(container, case_data, selected_stages, latency_metric, case_name)
+
+
+def drawComparisonChart(container, case_data, selected_stages, metric_name, case_name):
+ """Draw multi-line comparison chart"""
+
+ fig = go.Figure()
+
+ # Color palette
+ colors = [
+ "#1f77b4",
+ "#ff7f0e",
+ "#2ca02c",
+ "#d62728",
+ "#9467bd",
+ "#8c564b",
+ "#e377c2",
+ "#7f7f7f",
+ "#bcbd22",
+ "#17becf",
+ ]
+
+ stages_plotted = 0
+ stages_skipped = []
+
+ for idx, stage in enumerate(selected_stages):
+ stage_idx = case_data["st_search_stage_list"].index(stage)
+
+ qps_values = case_data["st_conc_qps_list_list"][stage_idx]
+
+ # Skip stages with no data
+ if not qps_values:
+ stages_skipped.append(stage)
+ continue
+
+ # Get latency based on selection
+ if metric_name == "P99":
+ latencies_sec = case_data["st_conc_latency_p99_list_list"][stage_idx]
+ elif metric_name == "P95":
+ latencies_sec = case_data["st_conc_latency_p95_list_list"][stage_idx]
+ else:
+ latencies_sec = case_data["st_conc_latency_avg_list_list"][stage_idx]
+
+ latencies_ms = [l * 1000 for l in latencies_sec]
+ stages_plotted += 1
+
+ fig.add_trace(
+ go.Scatter(
+ x=latencies_ms,
+ y=qps_values,
+ mode="lines+markers",
+ name=f"{stage}% loaded",
+ marker=dict(size=10),
+ line=dict(width=2, color=colors[idx % len(colors)]),
+                    hovertemplate=(
+                        f"Stage {stage}%<br>"
+                        "QPS: %{y:.1f}<br>"
+                        f"{metric_name} Latency: %{{x:.1f}}ms<br>"
+                        "<extra></extra>"
+                    ),
+ )
+ )
+
+ # Check if any data was plotted
+ if stages_plotted == 0:
+ container.warning(
+ f"None of the selected stages have concurrent search data.\n\n"
+ f"**Skipped stages:** {', '.join([f'{s}%' for s in stages_skipped])}\n\n"
+ f"**Reason:** Concurrent tests were skipped because there wasn't enough time "
+ f"between stages (< 10s per concurrency level).\n\n"
+ f"**Tip:** Use a larger dataset or slower insert rate to get data for all stages."
+ )
+ return
+
+ # Show warning for skipped stages
+ if stages_skipped:
+ container.info(f"Stages without data: {', '.join([f'{s}%' for s in stages_skipped])}")
+
+ fig.update_layout(
+ title=f"Latency vs QPS Evolution Across Stages",
+ xaxis_title=f"Latency {metric_name} (ms)",
+ yaxis_title="Queries Per Second (QPS)",
+ height=600,
+ hovermode="closest",
+ legend=dict(yanchor="top", y=0.99, xanchor="right", x=0.99),
+ )
+
+ container.plotly_chart(fig, use_container_width=True, key=f"{case_name}-compare-chart")
+
+ # Add insight
+ container.info("**Insight:** Compare curves across stages to understand how performance scales with data growth.")
diff --git a/vectordb_bench/frontend/pages/streaming.py b/vectordb_bench/frontend/pages/streaming.py
index 23e43490c..811c4d497 100644
--- a/vectordb_bench/frontend/pages/streaming.py
+++ b/vectordb_bench/frontend/pages/streaming.py
@@ -140,6 +140,14 @@ def case_results_filter(case_result: CaseResult) -> bool:
line_chart_displayed_y_metrics=line_chart_displayed_y_metrics,
)
+ # Concurrent performance detail section
+ from vectordb_bench.frontend.components.streaming.concurrent_detail import drawConcurrentPerformanceSection
+
+ for case_name in showCaseNames:
+ case_data_list = [d for d in shownData if d["case_name"] == case_name]
+ if case_data_list:
+ drawConcurrentPerformanceSection(st.container(), case_data_list[0], case_name)
+
# footer
footer(st.container())
diff --git a/vectordb_bench/metric.py b/vectordb_bench/metric.py
index 71ba3149f..3634b2114 100644
--- a/vectordb_bench/metric.py
+++ b/vectordb_bench/metric.py
@@ -41,6 +41,13 @@ class Metric:
st_serial_latency_p95_list: list[float] = field(default_factory=list)
st_conc_failed_rate_list: list[float] = field(default_factory=list)
+ # for streaming cases - concurrent latency data per stage
+ st_conc_num_list_list: list[list[int]] = field(default_factory=list)
+ st_conc_qps_list_list: list[list[float]] = field(default_factory=list)
+ st_conc_latency_p99_list_list: list[list[float]] = field(default_factory=list)
+ st_conc_latency_p95_list_list: list[list[float]] = field(default_factory=list)
+ st_conc_latency_avg_list_list: list[list[float]] = field(default_factory=list)
+
QURIES_PER_DOLLAR_METRIC = "QP$ (Quries per Dollar)"
LOAD_DURATION_METRIC = "load_duration"