Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions syncstorage-db/src/tests/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,53 @@ async fn test_append_upsert_overwrites_same_batch_bso_id() -> Result<(), DbError
.await
}

#[tokio::test]
async fn test_quota_pending_batch_accumulates() -> Result<(), DbError> {
let settings = Settings::test_settings();
if !settings.syncstorage.enable_quota {
debug!("[test] Skipping test");
return Ok(());
}

with_test_transaction(
settings.syncstorage,
async |db: &mut dyn Db<Error = DbError>| {
let uid = 9001;
let coll = "clients";

let chunk = 3838;
let payload = "z".repeat(chunk);
// hits quota on second append.
db.set_quota(true, chunk + chunk / 2, true);

let new_batch = db.create_batch(cb(uid, coll, vec![])).await?;

db.append_to_batch(ab(
uid,
coll,
new_batch.clone(),
vec![postbso("b0", Some(&payload), None, None)],
))
.await?;

let result = db
.append_to_batch(ab(
uid,
coll,
new_batch.clone(),
vec![postbso("b1", Some(&payload), None, None)],
))
.await;
assert!(
result.as_ref().err().is_some_and(|e| e.is_quota()),
"expected an over-quota error, got {result:?}"
);
Ok(())
},
)
.await
}

#[tokio::test]
async fn test_commit_batch_partial_overlap() -> Result<(), DbError> {
let settings = Settings::test_settings().syncstorage;
Expand Down
7 changes: 2 additions & 5 deletions syncstorage-db/src/tests/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,11 +654,8 @@ async fn get_collection_usage() -> Result<(), DbError> {
collection: "ignored".to_owned(),
})
.await?;
assert_eq!(
&(quota.total_bytes as i64),
expected.get("bookmarks").unwrap()
);
assert_eq!(quota.count, 5); // 3 collections, 5 records
assert_eq!(&(quota.total_bytes as i64), &sum);
assert_eq!(quota.count, 15); // 3 collections of 5 records
}
Ok(())
})
Expand Down
91 changes: 83 additions & 8 deletions syncstorage-mysql/src/db/batch_impl.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use base64::Engine;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use async_trait::async_trait;
use diesel::{
Expand Down Expand Up @@ -69,11 +69,25 @@ impl BatchDb for MysqlDb {
}
}

do_append(self, batch_id, params.user_id, collection_id, params.bsos).await?;
Ok(results::CreateBatch {
let batch = results::CreateBatch {
id: encode_id(batch_id),
size: None,
})
// `size` is the committed size
size: self
.check_quota(&params.user_id, &params.collection)
.await?,
};

do_append(
self,
params.user_id,
collection_id,
batch.clone(),
params.bsos,
&params.collection,
)
.await?;

Ok(batch)
}

async fn validate_batch(&mut self, params: params::ValidateBatch) -> DbResult<bool> {
Expand Down Expand Up @@ -110,9 +124,24 @@ impl BatchDb for MysqlDb {
return Err(DbError::batch_not_found());
}

let batch_id = decode_id(&params.batch.id)?;
let collection_id = self._get_collection_id(&params.collection).await?;
do_append(self, batch_id, params.user_id, collection_id, params.bsos).await?;

// `batch.size` is the committed size
let mut batch = params.batch;
batch.size = self
.check_quota(&params.user_id, &params.collection)
.await?;

do_append(
self,
params.user_id,
collection_id,
batch,
params.bsos,
&params.collection,
)
.await?;

Ok(())
}

