From 70e128400adbe58ca5a1d4d1a19642f0e63ca3a6 Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Wed, 11 Mar 2026 15:45:02 -0700 Subject: [PATCH] feat: adapt PoolState to usage of deadpool everywhere just killing it for deadpool::Status since it provides a public constructor and kill the long unused syncstorage-spanner bb8 based manager Closes STOR-368 --- Cargo.lock | 3 ++ syncserver-db-common/src/lib.rs | 24 ++------- syncserver/Cargo.toml | 1 + syncserver/src/server/mod.rs | 22 +++----- syncserver/src/web/handlers.rs | 51 +++++++++--------- syncstorage-db-common/src/lib.rs | 4 +- syncstorage-db/Cargo.toml | 1 + syncstorage-db/src/lib.rs | 2 +- syncstorage-db/src/mock.rs | 13 +++-- syncstorage-mysql/src/pool.rs | 8 +-- syncstorage-postgres/src/pool.rs | 10 ++-- syncstorage-spanner/src/manager/bb8.rs | 72 -------------------------- syncstorage-spanner/src/manager/mod.rs | 1 - syncstorage-spanner/src/pool.rs | 8 +-- tokenserver-db-common/src/lib.rs | 10 ++-- tokenserver-db/Cargo.toml | 1 + tokenserver-db/src/mock.rs | 13 +++-- tokenserver-mysql/src/pool.rs | 8 +-- tokenserver-postgres/src/pool.rs | 10 ++-- 19 files changed, 90 insertions(+), 172 deletions(-) delete mode 100644 syncstorage-spanner/src/manager/bb8.rs diff --git a/Cargo.lock b/Cargo.lock index cf4f8ec764..952821cdc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3979,6 +3979,7 @@ dependencies = [ "base64", "cadence", "chrono", + "deadpool", "docopt", "futures 0.3.32", "glean", @@ -4078,6 +4079,7 @@ name = "syncstorage-db" version = "0.21.1" dependencies = [ "async-trait", + "deadpool", "env_logger 0.11.9", "lazy_static", "log", @@ -4406,6 +4408,7 @@ version = "0.21.1" dependencies = [ "async-trait", "chrono", + "deadpool", "env_logger 0.11.9", "syncserver-common", "syncserver-db-common", diff --git a/syncserver-db-common/src/lib.rs b/syncserver-db-common/src/lib.rs index dc0027902d..c36b32524e 100644 --- a/syncserver-db-common/src/lib.rs +++ b/syncserver-db-common/src/lib.rs @@ -1,7 +1,7 @@ pub mod error; pub mod test; -use std::{error::Error, fmt::Debug}; +use std::error::Error; #[cfg(debug_assertions)] use diesel::connection::InstrumentationEvent; @@ -14,25 +14,9 @@ use diesel_migrations::{EmbeddedMigrations, MigrationHarness}; use tokio::task::spawn_blocking; /// A trait to be implemented by database pool data structures. It provides an interface to -/// derive the current state of the pool, as represented by the `PoolState` struct. -pub trait GetPoolState { - fn state(&self) -> PoolState; -} - -#[derive(Debug, Default)] -/// A mockable r2d2::State -pub struct PoolState { - pub connections: u32, - pub idle_connections: u32, -} - -impl From for PoolState { - fn from(status: deadpool::Status) -> PoolState { - PoolState { - connections: status.size as u32, - idle_connections: status.available.max(0) as u32, - } - } +/// derive the current status of the pool, as represented by [deadpool::Status] +pub trait GetPoolStatus { + fn status(&self) -> deadpool::Status; } /// Establish an [AsyncConnection] logging diesel queries to the `debug` log diff --git a/syncserver/Cargo.toml b/syncserver/Cargo.toml index f99e46aa18..d3a91d4e8a 100644 --- a/syncserver/Cargo.toml +++ b/syncserver/Cargo.toml @@ -12,6 +12,7 @@ backtrace.workspace = true base64.workspace = true cadence.workspace = true chrono.workspace = true +deadpool.workspace = true docopt.workspace = true futures.workspace = true hex.workspace = true diff --git a/syncserver/src/server/mod.rs b/syncserver/src/server/mod.rs index 64152d7993..2736c4c196 100644 --- a/syncserver/src/server/mod.rs +++ b/syncserver/src/server/mod.rs @@ -19,7 +19,7 @@ use syncserver_common::{ BlockingThreadpool, BlockingThreadpoolMetrics, Metrics, Taggable, middleware::sentry::SentryWrapper, }; -use syncserver_db_common::{GetPoolState, PoolState}; +use syncserver_db_common::GetPoolStatus; use syncserver_settings::Settings; use syncstorage_db::{DbError, DbPool, DbPoolImpl}; use syncstorage_settings::{Deadman, ServerLimits}; @@ -554,7 +554,7 @@ impl FromRequest for MetricsWrapper { } /// Emit database pool and threadpool metrics periodically -fn spawn_metric_periodic_reporter( +fn spawn_metric_periodic_reporter( interval: Duration, metrics: Arc, pool: T, @@ -582,22 +582,14 @@ fn spawn_metric_periodic_reporter( }; loop { - let PoolState { - connections, - idle_connections, - } = pool.state(); + let deadpool::Status { + size, available, .. + } = pool.status(); send_gauge_with_maybe_hostname( "storage.pool.connections.active", - (connections - idle_connections) as u64, - ); - send_gauge_with_maybe_hostname( - "storage.pool.connections.active", - (connections - idle_connections) as u64, - ); - send_gauge_with_maybe_hostname( - "storage.pool.connections.idle", - idle_connections as u64, + (size - available) as u64, ); + send_gauge_with_maybe_hostname("storage.pool.connections.idle", available as u64); let BlockingThreadpoolMetrics { queued_tasks, diff --git a/syncserver/src/web/handlers.rs b/syncserver/src/web/handlers.rs index 3da8cc2ea8..3fc11c4148 100644 --- a/syncserver/src/web/handlers.rs +++ b/syncserver/src/web/handlers.rs @@ -846,34 +846,37 @@ pub async fn lbheartbeat(req: HttpRequest) -> Result { let db_state = if cfg!(test) { use actix_web::http::header::HeaderValue; use std::str::FromStr; - use syncstorage_db::PoolState; - - PoolState { - connections: u32::from_str( - req.headers() - .get("TEST_CONNECTIONS") - .unwrap_or(&HeaderValue::from_static("0")) - .to_str() - .unwrap_or("0"), - ) - .unwrap_or_default(), - idle_connections: u32::from_str( - req.headers() - .get("TEST_IDLES") - .unwrap_or(&HeaderValue::from_static("0")) - .to_str() - .unwrap_or("0"), - ) - .unwrap_or_default(), + + let size = usize::from_str( + req.headers() + .get("TEST_CONNECTIONS") + .unwrap_or(&HeaderValue::from_static("0")) + .to_str() + .unwrap_or("0"), + ) + .unwrap_or_default(); + let available = usize::from_str( + req.headers() + .get("TEST_IDLES") + .unwrap_or(&HeaderValue::from_static("0")) + .to_str() + .unwrap_or("0"), + ) + .unwrap_or_default(); + deadpool::Status { + max_size: size, + size, + available, + waiting: 0, } } else { - state.db_pool.clone().state() + state.db_pool.status() }; - let active = db_state.connections - db_state.idle_connections; + let active = db_state.size - db_state.available; let mut status_code = StatusCode::OK; - if active >= deadman.max_size && db_state.idle_connections == 0 { + if active >= deadman.max_size as usize && db_state.available == 0 { if deadman.clock_start.is_none() { deadman.clock_start = Some(Instant::now()); } @@ -881,14 +884,14 @@ pub async fn lbheartbeat(req: HttpRequest) -> Result { } else if deadman.clock_start.is_some() { deadman.clock_start = None } - deadman.previous_count = db_state.idle_connections as usize; + deadman.previous_count = db_state.available; { *deadarc.write().await = deadman; } resp.insert("active_connections".to_string(), Value::from(active)); resp.insert( "idle_connections".to_string(), - Value::from(db_state.idle_connections), + Value::from(db_state.available), ); if let Some(clock) = deadman.clock_start { let duration: Duration = Instant::now() - clock; diff --git a/syncstorage-db-common/src/lib.rs b/syncstorage-db-common/src/lib.rs index 0b89e1a28c..bd500f132c 100644 --- a/syncstorage-db-common/src/lib.rs +++ b/syncstorage-db-common/src/lib.rs @@ -10,7 +10,7 @@ use std::fmt::Debug; use async_trait::async_trait; use lazy_static::lazy_static; use serde::Deserialize; -use syncserver_db_common::GetPoolState; +use syncserver_db_common::GetPoolStatus; use error::DbErrorIntrospect; use util::SyncTimestamp; @@ -49,7 +49,7 @@ pub const DEFAULT_BSO_TTL: u32 = 2_100_000_000; pub const FIRST_CUSTOM_COLLECTION_ID: i32 = 101; #[async_trait] -pub trait DbPool: Sync + Send + Debug + GetPoolState { +pub trait DbPool: Sync + Send + Debug + GetPoolStatus { type Error; async fn init(&mut self) -> Result<(), Self::Error> { diff --git a/syncstorage-db/Cargo.toml b/syncstorage-db/Cargo.toml index 231f8b7992..acc2e1326b 100644 --- a/syncstorage-db/Cargo.toml +++ b/syncstorage-db/Cargo.toml @@ -7,6 +7,7 @@ edition.workspace = true [dependencies] async-trait.workspace = true +deadpool.workspace = true env_logger.workspace = true lazy_static.workspace = true log.workspace = true diff --git a/syncstorage-db/src/lib.rs b/syncstorage-db/src/lib.rs index 87bd22fc79..376fe2856b 100644 --- a/syncstorage-db/src/lib.rs +++ b/syncstorage-db/src/lib.rs @@ -29,7 +29,7 @@ pub use syncstorage_spanner::DbError; #[cfg(feature = "spanner")] pub type DbImpl = syncstorage_spanner::SpannerDb; -pub use syncserver_db_common::{GetPoolState, PoolState}; +pub use syncserver_db_common::GetPoolStatus; pub use syncstorage_db_common::error::DbErrorIntrospect; pub use syncstorage_db_common::{ diff --git a/syncstorage-db/src/mock.rs b/syncstorage-db/src/mock.rs index 9bd0e389f3..84a8ba9903 100644 --- a/syncstorage-db/src/mock.rs +++ b/syncstorage-db/src/mock.rs @@ -1,7 +1,7 @@ //! Mock db implementation with methods stubbed to return default values. #![allow(clippy::new_without_default)] use async_trait::async_trait; -use syncserver_db_common::{GetPoolState, PoolState}; +use syncserver_db_common::GetPoolStatus; #[cfg(debug_assertions)] use syncstorage_db_common::util::SyncTimestamp; use syncstorage_db_common::{BatchDb, Db, DbPool, params, results}; @@ -34,9 +34,14 @@ impl DbPool for MockDbPool { } } -impl GetPoolState for MockDbPool { - fn state(&self) -> PoolState { - PoolState::default() +impl GetPoolStatus for MockDbPool { + fn status(&self) -> deadpool::Status { + deadpool::Status { + max_size: 0, + size: 0, + available: 0, + waiting: 0, + } } } diff --git a/syncstorage-mysql/src/pool.rs b/syncstorage-mysql/src/pool.rs index da9166bb68..a86adf77b9 100644 --- a/syncstorage-mysql/src/pool.rs +++ b/syncstorage-mysql/src/pool.rs @@ -20,7 +20,7 @@ use syncserver_common::{BlockingThreadpool, Metrics}; #[cfg(debug_assertions)] use syncserver_db_common::test::test_transaction_hook; use syncserver_db_common::{ - GetPoolState, PoolState, establish_connection_with_logging, manager_config_with_logging, + GetPoolStatus, establish_connection_with_logging, manager_config_with_logging, run_embedded_migrations, }; use syncstorage_db_common::{Db, DbPool, STD_COLLS}; @@ -170,9 +170,9 @@ impl fmt::Debug for MysqlDbPool { } } -impl GetPoolState for MysqlDbPool { - fn state(&self) -> PoolState { - self.pool.status().into() +impl GetPoolStatus for MysqlDbPool { + fn status(&self) -> deadpool::Status { + self.pool.status() } } diff --git a/syncstorage-postgres/src/pool.rs b/syncstorage-postgres/src/pool.rs index 6f8b953219..51d79c7e12 100644 --- a/syncstorage-postgres/src/pool.rs +++ b/syncstorage-postgres/src/pool.rs @@ -19,9 +19,7 @@ use diesel_migrations::{EmbeddedMigrations, embed_migrations}; use syncserver_common::{BlockingThreadpool, Metrics}; #[cfg(debug_assertions)] use syncserver_db_common::test::test_transaction_hook; -use syncserver_db_common::{ - GetPoolState, PoolState, manager_config_with_logging, run_embedded_migrations, -}; +use syncserver_db_common::{GetPoolStatus, manager_config_with_logging, run_embedded_migrations}; use syncstorage_db_common::{Db, DbPool, STD_COLLS}; use syncstorage_settings::{Quota, Settings}; @@ -168,9 +166,9 @@ impl fmt::Debug for PgDbPool { } } -impl GetPoolState for PgDbPool { - fn state(&self) -> PoolState { - self.pool.status().into() +impl GetPoolStatus for PgDbPool { + fn status(&self) -> deadpool::Status { + self.pool.status() } } diff --git a/syncstorage-spanner/src/manager/bb8.rs b/syncstorage-spanner/src/manager/bb8.rs deleted file mode 100644 index cb551e2128..0000000000 --- a/syncstorage-spanner/src/manager/bb8.rs +++ /dev/null @@ -1,72 +0,0 @@ -use std::marker::PhantomData; -use std::{fmt, sync::Arc}; - -use async_trait::async_trait; -use bb8::{ManageConnection, PooledConnection}; -use grpcio::{EnvBuilder, Environment}; - -use crate::{ - db::{ - error::{DbError, DbErrorKind}, - PoolState, - }, - server::Metrics, - settings::Settings, -}; - -use super::session::{create_spanner_session, recycle_spanner_session, SpannerSession}; - -#[allow(dead_code)] -pub type Conn<'a> = PooledConnection<'a, SpannerSessionManager>; - -pub(super) struct SpannerSessionManager { - database_name: String, - /// The gRPC environment - env: Arc, - metrics: Metrics, - test_transactions: bool, - phantom: PhantomData, -} - -impl fmt::Debug for SpannerSessionManager { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("bb8::SpannerSessionManager") - .field("database_name", &self.database_name) - .field("test_transactions", &self.test_transactions) - .finish() - } -} - -#[async_trait] -impl ManageConnection for SpannerSessionManager { - type Connection = SpannerSession; - type Error = DbError; - - async fn connect(&self) -> Result { - create_spanner_session( - Arc::clone(&self.env), - self.metrics.clone(), - &self.database_name, - self.test_transactions, - ) - .await - } - - async fn is_valid(&self, mut conn: Self::Connection) -> Result { - recycle_spanner_session(&mut conn, &self.database_name).await?; - Ok(conn) - } - - fn has_broken(&self, _conn: &mut Self::Connection) -> bool { - false - } -} - -impl From for PoolState { - fn from(state: bb8::State) -> PoolState { - PoolState { - connections: state.connections, - idle_connections: state.idle_connections, - } - } -} diff --git a/syncstorage-spanner/src/manager/mod.rs b/syncstorage-spanner/src/manager/mod.rs index 55de797496..cc59e77fb6 100644 --- a/syncstorage-spanner/src/manager/mod.rs +++ b/syncstorage-spanner/src/manager/mod.rs @@ -1,4 +1,3 @@ -// mod bb8; mod deadpool; mod session; diff --git a/syncstorage-spanner/src/pool.rs b/syncstorage-spanner/src/pool.rs index 3a90b94a4d..229600c84f 100644 --- a/syncstorage-spanner/src/pool.rs +++ b/syncstorage-spanner/src/pool.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, fmt, sync::Arc, time::Duration}; use actix_web::rt; use async_trait::async_trait; use syncserver_common::{BlockingThreadpool, Metrics}; -use syncserver_db_common::{GetPoolState, PoolState}; +use syncserver_db_common::GetPoolStatus; use syncstorage_db_common::{Db, DbPool, STD_COLLS}; use syncstorage_settings::{Quota, Settings}; use tokio::sync::RwLock; @@ -135,9 +135,9 @@ impl DbPool for SpannerDbPool { } } -impl GetPoolState for SpannerDbPool { - fn state(&self) -> PoolState { - self.pool.status().into() +impl GetPoolStatus for SpannerDbPool { + fn status(&self) -> deadpool::Status { + self.pool.status() } } diff --git a/tokenserver-db-common/src/lib.rs b/tokenserver-db-common/src/lib.rs index 52d1ac1e6d..e338a347f3 100644 --- a/tokenserver-db-common/src/lib.rs +++ b/tokenserver-db-common/src/lib.rs @@ -10,7 +10,7 @@ use std::{cmp, time::Duration}; use async_trait::async_trait; use chrono::Utc; use syncserver_common::Metrics; -use syncserver_db_common::{GetPoolState, PoolState}; +use syncserver_db_common::GetPoolStatus; pub use crate::error::DbError; @@ -21,7 +21,7 @@ pub type DbResult = Result; pub const MAX_GENERATION: i64 = i64::MAX; #[async_trait(?Send)] -pub trait DbPool: Sync + Send + GetPoolState { +pub trait DbPool: Sync + Send + GetPoolStatus { async fn init(&mut self) -> DbResult<()>; async fn get(&self) -> DbResult>; @@ -29,9 +29,9 @@ pub trait DbPool: Sync + Send + GetPoolState { fn box_clone(&self) -> Box; } -impl GetPoolState for Box { - fn state(&self) -> PoolState { - (**self).state() +impl GetPoolStatus for Box { + fn status(&self) -> deadpool::Status { + (**self).status() } } diff --git a/tokenserver-db/Cargo.toml b/tokenserver-db/Cargo.toml index 52957f560b..c55b77f95a 100644 --- a/tokenserver-db/Cargo.toml +++ b/tokenserver-db/Cargo.toml @@ -8,6 +8,7 @@ edition.workspace = true [dependencies] async-trait.workspace = true chrono.workspace = true +deadpool.workspace = true syncserver-common = { path = "../syncserver-common" } syncserver-db-common = { path = "../syncserver-db-common" } tokenserver-db-common = { path = "../tokenserver-db-common" } diff --git a/tokenserver-db/src/mock.rs b/tokenserver-db/src/mock.rs index 73f19dd7ba..2e42750a09 100644 --- a/tokenserver-db/src/mock.rs +++ b/tokenserver-db/src/mock.rs @@ -4,7 +4,7 @@ use std::sync::LazyLock; use async_trait::async_trait; use syncserver_common::Metrics; -use syncserver_db_common::{GetPoolState, PoolState}; +use syncserver_db_common::GetPoolStatus; use tokenserver_db_common::{Db, DbError, DbPool, params, results}; #[derive(Clone, Debug)] @@ -31,9 +31,14 @@ impl DbPool for MockDbPool { } } -impl GetPoolState for MockDbPool { - fn state(&self) -> PoolState { - PoolState::default() +impl GetPoolStatus for MockDbPool { + fn status(&self) -> deadpool::Status { + deadpool::Status { + max_size: 0, + size: 0, + available: 0, + waiting: 0, + } } } diff --git a/tokenserver-mysql/src/pool.rs b/tokenserver-mysql/src/pool.rs index f6ad2f2e57..26ebaaa4ab 100644 --- a/tokenserver-mysql/src/pool.rs +++ b/tokenserver-mysql/src/pool.rs @@ -13,7 +13,7 @@ use syncserver_common::Metrics; #[cfg(debug_assertions)] use syncserver_db_common::test::test_transaction_hook; use syncserver_db_common::{ - GetPoolState, PoolState, establish_connection_with_logging, manager_config_with_logging, + GetPoolStatus, establish_connection_with_logging, manager_config_with_logging, run_embedded_migrations, }; use tokenserver_db_common::{Db, DbError, DbPool, DbResult, params}; @@ -174,8 +174,8 @@ impl DbPool for TokenserverPool { } } -impl GetPoolState for TokenserverPool { - fn state(&self) -> PoolState { - self.inner.status().into() +impl GetPoolStatus for TokenserverPool { + fn status(&self) -> deadpool::Status { + self.inner.status() } } diff --git a/tokenserver-postgres/src/pool.rs b/tokenserver-postgres/src/pool.rs index a9a051c253..77834f96a5 100644 --- a/tokenserver-postgres/src/pool.rs +++ b/tokenserver-postgres/src/pool.rs @@ -13,9 +13,7 @@ use diesel_migrations::{EmbeddedMigrations, embed_migrations}; use syncserver_common::Metrics; #[cfg(debug_assertions)] use syncserver_db_common::test::test_transaction_hook; -use syncserver_db_common::{ - GetPoolState, PoolState, manager_config_with_logging, run_embedded_migrations, -}; +use syncserver_db_common::{GetPoolStatus, manager_config_with_logging, run_embedded_migrations}; use tokenserver_db_common::{Db, DbError, DbPool, DbResult, params}; use crate::db::TokenserverPgDb; @@ -174,8 +172,8 @@ impl DbPool for TokenserverPgPool { } } -impl GetPoolState for TokenserverPgPool { - fn state(&self) -> PoolState { - self.inner.status().into() +impl GetPoolStatus for TokenserverPgPool { + fn status(&self) -> deadpool::Status { + self.inner.status() } }