diff --git a/syncstorage-db/src/tests/batch.rs b/syncstorage-db/src/tests/batch.rs index 102ae2eef6..3511acf867 100644 --- a/syncstorage-db/src/tests/batch.rs +++ b/syncstorage-db/src/tests/batch.rs @@ -420,6 +420,53 @@ async fn test_append_upsert_overwrites_same_batch_bso_id() -> Result<(), DbError .await } +#[tokio::test] +async fn test_quota_pending_batch_accumulates() -> Result<(), DbError> { + let settings = Settings::test_settings(); + if !settings.syncstorage.enable_quota { + debug!("[test] Skipping test"); + return Ok(()); + } + + with_test_transaction( + settings.syncstorage, + async |db: &mut dyn Db| { + let uid = 9001; + let coll = "clients"; + + let chunk = 3838; + let payload = "z".repeat(chunk); + // hits quota on second append. + db.set_quota(true, chunk + chunk / 2, true); + + let new_batch = db.create_batch(cb(uid, coll, vec![])).await?; + + db.append_to_batch(ab( + uid, + coll, + new_batch.clone(), + vec![postbso("b0", Some(&payload), None, None)], + )) + .await?; + + let result = db + .append_to_batch(ab( + uid, + coll, + new_batch.clone(), + vec![postbso("b1", Some(&payload), None, None)], + )) + .await; + assert!( + result.as_ref().err().is_some_and(|e| e.is_quota()), + "expected an over-quota error, got {result:?}" + ); + Ok(()) + }, + ) + .await +} + #[tokio::test] async fn test_commit_batch_partial_overlap() -> Result<(), DbError> { let settings = Settings::test_settings().syncstorage; diff --git a/syncstorage-db/src/tests/db.rs b/syncstorage-db/src/tests/db.rs index 7206cbdfc5..4ae0c48044 100644 --- a/syncstorage-db/src/tests/db.rs +++ b/syncstorage-db/src/tests/db.rs @@ -654,11 +654,8 @@ async fn get_collection_usage() -> Result<(), DbError> { collection: "ignored".to_owned(), }) .await?; - assert_eq!( - &(quota.total_bytes as i64), - expected.get("bookmarks").unwrap() - ); - assert_eq!(quota.count, 5); // 3 collections, 5 records + assert_eq!(&(quota.total_bytes as i64), &sum); + assert_eq!(quota.count, 15); // 3 collections of 5 records } Ok(()) }) diff --git a/syncstorage-mysql/src/db/batch_impl.rs b/syncstorage-mysql/src/db/batch_impl.rs index 88a27dcc2c..f5df34cac5 100644 --- a/syncstorage-mysql/src/db/batch_impl.rs +++ b/syncstorage-mysql/src/db/batch_impl.rs @@ -1,5 +1,5 @@ use base64::Engine; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use async_trait::async_trait; use diesel::{ @@ -69,11 +69,25 @@ impl BatchDb for MysqlDb { } } - do_append(self, batch_id, params.user_id, collection_id, params.bsos).await?; - Ok(results::CreateBatch { + let batch = results::CreateBatch { id: encode_id(batch_id), - size: None, - }) + // `size` is the committed size + size: self + .check_quota(¶ms.user_id, ¶ms.collection) + .await?, + }; + + do_append( + self, + params.user_id, + collection_id, + batch.clone(), + params.bsos, + ¶ms.collection, + ) + .await?; + + Ok(batch) } async fn validate_batch(&mut self, params: params::ValidateBatch) -> DbResult { @@ -110,9 +124,24 @@ impl BatchDb for MysqlDb { return Err(DbError::batch_not_found()); } - let batch_id = decode_id(¶ms.batch.id)?; let collection_id = self._get_collection_id(¶ms.collection).await?; - do_append(self, batch_id, params.user_id, collection_id, params.bsos).await?; + + // `batch.size` is the committed size + let mut batch = params.batch; + batch.size = self + .check_quota(¶ms.user_id, ¶ms.collection) + .await?; + + do_append( + self, + params.user_id, + collection_id, + batch, + params.bsos, + ¶ms.collection, + ) + .await?; + Ok(()) } @@ -191,10 +220,11 @@ impl BatchDb for MysqlDb { pub async fn do_append( db: &mut MysqlDb, - batch_id: i64, user_id: UserIdentifier, _collection_id: i32, + batch: results::CreateBatch, bsos: Vec, + collection: &str, ) -> DbResult<()> { fn exist_idx(user_id: u64, batch_id: i64, bso_id: &str) -> String { // Construct something that matches the key for batch_upload_items @@ -206,6 +236,32 @@ pub async fn do_append( ) } + let batch_id = decode_id(&batch.id)?; + let uid = user_id.legacy_id as i64; + + if db.quota.enabled + && let Some(size) = batch.size + { + let running_size: usize = bsos + .iter() + .filter_map(|bso| bso.payload.as_ref().map(|p| p.len())) + .sum(); + let incoming_ids: Vec = bsos.iter().map(|bso| bso.id.clone()).collect(); + let pending_size = pending_batch_size(db, uid, batch_id, &incoming_ids).await?; + let projected_total = size + pending_size + running_size; + if projected_total >= db.quota.size { + let mut tags = HashMap::default(); + tags.insert("collection".to_owned(), collection.to_owned()); + db.metrics.incr_with_tags("storage.quota.at_limit", tags); + if db.quota.enforced { + return Err(DbError::quota()); + } else { + warn!("Quota at limit for user ({} bytes)", projected_total; + "collection" => collection); + } + } + } + // It's possible for the list of items to contain a duplicate key entry. // This means that we can't really call `ON DUPLICATE` here, because that's // more about inserting one item at a time. (e.g. it works great if the @@ -281,6 +337,25 @@ pub async fn do_append( Ok(()) } +/// Sum the pending payload bytes, excluding any ids in `incoming_ids`. +async fn pending_batch_size( + db: &mut MysqlDb, + user_id: i64, + batch_id: i64, + incoming_ids: &[String], +) -> DbResult { + let current_batch_size: i64 = batch_upload_items::table + .select(sql::("COALESCE(SUM(payload_size), 0)")) + .filter(batch_upload_items::user_id.eq(user_id)) + .filter(batch_upload_items::batch_id.eq(batch_id)) + .filter(batch_upload_items::id.ne_all(incoming_ids)) + .get_result(&mut db.conn) + .await + .optional()? + .unwrap_or_default(); + Ok(current_batch_size.max(0) as usize) +} + pub fn validate_batch_id(id: &str) -> DbResult<()> { decode_id(id).map(|_| ()) } diff --git a/syncstorage-mysql/src/db/db_impl.rs b/syncstorage-mysql/src/db/db_impl.rs index 2905eb01f5..3cda4432e3 100644 --- a/syncstorage-mysql/src/db/db_impl.rs +++ b/syncstorage-mysql/src/db/db_impl.rs @@ -210,7 +210,7 @@ impl Db for MysqlDb { if self.quota.enforced { return Err(DbError::quota()); } else { - warn!("Quota at limit for user's collection ({} bytes)", usage.total_bytes; "collection"=>bso.collection.clone()); + warn!("Quota at limit for user ({} bytes)", usage.total_bytes; "collection"=>bso.collection.clone()); } } } @@ -682,14 +682,12 @@ impl Db for MysqlDb { params: params::GetQuotaUsage, ) -> DbResult { let uid = params.user_id.legacy_id as i64; - let collection_id = self._get_collection_id(¶ms.collection).await?; let (total_bytes, count): (i64, i32) = user_collections::table .select(( sql::("COALESCE(SUM(COALESCE(total_bytes, 0)), 0)"), sql::("COALESCE(SUM(COALESCE(count, 0)), 0)"), )) .filter(user_collections::user_id.eq(uid)) - .filter(user_collections::collection_id.eq(collection_id)) .get_result(&mut self.conn) .await .optional()? diff --git a/syncstorage-mysql/src/db/mod.rs b/syncstorage-mysql/src/db/mod.rs index e386d7aa4f..4e25472f6f 100644 --- a/syncstorage-mysql/src/db/mod.rs +++ b/syncstorage-mysql/src/db/mod.rs @@ -9,7 +9,7 @@ use diesel::{ use diesel_async::RunQueryDsl; use syncserver_common::Metrics; use syncstorage_db_common::{ - FIRST_CUSTOM_COLLECTION_ID, UserIdentifier, error::DbErrorIntrospect, results, + Db, FIRST_CUSTOM_COLLECTION_ID, UserIdentifier, error::DbErrorIntrospect, params, results, util::SyncTimestamp, }; use syncstorage_settings::Quota; @@ -237,6 +237,34 @@ impl MysqlDb { count, }) } + + async fn check_quota( + &mut self, + user_id: &UserIdentifier, + collection: &str, + ) -> DbResult> { + if !self.quota.enabled { + return Ok(None); + } + let usage = self + .get_quota_usage(params::GetQuotaUsage { + user_id: user_id.clone(), + collection: collection.to_owned(), + }) + .await?; + if usage.total_bytes >= self.quota.size { + let mut tags = HashMap::default(); + tags.insert("collection".to_owned(), collection.to_owned()); + self.metrics.incr_with_tags("storage.quota.at_limit", tags); + if self.quota.enforced { + return Err(DbError::quota()); + } else { + warn!("Quota at limit for user ({} bytes)", usage.total_bytes; + "collection" => collection); + } + } + Ok(Some(usage.total_bytes)) + } } #[allow(dead_code)] // Not really dead, Rust can't see the use above diff --git a/syncstorage-postgres/src/db/batch_impl.rs b/syncstorage-postgres/src/db/batch_impl.rs index d9cdbff3dc..828d54bb75 100644 --- a/syncstorage-postgres/src/db/batch_impl.rs +++ b/syncstorage-postgres/src/db/batch_impl.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use async_trait::async_trait; use diesel::{ self, ExpressionMethods, OptionalExtension, QueryDsl, delete, @@ -45,7 +47,10 @@ impl BatchDb for PgDb { let batch = results::CreateBatch { id: batch_id.to_string(), - size: None, + // `size` is the committed size + size: self + .check_quota(¶ms.user_id, ¶ms.collection) + .await?, }; do_append( @@ -54,6 +59,7 @@ impl BatchDb for PgDb { collection_id, batch.clone(), params.bsos, + ¶ms.collection, ) .await?; @@ -100,12 +106,19 @@ impl BatchDb for PgDb { let collection_id = self.get_or_create_collection_id(¶ms.collection).await?; + // `batch.size` is the committed size + let mut batch = params.batch; + batch.size = self + .check_quota(¶ms.user_id, ¶ms.collection) + .await?; + do_append( self, params.user_id, collection_id, - params.batch, + batch, params.bsos, + ¶ms.collection, ) .await?; @@ -132,13 +145,7 @@ impl BatchDb for PgDb { let user_id = params.user_id.legacy_id as i64; let collection_id = self.get_or_create_collection_id(¶ms.collection).await?; - let timestamp = self - .update_collection(params::UpdateCollection { - user_id: params.user_id.clone(), - collection_id, - collection: params.collection.clone(), - }) - .await?; + let timestamp = self.checked_timestamp()?; let default_ttl_seconds = DEFAULT_BSO_TTL as i64; let ts_datetime = timestamp.as_datetime()?; @@ -151,6 +158,14 @@ impl BatchDb for PgDb { .execute(&mut self.conn) .await?; + let timestamp = self + .update_collection(params::UpdateCollection { + user_id: params.user_id.clone(), + collection_id, + collection: params.collection.clone(), + }) + .await?; + self.delete_batch(params::DeleteBatch { user_id: params.user_id, collection: params.collection, @@ -194,14 +209,39 @@ pub async fn do_append( collection_id: i32, batch: results::CreateBatch, bsos: Vec, + collection: &str, ) -> DbResult<()> { let batch_id = Uuid::parse_str(&batch.id) .map_err(|e| DbError::internal(format!("Invalid batch_id in batch: {}", e)))?; + let user_id_i64 = user_id.legacy_id as i64; + + if db.quota.enabled + && let Some(size) = batch.size + { + let running_size: usize = bsos + .iter() + .filter_map(|bso| bso.payload.as_ref().map(|p| p.len())) + .sum(); + let incoming_ids: Vec = bsos.iter().map(|bso| bso.id.clone()).collect(); + let pending_size = + pending_batch_size(db, user_id_i64, collection_id, &batch_id, &incoming_ids).await?; + let projected_total = size + pending_size + running_size; + if projected_total >= db.quota.size { + let mut tags = HashMap::default(); + tags.insert("collection".to_owned(), collection.to_owned()); + db.metrics.incr_with_tags("storage.quota.at_limit", tags); + if db.quota.enforced { + return Err(DbError::quota()); + } else { + warn!("Quota at limit for user ({} bytes)", projected_total; + "collection" => collection); + } + } + } for bso in bsos { let ttl = bso.ttl.map(|t| t as i64); let sortindex = bso.sortindex; - let user_id_i64 = user_id.legacy_id as i64; sql_query( "INSERT INTO batch_bsos (user_id, collection_id, batch_id, batch_bso_id, sortindex, payload, ttl) @@ -225,6 +265,29 @@ pub async fn do_append( Ok(()) } +/// Sum the pending payload bytes, excluding any ids in `incoming_ids`. +async fn pending_batch_size( + db: &mut PgDb, + user_id: i64, + collection_id: i32, + batch_id: &Uuid, + incoming_ids: &[String], +) -> DbResult { + let current_batch_size: i64 = batch_bsos::table + .select(sql::( + "COALESCE(SUM(LENGTH(COALESCE(payload, ''))), 0)::BIGINT", + )) + .filter(batch_bsos::user_id.eq(user_id)) + .filter(batch_bsos::collection_id.eq(collection_id)) + .filter(batch_bsos::batch_id.eq(batch_id)) + .filter(batch_bsos::batch_bso_id.ne_all(incoming_ids)) + .get_result(&mut db.conn) + .await + .optional()? + .unwrap_or_default(); + Ok(current_batch_size.max(0) as usize) +} + pub fn validate_batch_id(id: &str) -> DbResult { Uuid::parse_str(id).map_err(|e| DbError::internal(format!("Invalid batch_id: {}", e))) } diff --git a/syncstorage-postgres/src/db/db_impl.rs b/syncstorage-postgres/src/db/db_impl.rs index 87d17d4c12..3012542905 100644 --- a/syncstorage-postgres/src/db/db_impl.rs +++ b/syncstorage-postgres/src/db/db_impl.rs @@ -265,14 +265,12 @@ impl Db for PgDb { &mut self, params: params::GetQuotaUsage, ) -> DbResult { - let collection_id = self._get_collection_id(¶ms.collection).await?; let (total_bytes, count): (i64, i64) = user_collections::table .select(( sql::("COALESCE(SUM(COALESCE(total_bytes, 0)), 0)::BIGINT"), sql::("COALESCE(SUM(COALESCE(count, 0)), 0)::BIGINT"), )) .filter(user_collections::user_id.eq(params.user_id.legacy_id as i64)) - .filter(user_collections::collection_id.eq(collection_id)) .get_result(&mut self.conn) .await .optional()? diff --git a/syncstorage-postgres/src/db/mod.rs b/syncstorage-postgres/src/db/mod.rs index 8fed555c29..dabe154ead 100644 --- a/syncstorage-postgres/src/db/mod.rs +++ b/syncstorage-postgres/src/db/mod.rs @@ -248,7 +248,7 @@ impl PgDb { if self.quota.enforced { return Err(DbError::quota()); } else { - warn!("Quota at limit for user's collection: ({} bytes)", usage.total_bytes; + warn!("Quota at limit for user ({} bytes)", usage.total_bytes; "collection" => collection); } } diff --git a/syncstorage-settings/src/lib.rs b/syncstorage-settings/src/lib.rs index b1680291f0..76e2171add 100644 --- a/syncstorage-settings/src/lib.rs +++ b/syncstorage-settings/src/lib.rs @@ -144,11 +144,6 @@ impl Settings { if self.uses_spanner() { self.limits.max_total_bytes = min(self.limits.max_total_bytes, MAX_SPANNER_LOAD_SIZE as u32); - } else { - // No quotas for stand alone servers - self.limits.max_quota_limit = 0; - self.enable_quota = false; - self.enforce_quota = false; } } diff --git a/syncstorage-spanner/src/db/batch_impl.rs b/syncstorage-spanner/src/db/batch_impl.rs index 1b4b02bdba..ab214a7aa3 100644 --- a/syncstorage-spanner/src/db/batch_impl.rs +++ b/syncstorage-spanner/src/db/batch_impl.rs @@ -82,13 +82,11 @@ impl BatchDb for SpannerDb { metrics.start_timer("storage.spanner.append_items_to_batch", None); let collection_id = self._get_collection_id(¶ms.collection).await?; - let current_size = self + // `batch.size` is the committed size + let mut batch = params.batch; + batch.size = self .check_quota(¶ms.user_id, ¶ms.collection) .await?; - let mut batch = params.batch; - if let Some(size) = current_size { - batch.size = Some(size + batch.size.unwrap_or(0)); - } // confirm that this batch exists or has not yet been committed. let exists = self @@ -250,10 +248,13 @@ pub async fn do_append( // Build an ARRAY of incoming rows for a `INSERT OR UPDATE`. COALESCE(new, existing) // is used so an update only overwrites fields the request supplied. let mut rows: Vec = Vec::with_capacity(bsos.len()); + // The incoming ids are used to exclude the payloads from the running total. + let mut incoming_ids: Vec = Vec::with_capacity(bsos.len()); for bso in bsos { if let Some(ref payload) = bso.payload { running_size += payload.len(); } + incoming_ids.push(bso.id.clone()); let sortindex = bso .sortindex .map(IntoSpannerValue::into_spanner_value) @@ -285,12 +286,16 @@ pub async fn do_append( if db.quota.enabled && let Some(size) = batch.size - && size + running_size >= db.quota.size { - if db.quota.enforced { - return Err(db.quota_error(collection)); - } else { - warn!("Quota at limit for user's collection ({} bytes)", size + running_size; "collection"=>collection); + let pending_size = + pending_batch_size(db, &user_id, collection_id, &batch.id, &incoming_ids).await?; + let projected_total = size + pending_size + running_size; + if projected_total >= db.quota.size { + if db.quota.enforced { + return Err(db.quota_error(collection)); + } else { + warn!("Quota at limit for user ({} bytes)", projected_total; "collection"=>collection); + } } } @@ -363,6 +368,46 @@ pub async fn do_append( Ok(()) } +/// Sum the pending payload bytes, excluding any ids in `incoming_ids`. +async fn pending_batch_size( + db: &mut SpannerDb, + user_id: &UserIdentifier, + collection_id: i32, + batch_id: &str, + incoming_ids: &[String], +) -> DbResult { + let (mut sqlparams, mut sqlparam_types) = params! { + "fxa_uid" => user_id.fxa_uid.clone(), + "fxa_kid" => user_id.fxa_kid.clone(), + "collection_id" => collection_id, + "batch_id" => batch_id.to_owned(), + }; + let incoming_ids = incoming_ids.to_vec(); + sqlparam_types.insert("incoming_ids".to_owned(), incoming_ids.spanner_type()); + sqlparams.insert("incoming_ids".to_owned(), incoming_ids.into_spanner_value()); + let result = db + .sql( + "SELECT COALESCE(SUM(BYTE_LENGTH(payload)), 0) + FROM batch_bsos + WHERE fxa_uid = @fxa_uid + AND fxa_kid = @fxa_kid + AND collection_id = @collection_id + AND batch_id = @batch_id + AND batch_bso_id NOT IN UNNEST(@incoming_ids)", + ) + .await? + .params(sqlparams) + .param_types(sqlparam_types) + .execute(&db.conn)? + .one() + .await?; + let bytes = result[0] + .get_string_value() + .parse::() + .map_err(|e| DbError::integrity(e.to_string()))?; + Ok(bytes) +} + /// Ensure a parent row exists in user_collections prior to creating a child /// row in the batches table. /// diff --git a/syncstorage-spanner/src/db/db_impl.rs b/syncstorage-spanner/src/db/db_impl.rs index c120f09cce..21cbcaf7f4 100644 --- a/syncstorage-spanner/src/db/db_impl.rs +++ b/syncstorage-spanner/src/db/db_impl.rs @@ -411,16 +411,14 @@ impl Db for SpannerDb { if !self.quota.enabled { return Ok(results::GetQuotaUsage::default()); } - let collection_id = self._get_collection_id(¶ms.collection).await?; - let check_sql = "SELECT COALESCE(total_bytes,0), COALESCE(count,0) + let check_sql = "SELECT COALESCE(SUM(COALESCE(total_bytes, 0)), 0), \ + COALESCE(SUM(COALESCE(count, 0)), 0) FROM user_collections WHERE fxa_uid = @fxa_uid - AND fxa_kid = @fxa_kid - AND collection_id = @collection_id"; + AND fxa_kid = @fxa_kid"; let (sqlparams, sqlparam_types) = params! { "fxa_uid" => params.user_id.fxa_uid.clone(), "fxa_kid" => params.user_id.fxa_kid.clone(), - "collection_id" => collection_id, }; let result = self .sql(check_sql) diff --git a/syncstorage-spanner/src/db/mod.rs b/syncstorage-spanner/src/db/mod.rs index ab378ee7aa..316aed3372 100644 --- a/syncstorage-spanner/src/db/mod.rs +++ b/syncstorage-spanner/src/db/mod.rs @@ -557,7 +557,7 @@ impl SpannerDb { if self.quota.enforced { return Err(self.quota_error(collection)); } else { - warn!("Quota at limit for user's collection: ({} bytes)", usage.total_bytes; "collection"=>collection); + warn!("Quota at limit for user ({} bytes)", usage.total_bytes; "collection"=>collection); } } Ok(Some(usage.total_bytes))