Expand Down Expand Up @@ -191,10 +220,11 @@ impl BatchDb for MysqlDb {

pub async fn do_append(
db: &mut MysqlDb,
batch_id: i64,
user_id: UserIdentifier,
_collection_id: i32,
batch: results::CreateBatch,
bsos: Vec<params::PostCollectionBso>,
collection: &str,
) -> DbResult<()> {
fn exist_idx(user_id: u64, batch_id: i64, bso_id: &str) -> String {
// Construct something that matches the key for batch_upload_items
Expand All @@ -206,6 +236,32 @@ pub async fn do_append(
)
}

let batch_id = decode_id(&batch.id)?;
let uid = user_id.legacy_id as i64;

if db.quota.enabled
&& let Some(size) = batch.size
{
let running_size: usize = bsos
.iter()
.filter_map(|bso| bso.payload.as_ref().map(|p| p.len()))
.sum();
let incoming_ids: Vec<String> = bsos.iter().map(|bso| bso.id.clone()).collect();
let pending_size = pending_batch_size(db, uid, batch_id, &incoming_ids).await?;
let projected_total = size + pending_size + running_size;
if projected_total >= db.quota.size {
let mut tags = HashMap::default();
tags.insert("collection".to_owned(), collection.to_owned());
db.metrics.incr_with_tags("storage.quota.at_limit", tags);
if db.quota.enforced {
return Err(DbError::quota());
} else {
warn!("Quota at limit for user ({} bytes)", projected_total;
"collection" => collection);
}
}
}

// It's possible for the list of items to contain a duplicate key entry.
// This means that we can't really call `ON DUPLICATE` here, because that's
// more about inserting one item at a time. (e.g. it works great if the
Expand Down Expand Up @@ -281,6 +337,25 @@ pub async fn do_append(
Ok(())
}

/// Sum the pending payload bytes, excluding any ids in `incoming_ids`.
async fn pending_batch_size(
db: &mut MysqlDb,
user_id: i64,
batch_id: i64,
incoming_ids: &[String],
) -> DbResult<usize> {
let current_batch_size: i64 = batch_upload_items::table
.select(sql::<BigInt>("COALESCE(SUM(payload_size), 0)"))
.filter(batch_upload_items::user_id.eq(user_id))
.filter(batch_upload_items::batch_id.eq(batch_id))
.filter(batch_upload_items::id.ne_all(incoming_ids))
.get_result(&mut db.conn)
.await
.optional()?
.unwrap_or_default();
Ok(current_batch_size.max(0) as usize)
}

pub fn validate_batch_id(id: &str) -> DbResult<()> {
decode_id(id).map(|_| ())
}
Expand Down
4 changes: 1 addition & 3 deletions syncstorage-mysql/src/db/db_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl Db for MysqlDb {
if self.quota.enforced {
return Err(DbError::quota());
} else {
warn!("Quota at limit for user's collection ({} bytes)", usage.total_bytes; "collection"=>bso.collection.clone());
warn!("Quota at limit for user ({} bytes)", usage.total_bytes; "collection"=>bso.collection.clone());
}
}
}
Expand Down Expand Up @@ -682,14 +682,12 @@ impl Db for MysqlDb {
params: params::GetQuotaUsage,
) -> DbResult<results::GetQuotaUsage> {
let uid = params.user_id.legacy_id as i64;
let collection_id = self._get_collection_id(&params.collection).await?;
let (total_bytes, count): (i64, i32) = user_collections::table
.select((
sql::<BigInt>("COALESCE(SUM(COALESCE(total_bytes, 0)), 0)"),
sql::<Integer>("COALESCE(SUM(COALESCE(count, 0)), 0)"),
))
.filter(user_collections::user_id.eq(uid))
.filter(user_collections::collection_id.eq(collection_id))
.get_result(&mut self.conn)
.await
.optional()?
Expand Down
30 changes: 29 additions & 1 deletion syncstorage-mysql/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use diesel::{
use diesel_async::RunQueryDsl;
use syncserver_common::Metrics;
use syncstorage_db_common::{
FIRST_CUSTOM_COLLECTION_ID, UserIdentifier, error::DbErrorIntrospect, results,
Db, FIRST_CUSTOM_COLLECTION_ID, UserIdentifier, error::DbErrorIntrospect, params, results,
util::SyncTimestamp,
};
use syncstorage_settings::Quota;
Expand Down Expand Up @@ -237,6 +237,34 @@ impl MysqlDb {
count,
})
}

async fn check_quota(
&mut self,
user_id: &UserIdentifier,
collection: &str,
) -> DbResult<Option<usize>> {
if !self.quota.enabled {
return Ok(None);
}
let usage = self
.get_quota_usage(params::GetQuotaUsage {
user_id: user_id.clone(),
collection: collection.to_owned(),
})
.await?;
if usage.total_bytes >= self.quota.size {
let mut tags = HashMap::default();
tags.insert("collection".to_owned(), collection.to_owned());
self.metrics.incr_with_tags("storage.quota.at_limit", tags);
if self.quota.enforced {
return Err(DbError::quota());
} else {
warn!("Quota at limit for user ({} bytes)", usage.total_bytes;
"collection" => collection);
}
}
Ok(Some(usage.total_bytes))
}
}

#[allow(dead_code)] // Not really dead, Rust can't see the use above
Expand Down
Loading
Loading