Skip to content

Commit b16ef50

Browse files
authored
Merge pull request #2013 from mozilla-services/feat/post_bsos-STOR-423
feat: optimize post_bsos w/ MERGE INTO
2 parents f125cf6 + ee45437 commit b16ef50

5 files changed

Lines changed: 157 additions & 87 deletions

File tree

syncstorage-postgres/migrations/2025-10-20-155711_create_schema/up.sql

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,14 @@ CREATE TABLE batch_bsos (
112112
) ON DELETE CASCADE
113113
);
114114

115+
116+
-- Named composite type whose only purpose is to let post_bsos ship all
-- of its rows through one bind parameter (a post_bso[] array). Postgres
-- rejects anonymous/unnamed composite types as bind parameters, so the
-- type must be declared explicitly here.
CREATE TYPE post_bso AS (
    bso_id TEXT,
    sortindex INTEGER,
    payload TEXT,
    ttl BIGINT
);

syncstorage-postgres/src/db/db_impl.rs

Lines changed: 49 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@ use chrono::{offset::Utc, DateTime, TimeDelta};
33
use diesel::{
44
delete,
55
dsl::{count, max, now, sql},
6-
sql_types::{BigInt, Nullable, Timestamptz},
6+
sql_types::{Array, BigInt, Integer, Nullable, Timestamptz},
77
upsert::excluded,
88
ExpressionMethods, IntoSql, OptionalExtension, QueryDsl, SelectableHelper,
99
};
1010
use diesel_async::{AsyncConnection, RunQueryDsl, TransactionManager};
1111
use futures::TryStreamExt;
12-
use std::collections::HashMap;
1312
use syncstorage_db_common::{
1413
error::DbErrorIntrospect, params, results, util::SyncTimestamp, Db, Sorting, DEFAULT_BSO_TTL,
1514
};
@@ -18,7 +17,7 @@ use super::{PgDb, TOMBSTONE};
1817
use crate::{
1918
bsos_query,
2019
db::{CollectionLock, PRETOUCH_DT},
21-
orm_models::BsoChangeset,
20+
orm_models::{sql_types::PostBso, BsoChangeset},
2221
pool::Conn,
2322
schema::{bsos, collections, user_collections},
2423
DbError, DbResult,
@@ -73,6 +72,7 @@ impl Db for PgDb {
7372
&mut self,
7473
params: params::LockCollection,
7574
) -> DbResult<results::LockCollection> {
75+
let user_id = params.user_id.legacy_id as i64;
7676
let collection_id = self
7777
.get_collection_id(&params.collection)
7878
.await
@@ -86,7 +86,6 @@ impl Db for PgDb {
8686
}
8787
})?;
8888

89-
let user_id = params.user_id.legacy_id as i64;
9089
let key = (params.user_id, collection_id);
9190
// If we already have a read or write lock then it's safe to
9291
// use it as-is.
@@ -117,8 +116,8 @@ impl Db for PgDb {
117116
}
118117

