Skip to content

Commit c00f12f

Browse files
committed
use many connections on the benchmarks server
Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent ff12040 commit c00f12f

6 files changed

Lines changed: 69 additions & 15 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks-website/server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ base64 = "0.22"
2929
# track vortex-duckdb's bundled engine version (build.rs)
3030
duckdb = { version = "1.10502", features = ["bundled"] }
3131
maud = { version = "0.27", features = ["axum"] }
32+
parking_lot = { workspace = true }
3233
serde = { workspace = true, features = ["derive"] }
3334
serde_json = { workspace = true }
3435
subtle = "2.6"

benchmarks-website/server/src/app.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::ingest;
3535
/// or a small `String`).
3636
#[derive(Clone)]
3737
pub struct AppState {
38-
/// Mutex-guarded DuckDB connection. See [`crate::db`].
38+
/// Shared DuckDB handle. See [`crate::db`].
3939
pub db: DbHandle,
4040
/// Bearer token expected on `/api/ingest`. Compared via constant-time eq.
4141
pub bearer_token: Arc<String>,

benchmarks-website/server/src/db.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33

44
//! DuckDB connection management plus the deterministic `measurement_id` hash.
55
//!
6-
//! The server holds a single [`duckdb::Connection`] inside an async
7-
//! [`tokio::sync::Mutex`]. All DB work runs inside `spawn_blocking` so the
8-
//! Tokio runtime is never blocked on synchronous DuckDB calls.
6+
//! The server keeps one root [`duckdb::Connection`] and clones a fresh
7+
//! connection from it for each blocking DB task. All DB work runs inside
8+
//! `spawn_blocking` so the Tokio runtime is never blocked on synchronous
9+
//! DuckDB calls.
910
//!
1011
//! `measurement_id` is a server-internal xxhash64 over `commit_sha` plus
1112
//! each table's dimensional tuple. Including `commit_sha` makes every
@@ -21,7 +22,7 @@ use std::sync::Arc;
2122
use anyhow::Context as _;
2223
use anyhow::Result;
2324
use duckdb::Connection;
24-
use tokio::sync::Mutex;
25+
use parking_lot::Mutex;
2526
use twox_hash::XxHash64;
2627

2728
use crate::records::CompressionSize;
@@ -31,8 +32,25 @@ use crate::records::RandomAccessTime;
3132
use crate::records::VectorSearchRun;
3233
use crate::schema::SCHEMA_DDL;
3334

34-
/// A connection guard the rest of the crate hands around.
35-
pub type DbHandle = Arc<Mutex<Connection>>;
35+
/// Shared DuckDB handle. Cloning the handle is cheap; each DB task clones a
36+
/// task-local [`Connection`] before doing work.
37+
#[derive(Clone)]
38+
pub struct DbHandle {
39+
root: Arc<Mutex<Connection>>,
40+
}
41+
42+
impl DbHandle {
43+
fn new(root: Connection) -> Self {
44+
Self {
45+
root: Arc::new(Mutex::new(root)),
46+
}
47+
}
48+
49+
fn connection(&self) -> Result<Connection> {
50+
let root = self.root.lock();
51+
root.try_clone().context("cloning DuckDB connection")
52+
}
53+
}
3654

3755
/// Open the DuckDB file at `path` (creating it if absent) and apply the
3856
/// schema DDL. Returns a handle ready to be cloned into the Axum state.
@@ -41,20 +59,20 @@ pub fn open<P: AsRef<Path>>(path: P) -> Result<DbHandle> {
4159
.with_context(|| format!("opening DuckDB at {}", path.as_ref().display()))?;
4260
conn.execute_batch(SCHEMA_DDL)
4361
.context("applying schema DDL")?;
44-
Ok(Arc::new(Mutex::new(conn)))
62+
Ok(DbHandle::new(conn))
4563
}
4664

47-
/// Run a synchronous DB operation on the blocking pool, holding the connection
48-
/// mutex for the duration of the call.
65+
/// Run a synchronous DB operation on the blocking pool using a task-local
66+
/// DuckDB connection cloned from the shared database handle.
4967
pub async fn run_blocking<F, T>(handle: &DbHandle, f: F) -> Result<T>
5068
where
5169
F: FnOnce(&mut Connection) -> Result<T> + Send + 'static,
5270
T: Send + 'static,
5371
{
5472
let handle = handle.clone();
5573
tokio::task::spawn_blocking(move || {
56-
let mut guard = handle.blocking_lock();
57-
f(&mut guard)
74+
let mut conn = handle.connection()?;
75+
f(&mut conn)
5876
})
5977
.await
6078
.context("DB task panicked")?

benchmarks-website/server/src/ingest.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ use crate::records::Record;
5757
use crate::records::VectorSearchRun;
5858
use crate::schema::SCHEMA_VERSION;
5959

60+
// Unless we start merging 128 PRs every second we are not hitting this max.
61+
const WRITE_CONFLICT_ATTEMPTS: usize = 128;
62+
6063
/// Successful ingest response body.
6164
#[derive(Debug, Serialize)]
6265
pub struct IngestResponse {
@@ -102,7 +105,38 @@ fn validate_envelope(env: &Envelope) -> Result<(), IngestError> {
102105
Ok(())
103106
}
104107

108+
fn retry_write_conflicts<F, T>(mut op: F) -> Result<T>
109+
where
110+
F: FnMut() -> Result<T>,
111+
{
112+
for attempt in 1..=WRITE_CONFLICT_ATTEMPTS {
113+
match op() {
114+
Ok(value) => return Ok(value),
115+
Err(err) if attempt < WRITE_CONFLICT_ATTEMPTS && is_retryable_write_conflict(&err) => {
116+
std::thread::yield_now();
117+
}
118+
Err(err) => return Err(err),
119+
}
120+
}
121+
unreachable!("loop either returns a value or the final error")
122+
}
123+
124+
fn is_retryable_write_conflict(err: &anyhow::Error) -> bool {
125+
err.chain().any(|cause| {
126+
let message = cause.to_string().to_ascii_lowercase();
127+
message.contains("conflict")
128+
&& (message.contains("transaction")
129+
|| message.contains("write")
130+
|| message.contains("tuple")
131+
|| message.contains("update"))
132+
})
133+
}
134+
105135
fn apply_envelope(conn: &mut Connection, env: Envelope) -> Result<IngestResponse> {
136+
retry_write_conflicts(|| apply_envelope_once(conn, &env))
137+
}
138+
139+
fn apply_envelope_once(conn: &mut Connection, env: &Envelope) -> Result<IngestResponse> {
106140
let tx = conn.transaction().context("begin transaction")?;
107141

108142
upsert_commit(&tx, &env.commit).context("upsert commit")?;

benchmarks-website/server/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
//! |---------------|---------------------------------------------------------------------------------------------|
3636
//! | [`app`] | [`app::AppState`] (DB handle + bearer + path) and the Axum router composition. |
3737
//! | [`auth`] | Bearer-token middleware for `/api/ingest`. |
38-
//! | [`db`] | [`db::DbHandle`] connection wrapper + the per-fact-table `measurement_id_*` hash functions. |
38+
//! | [`db`] | [`db::DbHandle`] task-local connection cloning + the per-fact-table `measurement_id_*` hash functions. |
3939
//! | [`schema`] | DuckDB DDL ([`schema::SCHEMA_DDL`]) and the wire schema version. |
4040
//! | [`records`] | Wire shapes for `POST /api/ingest`. |
4141
//! | [`ingest`] | `POST /api/ingest` handler — envelope validation, transaction, upsert dispatch. |
@@ -51,8 +51,8 @@
5151
//! routes skip auth.
5252
//! 3. The handler parses body / path / query into typed inputs (e.g.
5353
//! [`slug::ChartKey::from_slug`]).
54-
//! 4. The handler hands a closure to [`db::run_blocking`], which acquires
55-
//! the connection mutex and runs the synchronous DuckDB call on
54+
//! 4. The handler hands a closure to [`db::run_blocking`], which clones a
55+
//! task-local DuckDB connection and runs the synchronous call on
5656
//! `tokio::task::spawn_blocking` so the runtime stays free.
5757
//! 5. The closure returns `Result<T, anyhow::Error>`. Errors are mapped
5858
//! into [`error::IngestError`] / [`error::ApiError`] with the right

0 commit comments

Comments
 (0)