From 8a598d534ceb2591d8a9cd242bfc6adb5c250759 Mon Sep 17 00:00:00 2001 From: Akhil-Pathivada Date: Mon, 8 Dec 2025 15:17:40 +0530 Subject: [PATCH] feat: Add QPS-Latency tradeoff metrics for Streaming Tests - Use HDR Histogram for memory-efficient latency tracking - Collect p99, p95, avg latency at each concurrency level per stage - Add Concurrent Search Performance UI with single-stage and multi-stage views - Closes #638 --- install/requirements_py3.11.txt | 3 +- pyproject.toml | 1 + vectordb_bench/backend/runner/mp_runner.py | 89 +++++- .../backend/runner/read_write_runner.py | 63 +++- .../components/streaming/concurrent_detail.py | 273 ++++++++++++++++++ vectordb_bench/frontend/pages/streaming.py | 8 + vectordb_bench/metric.py | 7 + 7 files changed, 432 insertions(+), 12 deletions(-) create mode 100644 vectordb_bench/frontend/components/streaming/concurrent_detail.py diff --git a/install/requirements_py3.11.txt b/install/requirements_py3.11.txt index 10a0c2e29..c6b5d1cc3 100644 --- a/install/requirements_py3.11.txt +++ b/install/requirements_py3.11.txt @@ -25,4 +25,5 @@ pymilvus clickhouse_connect pyvespa mysql-connector-python -packaging \ No newline at end of file +packaging +hdrhistogram>=0.10.1 \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index c4563b26e..3eb4d0b0f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ dependencies = [ "scikit-learn", "pymilvus", # with pandas, numpy, ujson "ujson", + "hdrhistogram>=0.10.1", ] dynamic = ["version"] diff --git a/vectordb_bench/backend/runner/mp_runner.py b/vectordb_bench/backend/runner/mp_runner.py index 320eb86e2..9133e407a 100644 --- a/vectordb_bench/backend/runner/mp_runner.py +++ b/vectordb_bench/backend/runner/mp_runner.py @@ -8,6 +8,7 @@ from multiprocessing.queues import Queue import numpy as np +from hdrh.histogram import HdrHistogram from vectordb_bench.backend.filter import Filter, non_filter @@ -18,6 +19,12 @@ NUM_PER_BATCH = config.NUM_PER_BATCH log = logging.getLogger(__name__) +# HDR Histogram constants +HDR_HISTOGRAM_MIN_US = 1 +HDR_HISTOGRAM_MAX_US = 60_000_000 # 60 seconds +HDR_HISTOGRAM_SIGNIFICANT_DIGITS = 3 # ±0.1% accuracy +US_TO_SECONDS = 1_000_000 + class MultiProcessingSearchRunner: """multiprocessing search runner @@ -182,6 +189,31 @@ def run(self) -> float: def stop(self) -> None: pass + def _aggregate_latency_stats(self, res: list) -> tuple[float, float, float]: + """Aggregate latency stats from worker processes. + + Returns: + tuple: (p99, p95, avg) latencies in seconds + """ + latency_stats_list = [r[2] for r in res if r[2] and r[2].get("count", 0) > 0] + + if not latency_stats_list: + return 0, 0, 0 + + total_query_count = sum(stats["count"] for stats in latency_stats_list) + + if total_query_count == 0: + return 0, 0, 0 + + # Use max for conservative percentile estimate + latency_p99 = max(stats["p99"] for stats in latency_stats_list) + latency_p95 = max(stats["p95"] for stats in latency_stats_list) + + # Weighted average + latency_avg = sum(stats["avg"] * stats["count"] for stats in latency_stats_list) / total_query_count + + return latency_p99, latency_p95, latency_avg + def run_by_dur(self, duration: int) -> tuple[float, float]: """ Returns: @@ -190,13 +222,23 @@ def run_by_dur(self, duration: int) -> tuple[float, float]: """ return self._run_by_dur(duration) - def _run_by_dur(self, duration: int) -> tuple[float, float]: + def _run_by_dur(self, duration: int) -> tuple[float, float, list, list, list, list, list]: """ Returns: float: largest qps float: failed rate + list: concurrency numbers + list: qps values at each concurrency + list: p99 latencies at each concurrency + list: p95 latencies at each concurrency + list: avg latencies at each concurrency """ max_qps = 0 + conc_num_list = [] + conc_qps_list = [] + conc_latency_p99_list = [] + conc_latency_p95_list = [] + conc_latency_avg_list = [] try: for conc in self.concurrencies: with mp.Manager() as m: @@ -226,9 +268,19 @@ def _run_by_dur(self, duration: int) -> tuple[float, float]: cost = time.perf_counter() - start qps = round(all_success_count / cost, 4) + + latency_p99, latency_p95, latency_avg = self._aggregate_latency_stats(res) + + conc_num_list.append(conc) + conc_qps_list.append(qps) + conc_latency_p99_list.append(latency_p99) + conc_latency_p95_list.append(latency_p95) + conc_latency_avg_list.append(latency_avg) + log.info( f"End search in concurrency {conc}: dur={cost}s, failed_rate={failed_rate}, " - f"all_success_count={all_success_count}, all_failed_count={all_failed_count}, qps={qps}", + f"all_success_count={all_success_count}, all_failed_count={all_failed_count}, qps={qps}, " + f"p99={latency_p99:.4f}s, p95={latency_p95:.4f}s, avg={latency_avg:.4f}s", ) if qps > max_qps: max_qps = qps @@ -246,13 +298,24 @@ def _run_by_dur(self, duration: int) -> tuple[float, float]: finally: self.stop() - return max_qps, failed_rate + return ( + max_qps, + failed_rate, + conc_num_list, + conc_qps_list, + conc_latency_p99_list, + conc_latency_p95_list, + conc_latency_avg_list, + ) - def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> tuple[int, int]: + def search_by_dur( + self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition + ) -> tuple[int, int, dict]: """ Returns: int: successful requests count int: failed requests count + dict: latency statistics with p99, p95, avg, count (computed via HDR Histogram) """ # sync all process q.put(1) @@ -263,6 +326,9 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con self.db.prepare_filter(self.filters) num, idx = len(test_data), random.randint(0, len(test_data) - 1) + # Memory-efficient latency tracking + histogram = HdrHistogram(HDR_HISTOGRAM_MIN_US, HDR_HISTOGRAM_MAX_US, HDR_HISTOGRAM_SIGNIFICANT_DIGITS) + start_time = time.perf_counter() success_count = 0 failed_cnt = 0 @@ -271,6 +337,8 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con try: self.db.search_embedding(test_data[idx], self.k) success_count += 1 + latency_us = int((time.perf_counter() - s) * US_TO_SECONDS) + histogram.record_value(min(latency_us, HDR_HISTOGRAM_MAX_US)) except Exception as e: failed_cnt += 1 # reduce log @@ -284,8 +352,7 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con if success_count % 500 == 0: log.debug( - f"({mp.current_process().name:16}) search_count: {success_count}, " - f"latest_latency={time.perf_counter()-s}", + f"({mp.current_process().name:16}) search_count: {success_count}", ) total_dur = round(time.perf_counter() - start_time, 4) @@ -295,4 +362,12 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con f"qps (successful) in this process: {round(success_count / total_dur, 4):3}", ) - return success_count, failed_cnt + # Pre-computed stats to avoid large data transfer + latency_stats = { + "p99": histogram.get_value_at_percentile(99) / US_TO_SECONDS, + "p95": histogram.get_value_at_percentile(95) / US_TO_SECONDS, + "avg": histogram.get_mean_value() / US_TO_SECONDS, + "count": histogram.get_total_count(), + } + + return success_count, failed_cnt, latency_stats diff --git a/vectordb_bench/backend/runner/read_write_runner.py b/vectordb_bench/backend/runner/read_write_runner.py index e6b5c8d9b..d3d1df2fa 100644 --- a/vectordb_bench/backend/runner/read_write_runner.py +++ b/vectordb_bench/backend/runner/read_write_runner.py @@ -106,10 +106,33 @@ def run_search(self, perc: int): log.info( f"Search after wirte - Conc search start, dur for each conc={self.read_dur_after_write}", ) - max_qps, conc_failed_rate = self.run_by_dur(self.read_dur_after_write) + result = self.run_by_dur(self.read_dur_after_write) + max_qps = result[0] + conc_failed_rate = result[1] + conc_num_list = result[2] + conc_qps_list = result[3] + conc_latency_p99_list = result[4] + conc_latency_p95_list = result[5] + conc_latency_avg_list = result[6] log.info(f"Search after wirte - Conc search finished, max_qps={max_qps}") - return [(perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate)] + return [ + ( + perc, + test_time, + max_qps, + recall, + ndcg, + p99_latency, + p95_latency, + conc_failed_rate, + conc_num_list, + conc_qps_list, + conc_latency_p99_list, + conc_latency_p95_list, + conc_latency_avg_list, + ) + ] def run_read_write(self) -> Metric: """ @@ -160,6 +183,13 @@ def run_read_write(self) -> Metric: m.st_serial_latency_p95_list = [d[6] for d in r] m.st_conc_failed_rate_list = [d[7] for d in r] + # Extract concurrent latency data + m.st_conc_num_list_list = [d[8] for d in r] + m.st_conc_qps_list_list = [d[9] for d in r] + m.st_conc_latency_p99_list_list = [d[10] for d in r] + m.st_conc_latency_p95_list_list = [d[11] for d in r] + m.st_conc_latency_avg_list_list = [d[12] for d in r] + except Exception as e: log.warning(f"Read and write error: {e}") executor.shutdown(wait=True, cancel_futures=True) @@ -226,6 +256,8 @@ def wait_next_target(start: int, target_batch: int) -> bool: log.info(f"Insert {perc}% done, total batch={total_batch}") test_time = round(time.perf_counter(), 4) max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate = 0, 0, 0, 0, 0, 0 + conc_num_list, conc_qps_list = [], [] + conc_latency_p99_list, conc_latency_p95_list, conc_latency_avg_list = [], [], [] try: log.info(f"[{target_batch}/{total_batch}] Serial search - {perc}% start") res, ssearch_dur = self.serial_search_runner.run() @@ -246,12 +278,35 @@ def wait_next_target(start: int, target_batch: int) -> bool: f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, " f"dur={each_conc_search_dur:.4f}" ) - max_qps, conc_failed_rate = self.run_by_dur(each_conc_search_dur) + conc_result = self.run_by_dur(each_conc_search_dur) + max_qps = conc_result[0] + conc_failed_rate = conc_result[1] + conc_num_list = conc_result[2] + conc_qps_list = conc_result[3] + conc_latency_p99_list = conc_result[4] + conc_latency_p95_list = conc_result[5] + conc_latency_avg_list = conc_result[6] else: log.warning(f"Skip concurrent tests, each_conc_search_dur={each_conc_search_dur} less than 10s.") except Exception as e: log.warning(f"Streaming Search Failed at stage={stage}. Exception: {e}") - result.append((perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate)) + result.append( + ( + perc, + test_time, + max_qps, + recall, + ndcg, + p99_latency, + p95_latency, + conc_failed_rate, + conc_num_list, + conc_qps_list, + conc_latency_p99_list, + conc_latency_p95_list, + conc_latency_avg_list, + ) + ) start_batch = target_batch # Drain the queue diff --git a/vectordb_bench/frontend/components/streaming/concurrent_detail.py b/vectordb_bench/frontend/components/streaming/concurrent_detail.py new file mode 100644 index 000000000..0580bad14 --- /dev/null +++ b/vectordb_bench/frontend/components/streaming/concurrent_detail.py @@ -0,0 +1,273 @@ +import plotly.graph_objects as go +import streamlit as st + + +def drawConcurrentPerformanceSection(container, case_data, case_name: str): + """Main section for concurrent performance detail in streaming tests""" + + # Check if data exists + if not case_data.get("st_conc_qps_list_list") or len(case_data["st_conc_qps_list_list"]) == 0: + container.info( + "Concurrent latency detail not available for this test. " + "Re-run the test with updated code to collect this data." + ) + return + + container.markdown("---") + container.subheader("Concurrent Search Performance") + container.markdown( + "Detailed Latency → QPS relationship at each stage. " "Displays how latency and QPS vary with concurrency." + ) + + # View mode selector + view_mode = container.radio( + "View Mode", options=["Single Stage", "Compare Stages"], horizontal=True, key=f"{case_name}-view-mode" + ) + + if view_mode == "Single Stage": + drawSingleStageView(container, case_data, case_name) + else: + drawCompareStagesView(container, case_data, case_name) + + +def drawSingleStageView(container, case_data, case_name: str): + """Show detailed Latency→QPS for one selected stage""" + + stages = case_data["st_search_stage_list"] + + # Find the last stage with data for default selection + default_stage_idx = len(stages) - 1 + for i in range(len(stages) - 1, -1, -1): + if case_data["st_conc_qps_list_list"][i]: + default_stage_idx = i + break + + # Stage selector (show all stages) + stage = container.selectbox( + "Select Stage", + options=stages, + index=default_stage_idx, + format_func=lambda x: f"{x}% data loaded", + key=f"{case_name}-stage-selector", + ) + + stage_idx = stages.index(stage) + + # Check if this stage has concurrent data + if not case_data["st_conc_qps_list_list"][stage_idx]: + container.warning( + f"No concurrent search data for {stage}% stage.\n\n" + f"**Reason:** Concurrent tests were skipped because there wasn't enough time " + f"between stages (< 10s per concurrency level).\n\n" + f"**Tip:** Use a larger dataset or slower insert rate to get data for all stages." + ) + return + + # Latency metric selector + latency_metric = container.radio( + "Latency Metric", options=["P99", "P95", "Average"], horizontal=True, key=f"{case_name}-latency-metric" + ) + + # Get data for selected stage + qps_values = case_data["st_conc_qps_list_list"][stage_idx] + conc_nums = case_data["st_conc_num_list_list"][stage_idx] + + # Get latency based on selection + if latency_metric == "P99": + latencies_sec = case_data["st_conc_latency_p99_list_list"][stage_idx] + elif latency_metric == "P95": + latencies_sec = case_data["st_conc_latency_p95_list_list"][stage_idx] + else: + latencies_sec = case_data["st_conc_latency_avg_list_list"][stage_idx] + + latencies_ms = [l * 1000 for l in latencies_sec] # Convert to ms + + # Draw chart + drawQPSLatencyChart(container, qps_values, latencies_ms, stage, latency_metric, case_name) + + # Draw table + drawMetricsTable( + container, + qps_values, + conc_nums, + case_data["st_conc_latency_p99_list_list"][stage_idx], + case_data["st_conc_latency_p95_list_list"][stage_idx], + case_data["st_conc_latency_avg_list_list"][stage_idx], + stage, + ) + + +def drawQPSLatencyChart(container, qps_values, latencies_ms, stage, metric_name, case_name): + """Draw Latency vs QPS scatter plot""" + + fig = go.Figure() + + fig.add_trace( + go.Scatter( + x=latencies_ms, + y=qps_values, + mode="lines+markers+text", + text=[f"{qps:.1f}" for qps in qps_values], + textposition="top center", + marker=dict(size=12, color="#1f77b4"), + line=dict(width=2, color="#1f77b4"), + hovertemplate=("QPS: %{y:.1f}
" f"{metric_name} Latency: %{{x:.1f}}ms
" ""), + ) + ) + + fig.update_layout( + title=f"Latency vs QPS at Stage {stage}%", + xaxis_title=f"Latency {metric_name} (ms)", + yaxis_title="Queries Per Second (QPS)", + height=500, + hovermode="closest", + showlegend=False, + ) + + container.plotly_chart(fig, use_container_width=True, key=f"{case_name}-chart-{stage}") + + +def drawMetricsTable(container, qps_values, conc_nums, p99_list, p95_list, avg_list, stage): + """Draw detailed metrics table""" + + container.markdown(f"**Detailed Metrics at Stage {stage}%**") + + # Build table data + table_data = [] + for i in range(len(qps_values)): + table_data.append( + { + "Concurrency": conc_nums[i], + "QPS": f"{qps_values[i]:.2f}", + "P99 (ms)": f"{p99_list[i] * 1000:.1f}", + "P95 (ms)": f"{p95_list[i] * 1000:.1f}", + "Avg (ms)": f"{avg_list[i] * 1000:.1f}", + } + ) + + container.table(table_data) + + +def drawCompareStagesView(container, case_data, case_name: str): + """Show QPS→Latency curves for multiple stages""" + + stages = case_data["st_search_stage_list"] + + # Find stages with data for default selection + stages_with_data = [stage for i, stage in enumerate(stages) if case_data["st_conc_qps_list_list"][i]] + + # Stage multi-selector (show all stages, but default to ones with data) + default_stages = [] + if stages_with_data: + default_stages = [stages_with_data[0], stages_with_data[-1]] if len(stages_with_data) >= 2 else stages_with_data + + selected_stages = container.multiselect( + "Select stages to compare", + options=stages, + default=default_stages, + format_func=lambda x: f"{x}%", + key=f"{case_name}-compare-stages", + help="Note: Some stages may have no concurrent data if test duration was too short", + ) + + if not selected_stages: + container.warning("Please select at least one stage to display.") + return + + # Latency metric selector + latency_metric = container.radio( + "Latency Metric", options=["P99", "P95", "Average"], horizontal=True, key=f"{case_name}-compare-metric" + ) + + # Draw comparison chart + drawComparisonChart(container, case_data, selected_stages, latency_metric, case_name) + + +def drawComparisonChart(container, case_data, selected_stages, metric_name, case_name): + """Draw multi-line comparison chart""" + + fig = go.Figure() + + # Color palette + colors = [ + "#1f77b4", + "#ff7f0e", + "#2ca02c", + "#d62728", + "#9467bd", + "#8c564b", + "#e377c2", + "#7f7f7f", + "#bcbd22", + "#17becf", + ] + + stages_plotted = 0 + stages_skipped = [] + + for idx, stage in enumerate(selected_stages): + stage_idx = case_data["st_search_stage_list"].index(stage) + + qps_values = case_data["st_conc_qps_list_list"][stage_idx] + + # Skip stages with no data + if not qps_values: + stages_skipped.append(stage) + continue + + # Get latency based on selection + if metric_name == "P99": + latencies_sec = case_data["st_conc_latency_p99_list_list"][stage_idx] + elif metric_name == "P95": + latencies_sec = case_data["st_conc_latency_p95_list_list"][stage_idx] + else: + latencies_sec = case_data["st_conc_latency_avg_list_list"][stage_idx] + + latencies_ms = [l * 1000 for l in latencies_sec] + stages_plotted += 1 + + fig.add_trace( + go.Scatter( + x=latencies_ms, + y=qps_values, + mode="lines+markers", + name=f"{stage}% loaded", + marker=dict(size=10), + line=dict(width=2, color=colors[idx % len(colors)]), + hovertemplate=( + f"Stage {stage}%
" + "QPS: %{y:.1f}
" + f"{metric_name} Latency: %{{x:.1f}}ms
" + "" + ), + ) + ) + + # Check if any data was plotted + if stages_plotted == 0: + container.warning( + f"None of the selected stages have concurrent search data.\n\n" + f"**Skipped stages:** {', '.join([f'{s}%' for s in stages_skipped])}\n\n" + f"**Reason:** Concurrent tests were skipped because there wasn't enough time " + f"between stages (< 10s per concurrency level).\n\n" + f"**Tip:** Use a larger dataset or slower insert rate to get data for all stages." + ) + return + + # Show warning for skipped stages + if stages_skipped: + container.info(f"Stages without data: {', '.join([f'{s}%' for s in stages_skipped])}") + + fig.update_layout( + title=f"Latency vs QPS Evolution Across Stages", + xaxis_title=f"Latency {metric_name} (ms)", + yaxis_title="Queries Per Second (QPS)", + height=600, + hovermode="closest", + legend=dict(yanchor="top", y=0.99, xanchor="right", x=0.99), + ) + + container.plotly_chart(fig, use_container_width=True, key=f"{case_name}-compare-chart") + + # Add insight + container.info("**Insight:** Compare curves across stages to understand how performance scales with data growth.") diff --git a/vectordb_bench/frontend/pages/streaming.py b/vectordb_bench/frontend/pages/streaming.py index 23e43490c..811c4d497 100644 --- a/vectordb_bench/frontend/pages/streaming.py +++ b/vectordb_bench/frontend/pages/streaming.py @@ -140,6 +140,14 @@ def case_results_filter(case_result: CaseResult) -> bool: line_chart_displayed_y_metrics=line_chart_displayed_y_metrics, ) + # Concurrent performance detail section + from vectordb_bench.frontend.components.streaming.concurrent_detail import drawConcurrentPerformanceSection + + for case_name in showCaseNames: + case_data_list = [d for d in shownData if d["case_name"] == case_name] + if case_data_list: + drawConcurrentPerformanceSection(st.container(), case_data_list[0], case_name) + # footer footer(st.container()) diff --git a/vectordb_bench/metric.py b/vectordb_bench/metric.py index 71ba3149f..3634b2114 100644 --- a/vectordb_bench/metric.py +++ b/vectordb_bench/metric.py @@ -41,6 +41,13 @@ class Metric: st_serial_latency_p95_list: list[float] = field(default_factory=list) st_conc_failed_rate_list: list[float] = field(default_factory=list) + # for streaming cases - concurrent latency data per stage + st_conc_num_list_list: list[list[int]] = field(default_factory=list) + st_conc_qps_list_list: list[list[float]] = field(default_factory=list) + st_conc_latency_p99_list_list: list[list[float]] = field(default_factory=list) + st_conc_latency_p95_list_list: list[list[float]] = field(default_factory=list) + st_conc_latency_avg_list_list: list[list[float]] = field(default_factory=list) + QURIES_PER_DOLLAR_METRIC = "QP$ (Quries per Dollar)" LOAD_DURATION_METRIC = "load_duration"