119118
async fn lock_for_write(&mut self, params: params::LockCollection) -> DbResult<()> {
120-
let collection_id = self.get_or_create_collection_id(&params.collection).await?;
121119
let user_id = params.user_id.legacy_id as i64;
120+
let collection_id = self.get_or_create_collection_id(&params.collection).await?;
122121
let key = (params.user_id, collection_id);
123122

124123
if let Some(CollectionLock::Read) = self.session.coll_locks.get(&key) {
@@ -353,38 +352,11 @@ impl Db for PgDb {
353352
Ok(results::GetBsoIds { items, offset })
354353
}
355354

356-
async fn post_bsos(&mut self, params: params::PostBsos) -> DbResult<results::PostBsos> {
357-
let collection_id = self.get_or_create_collection_id(&params.collection).await?;
358-
let modified = self.checked_timestamp()?;
359-
360-
self.ensure_user_collection(params.user_id.legacy_id as i64, collection_id)
361-
.await?;
362-
for pbso in params.bsos {
363-
self.put_bso(params::PutBso {
364-
user_id: params.user_id.clone(),
365-
collection: params.collection.clone(),
366-
id: pbso.id.clone(),
367-
payload: pbso.payload,
368-
sortindex: pbso.sortindex,
369-
ttl: pbso.ttl,
370-
})
371-
.await?;
372-
}
373-
self.update_collection(params::UpdateCollection {
374-
user_id: params.user_id,
375-
collection_id,
376-
collection: params.collection,
377-
})
378-
.await?;
379-
380-
Ok(modified)
381-
}
382-
383355
async fn delete_bso(&mut self, params: params::DeleteBso) -> DbResult<results::DeleteBso> {
384-
let user_id = params.user_id.legacy_id;
356+
let user_id = params.user_id.legacy_id as i64;
385357
let collection_id = self.get_collection_id(&params.collection).await?;
386358
let affected_rows = delete(bsos::table)
387-
.filter(bsos::user_id.eq(user_id as i64))
359+
.filter(bsos::user_id.eq(user_id))
388360
.filter(bsos::collection_id.eq(&collection_id))
389361
.filter(bsos::bso_id.eq(params.id))
390362
.filter(bsos::expiry.gt(now))
@@ -437,65 +409,31 @@ impl Db for PgDb {
437409
}
438410

439411
async fn put_bso(&mut self, bso: params::PutBso) -> DbResult<results::PutBso> {
412+
let user_id = bso.user_id.legacy_id as i64;
440413
let collection_id = self.get_or_create_collection_id(&bso.collection).await?;
441-
let user_id: u64 = bso.user_id.legacy_id;
442-
if self.quota.enabled {
443-
let usage = self
444-
.get_quota_usage(params::GetQuotaUsage {
445-
user_id: bso.user_id.clone(),
446-
collection: bso.collection.clone(),
447-
collection_id,
448-
})
449-
.await?;
450-
if usage.total_bytes >= self.quota.size {
451-
let mut tags = HashMap::default();
452-
tags.insert("collection".to_owned(), bso.collection.clone());
453-
self.metrics.incr_with_tags("storage.quota.at_limit", tags);
454-
if self.quota.enforced {
455-
return Err(DbError::quota());
456-
} else {
457-
warn!("Quota at limit for user's collection ({} bytes)", usage.total_bytes; "collection"=>bso.collection.clone());
458-
}
459-
}
460-
}
414+
415+
self.check_quota(&bso.user_id, &bso.collection, collection_id)
416+
.await?;
461417

462418
let payload = bso.payload.as_deref().unwrap_or_default();
463419
let sortindex = bso.sortindex;
464-
let ttl = bso.ttl.map_or(DEFAULT_BSO_TTL, |ttl| ttl);
420+
let ttl = bso.ttl.unwrap_or(DEFAULT_BSO_TTL);
465421

466422
let modified = self.checked_timestamp()?.as_datetime()?;
467423
// Expiry originally required millisecond conversion
468424
let expiry = modified + TimeDelta::seconds(ttl as i64);
469-
470425
// The changeset utilizes Diesel's `AsChangeset` trait.
471426
// This allows selective updates of fields if and only if they are `Some(<T>)`
472427
let changeset = BsoChangeset {
473-
sortindex: if bso.sortindex.is_some() {
474-
sortindex // sortindex is already an Option of type `Option<i32>`
475-
} else {
476-
None
477-
},
478-
payload: if bso.payload.is_some() {
479-
Some(payload)
480-
} else {
481-
None
482-
},
483-
expiry: if bso.ttl.is_some() {
484-
Some(expiry)
485-
} else {
486-
None
487-
},
488-
modified: if bso.payload.is_some() || bso.sortindex.is_some() {
489-
Some(modified)
490-
} else {
491-
None
492-
},
428+
sortindex: bso.sortindex,
429+
payload: bso.payload.as_deref(),
430+
modified: (bso.payload.is_some() || bso.sortindex.is_some()).then_some(modified),
431+
expiry: bso.ttl.map(|_| expiry),
493432
};
494-
self.ensure_user_collection(user_id as i64, collection_id)
495-
.await?;
433+
self.ensure_user_collection(user_id, collection_id).await?;
496434
diesel::insert_into(bsos::table)
497435
.values((
498-
bsos::user_id.eq(user_id as i64),
436+
bsos::user_id.eq(user_id),
499437
bsos::collection_id.eq(&collection_id),
500438
bsos::bso_id.eq(&bso.id),
501439
bsos::sortindex.eq(sortindex),
@@ -517,6 +455,35 @@ impl Db for PgDb {
517455
.await
518456
}
519457

458+
async fn post_bsos(&mut self, params: params::PostBsos) -> DbResult<results::PostBsos> {
459+
let user_id = params.user_id.legacy_id as i64;
460+
let collection_id = self.get_or_create_collection_id(&params.collection).await?;
461+
self.check_quota(&params.user_id, &params.collection, collection_id)
462+
.await?;
463+
self.ensure_user_collection(user_id, collection_id).await?;
464+
465+
// Rendering a VALUES statement for MERGE INTO here is painful so we
466+
// pass the bsos in a single bind param as an Array of a named
467+
// composite type (post_bso[]). The composite type must be explicitly
468+
// named/declared as postgres disallows unnamed/anonymous composite
469+
// types to be used as bind parameters
470+
diesel::sql_query(include_str!("post_bsos.sql"))
471+
.bind::<BigInt, _>(user_id)
472+
.bind::<Integer, _>(collection_id)
473+
.bind::<Array<PostBso>, _>(params.bsos)
474+
.bind::<Timestamptz, _>(self.checked_timestamp()?.as_datetime()?)
475+
.bind::<BigInt, _>(DEFAULT_BSO_TTL as i64)
476+
.execute(&mut self.conn)
477+
.await?;
478+
479+
self.update_collection(params::UpdateCollection {
480+
user_id: params.user_id,
481+
collection_id,
482+
collection: params.collection,
483+
})
484+
.await
485+
}
486+
520487
async fn get_collection_id(&mut self, name: &str) -> DbResult<results::GetCollectionId> {
521488
if let Some(id) = self.coll_cache.get_id(name)? {
522489
return Ok(id);
@@ -548,9 +515,9 @@ impl Db for PgDb {
548515
&mut self,
549516
params: params::UpdateCollection,
550517
) -> DbResult<SyncTimestamp> {
518+
let user_id = params.user_id.legacy_id as i64;
551519
let quota = if self.quota.enabled {
552-
self.calc_quota_usage(params.user_id.legacy_id as i64, params.collection_id)
553-
.await?
520+
self.calc_quota_usage(user_id, params.collection_id).await?
554521
} else {
555522
results::GetQuotaUsage {
556523
count: 0,
@@ -562,7 +529,7 @@ impl Db for PgDb {
562529

563530
diesel::insert_into(user_collections::table)
564531
.values((
565-
user_collections::user_id.eq(params.user_id.legacy_id as i64),
532+
user_collections::user_id.eq(user_id),
566533
user_collections::collection_id.eq(params.collection_id),
567534
user_collections::modified.eq(modified.as_datetime()?),
568535
user_collections::count.eq(quota.count as i64),

syncstorage-postgres/src/db/mod.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ use diesel_async::RunQueryDsl;
99
use std::{collections::HashMap, fmt, sync::Arc};
1010

1111
use syncserver_common::Metrics;
12-
use syncstorage_db_common::diesel::DbError;
13-
use syncstorage_db_common::{results, util::SyncTimestamp, UserIdentifier};
12+
use syncstorage_db_common::{
13+
diesel::DbError, params, results, util::SyncTimestamp, Db, UserIdentifier,
14+
};
1415
use syncstorage_settings::Quota;
1516

1617
use super::schema::{bsos, collections, user_collections};
@@ -205,6 +206,36 @@ impl PgDb {
205206
Ok(())
206207
}
207208

209+
async fn check_quota(
210+
&mut self,
211+
user_id: &UserIdentifier,
212+
collection: &str,
213+
collection_id: i32,
214+
) -> DbResult<Option<usize>> {
215+
if !self.quota.enabled {
216+
return Ok(None);
217+
}
218+
let usage = self
219+
.get_quota_usage(params::GetQuotaUsage {
220+
user_id: user_id.clone(),
221+
collection: collection.to_owned(),
222+
collection_id,
223+
})
224+
.await?;
225+
if usage.total_bytes >= self.quota.size {
226+
let mut tags = HashMap::default();
227+
tags.insert("collection".to_owned(), collection.to_owned());
228+
self.metrics.incr_with_tags("storage.quota.at_limit", tags);
229+
if self.quota.enforced {
230+
return Err(DbError::quota());
231+
} else {
232+
warn!("Quota at limit for user's collection: ({} bytes)", usage.total_bytes;
233+
"collection" => collection);
234+
}
235+
}
236+
Ok(Some(usage.total_bytes))
237+
}
238+
208239
// perform a heavier weight quota calculation
209240
async fn calc_quota_usage(
210241
&mut self,
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
-- Batch upsert of BSOs via MERGE.
-- Parameters:
--   $1: user_id (BIGINT)
--   $2: collection_id (INTEGER)
--   $3: incoming rows as a post_bso[] array
--   $4: modified timestamp (TIMESTAMPTZ)
--   $5: default TTL in seconds (BIGINT), applied on insert when ttl IS NULL
MERGE INTO bsos
USING UNNEST($3) AS post
ON bsos.user_id = $1
    AND bsos.collection_id = $2
    AND bsos.bso_id = post.bso_id
WHEN MATCHED THEN
    UPDATE SET
        -- NULL fields in the incoming row leave the stored value untouched
        sortindex = COALESCE(post.sortindex, bsos.sortindex),
        payload = COALESCE(post.payload, bsos.payload),
        -- modified only advances when payload or sortindex was supplied
        modified = COALESCE(
            CASE
                WHEN post.payload IS NOT NULL OR post.sortindex IS NOT NULL THEN $4
                ELSE NULL
            END,
            bsos.modified
        ),
        -- expiry only advances when a ttl was supplied
        expiry = COALESCE(
            CASE
                WHEN post.ttl IS NOT NULL THEN $4 + (post.ttl || ' seconds')::INTERVAL
                ELSE NULL
            END,
            bsos.expiry
        )
WHEN NOT MATCHED THEN
    INSERT (user_id, collection_id, bso_id, sortindex, payload, modified, expiry)
    VALUES (
        $1,
        $2,
        post.bso_id,
        post.sortindex,
        COALESCE(post.payload, ''),
        $4,
        $4 + (COALESCE(post.ttl, $5) || ' seconds')::INTERVAL
    )

syncstorage-postgres/src/orm_models.rs

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use chrono::{offset::Utc, DateTime};
2+
use diesel::{AsChangeset, Identifiable, Queryable};
23
use uuid::Uuid;
34

45
use crate::schema::{batch_bsos, batches, bsos, collections, user_collections};
5-
use diesel::{AsChangeset, Identifiable, Queryable};
66

77
#[derive(Queryable, Debug, Identifiable)]
88
#[diesel(primary_key(user_id, collection_id, batch_id, batch_bso_id))]
@@ -18,16 +18,17 @@ pub struct BatchBso {
1818

1919
#[derive(Queryable, Debug, Identifiable)]
2020
#[diesel(primary_key(user_id, collection_id, batch_id))]
21-
#[diesel(table_name=batches)]
21+
#[diesel(table_name = batches)]
2222
pub struct Batch {
2323
pub user_id: i64,
2424
pub collection_id: i32,
2525
pub batch_id: Uuid,
2626
pub expiry: DateTime<Utc>,
2727
}
2828

29-
#[derive(Queryable, Debug, Identifiable)]
29+
#[derive(Queryable, Debug, Identifiable, Insertable)]
3030
#[diesel(primary_key(user_id, collection_id, bso_id))]
31+
#[diesel(table_name = bsos)]
3132
pub struct Bso {
3233
pub user_id: i64,
3334
pub collection_id: i32,
@@ -64,3 +65,30 @@ pub struct UserCollection {
6465
pub count: Option<i64>,
6566
pub total_bytes: Option<i64>,
6667
}
68+
69+
pub mod sql_types {
70+
use diesel::{
71+
pg::Pg,
72+
serialize::{self, Output, ToSql, WriteTuple},
73+
sql_types::{BigInt, Integer, Nullable, Text},
74+
};
75+
use syncstorage_db_common::params;
76+
77+
#[derive(SqlType, QueryId)]
78+
#[diesel(postgres_type(name = "post_bso"))]
79+
pub struct PostBso;
80+
81+
impl ToSql<PostBso, Pg> for params::PostCollectionBso {
82+
fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result {
83+
WriteTuple::<(Text, Nullable<Integer>, Nullable<Text>, Nullable<BigInt>)>::write_tuple(
84+
&(
85+
&self.id,
86+
&self.sortindex,
87+
&self.payload,
88+
&self.ttl.map(|ttl| ttl as i64),
89+
),
90+
&mut out.reborrow(),
91+
)
92+
}
93+
}
94+
}

0 commit comments

Comments
 (0)