vectordb_bench/backend/runner/mp_runner.py (56 changes: 51 additions & 5 deletions)
@@ -190,13 +190,23 @@ def run_by_dur(self, duration: int) -> tuple[float, float]:
         """
         return self._run_by_dur(duration)
 
-    def _run_by_dur(self, duration: int) -> tuple[float, float]:
+    def _run_by_dur(self, duration: int) -> tuple[float, float, list, list, list, list, list]:
         """
         Returns:
             float: largest qps
             float: failed rate
+            list: concurrency numbers
+            list: qps values at each concurrency
+            list: p99 latencies at each concurrency
+            list: p95 latencies at each concurrency
+            list: avg latencies at each concurrency
         """
         max_qps = 0
+        conc_num_list = []
+        conc_qps_list = []
+        conc_latency_p99_list = []
+        conc_latency_p95_list = []
+        conc_latency_avg_list = []
         try:
             for conc in self.concurrencies:
                 with mp.Manager() as m:
@@ -226,9 +236,34 @@ def _run_by_dur(self, duration: int) -> tuple[float, float]:
                         cost = time.perf_counter() - start
 
                         qps = round(all_success_count / cost, 4)
+
+                        # Collect and calculate latencies
+                        all_latencies = []
+                        for r in res:
+                            if len(r) > 2 and r[2]:  # Has latency data
+                                all_latencies.extend(r[2])
+
+                        # Calculate percentiles
+                        if all_latencies:
+                            latency_p99 = np.percentile(all_latencies, 99)
+                            latency_p95 = np.percentile(all_latencies, 95)
+                            latency_avg = np.mean(all_latencies)
+                        else:
+                            latency_p99 = 0
+                            latency_p95 = 0
+                            latency_avg = 0
+
+                        # Store in lists
+                        conc_num_list.append(conc)
+                        conc_qps_list.append(qps)
+                        conc_latency_p99_list.append(latency_p99)
+                        conc_latency_p95_list.append(latency_p95)
+                        conc_latency_avg_list.append(latency_avg)
+
                         log.info(
                             f"End search in concurrency {conc}: dur={cost}s, failed_rate={failed_rate}, "
-                            f"all_success_count={all_success_count}, all_failed_count={all_failed_count}, qps={qps}",
+                            f"all_success_count={all_success_count}, all_failed_count={all_failed_count}, qps={qps}, "
+                            f"p99={latency_p99:.4f}s, p95={latency_p95:.4f}s, avg={latency_avg:.4f}s",
                         )
                         if qps > max_qps:
                             max_qps = qps
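The hunk above pools every worker process's raw latency samples before computing statistics, so the p99/p95 reflect the global distribution of successful requests rather than an average of per-process tails. A minimal self-contained sketch of the same aggregation, with made-up worker results standing in for `res`:

```python
import numpy as np

# Hypothetical per-worker results shaped like search_by_dur's new return:
# (success_count, failed_count, latencies)
res = [
    (3, 0, [0.010, 0.012, 0.250]),
    (2, 1, [0.011, 0.015]),
]

# Pool raw samples across workers, as the hunk above does
all_latencies = []
for r in res:
    if len(r) > 2 and r[2]:  # has latency data
        all_latencies.extend(r[2])

latency_p99 = np.percentile(all_latencies, 99)  # tail over pooled samples
latency_p95 = np.percentile(all_latencies, 95)
latency_avg = np.mean(all_latencies)
print(f"p99={latency_p99:.4f}s, p95={latency_p95:.4f}s, avg={latency_avg:.4f}s")
```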
@@ -246,13 +281,22 @@ def _run_by_dur(self, duration: int) -> tuple[float, float]:
         finally:
             self.stop()
 
-        return max_qps, failed_rate
+        return (
+            max_qps,
+            failed_rate,
+            conc_num_list,
+            conc_qps_list,
+            conc_latency_p99_list,
+            conc_latency_p95_list,
+            conc_latency_avg_list,
+        )
 
-    def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> tuple[int, int]:
+    def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> tuple[int, int, list]:
         """
         Returns:
             int: successful requests count
             int: failed requests count
+            list: latencies of successful requests
        """
         # sync all process
         q.put(1)
@@ -266,11 +310,13 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> tuple[int, int]:
         start_time = time.perf_counter()
         success_count = 0
         failed_cnt = 0
+        latencies = []
         while time.perf_counter() < start_time + dur:
             s = time.perf_counter()
             try:
                 self.db.search_embedding(test_data[idx], self.k)
                 success_count += 1
+                latencies.append(time.perf_counter() - s)
             except Exception as e:
                 failed_cnt += 1
                 # reduce log
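Note that a sample is appended only on success: failed requests increment `failed_cnt` but contribute nothing to the percentile lists, so the reported p99/p95/avg describe successful-request latency only. A small runnable sketch of the same measure-on-success loop, with a stubbed search in place of `self.db.search_embedding`:

```python
import random
import time

def fake_search() -> None:
    """Stand-in for self.db.search_embedding; fails ~10% of the time."""
    time.sleep(random.uniform(0.001, 0.005))
    if random.random() < 0.1:
        raise RuntimeError("transient search failure")

success_count, failed_cnt, latencies = 0, 0, []
start_time = time.perf_counter()
dur = 2  # seconds, analogous to the dur argument above
while time.perf_counter() < start_time + dur:
    s = time.perf_counter()
    try:
        fake_search()
        success_count += 1
        latencies.append(time.perf_counter() - s)  # timed only on success
    except Exception:
        failed_cnt += 1

print(success_count, failed_cnt, len(latencies))
```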
@@ -295,4 +341,4 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> tuple[int, int]:
             f"qps (successful) in this process: {round(success_count / total_dur, 4):3}",
         )
 
-        return success_count, failed_cnt
+        return success_count, failed_cnt, latencies
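Since `run_by_dur` simply forwards `_run_by_dur`'s result, every caller now receives a 7-tuple, and the previous two-value unpacking raises `ValueError` after this change. A hedged sketch of the new calling convention (the `runner` instance and the 300 s duration are assumptions for illustration):

```python
# Assumed: runner is an instance of the search runner defined in mp_runner.py.
(
    max_qps,
    failed_rate,
    conc_num_list,
    conc_qps_list,
    conc_latency_p99_list,
    conc_latency_p95_list,
    conc_latency_avg_list,
) = runner.run_by_dur(300)

# One entry per tested concurrency level, in test order
for conc, qps, p99 in zip(conc_num_list, conc_qps_list, conc_latency_p99_list):
    print(f"concurrency={conc}: qps={qps}, p99={p99:.4f}s")
```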
vectordb_bench/backend/runner/read_write_runner.py (37 changes: 33 additions & 4 deletions)
@@ -106,10 +106,20 @@ def run_search(self, perc: int):
         log.info(
             f"Search after wirte - Conc search start, dur for each conc={self.read_dur_after_write}",
         )
-        max_qps, conc_failed_rate = self.run_by_dur(self.read_dur_after_write)
+        result = self.run_by_dur(self.read_dur_after_write)
+        max_qps = result[0]
+        conc_failed_rate = result[1]
+        conc_num_list = result[2]
+        conc_qps_list = result[3]
+        conc_latency_p99_list = result[4]
+        conc_latency_p95_list = result[5]
+        conc_latency_avg_list = result[6]
         log.info(f"Search after wirte - Conc search finished, max_qps={max_qps}")
 
-        return [(perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate)]
+        return [(
+            perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate,
+            conc_num_list, conc_qps_list, conc_latency_p99_list, conc_latency_p95_list, conc_latency_avg_list
+        )]
 
     def run_read_write(self) -> Metric:
         """
@@ -159,6 +169,13 @@ def run_read_write(self) -> Metric:
                 m.st_serial_latency_p99_list = [d[5] for d in r]
                 m.st_serial_latency_p95_list = [d[6] for d in r]
                 m.st_conc_failed_rate_list = [d[7] for d in r]
+
+                # Extract concurrent latency data
+                m.st_conc_num_list_list = [d[8] for d in r]
+                m.st_conc_qps_list_list = [d[9] for d in r]
+                m.st_conc_latency_p99_list_list = [d[10] for d in r]
+                m.st_conc_latency_p95_list_list = [d[11] for d in r]
+                m.st_conc_latency_avg_list_list = [d[12] for d in r]
 
             except Exception as e:
                 log.warning(f"Read and write error: {e}")
@@ -226,6 +243,8 @@ def wait_next_target(start: int, target_batch: int) -> bool:
             log.info(f"Insert {perc}% done, total batch={total_batch}")
             test_time = round(time.perf_counter(), 4)
             max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate = 0, 0, 0, 0, 0, 0
+            conc_num_list, conc_qps_list = [], []
+            conc_latency_p99_list, conc_latency_p95_list, conc_latency_avg_list = [], [], []
             try:
                 log.info(f"[{target_batch}/{total_batch}] Serial search - {perc}% start")
                 res, ssearch_dur = self.serial_search_runner.run()
@@ -246,12 +265,22 @@ def wait_next_target(start: int, target_batch: int) -> bool:
                         f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, "
                         f"dur={each_conc_search_dur:.4f}"
                     )
-                    max_qps, conc_failed_rate = self.run_by_dur(each_conc_search_dur)
+                    conc_result = self.run_by_dur(each_conc_search_dur)
+                    max_qps = conc_result[0]
+                    conc_failed_rate = conc_result[1]
+                    conc_num_list = conc_result[2]
+                    conc_qps_list = conc_result[3]
+                    conc_latency_p99_list = conc_result[4]
+                    conc_latency_p95_list = conc_result[5]
+                    conc_latency_avg_list = conc_result[6]
                 else:
                     log.warning(f"Skip concurrent tests, each_conc_search_dur={each_conc_search_dur} less than 10s.")
             except Exception as e:
                 log.warning(f"Streaming Search Failed at stage={stage}. Exception: {e}")
-            result.append((perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate))
+            result.append((
+                perc, test_time, max_qps, recall, ndcg, p99_latency, p95_latency, conc_failed_rate,
+                conc_num_list, conc_qps_list, conc_latency_p99_list, conc_latency_p95_list, conc_latency_avg_list
+            ))
             start_batch = target_batch
 
             # Drain the queue
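Both files now index positionally into a 13-element tuple (`d[8]` through `d[12]`), which gets fragile as fields accumulate. One possible follow-up, not part of this PR, is a NamedTuple so each stage result carries field names while still unpacking like a plain tuple:

```python
from typing import NamedTuple

# Hypothetical alternative to the positional 13-tuple; names mirror the
# fields appended in result.append(...) above.
class StageSearchResult(NamedTuple):
    perc: int
    test_time: float
    max_qps: float
    recall: float
    ndcg: float
    p99_latency: float
    p95_latency: float
    conc_failed_rate: float
    conc_num_list: list
    conc_qps_list: list
    conc_latency_p99_list: list
    conc_latency_p95_list: list
    conc_latency_avg_list: list

# NamedTuple fields keep positional access working, so both
# [d.recall for d in r] and the existing [d[3] for d in r] remain valid.
```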
Expand Down
Loading