Skip to content

Commit 935b468

Browse files
authored
feat: optimize postgres get_or_create_collection_id (#2028)
making it (and mysql's) match spanner's, also fixing/simplifying their create_collection test method and fix mysql/postgres transactions to begin first and foremost Closes STOR-465 Closes STOR-371
1 parent 646e516 commit 935b468

6 files changed

Lines changed: 65 additions & 65 deletions

File tree

syncstorage-mysql/src/db/db_impl.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ impl Db for MysqlDb {
3939
/// than explicit locking, but our ops team have expressed concerns about
4040
/// the efficiency of that approach at scale.
4141
async fn lock_for_read(&mut self, params: params::LockCollection) -> DbResult<()> {
42+
// Lock the db
43+
self.begin(false).await?;
4244
let collection_id = self
4345
.get_collection_id(&params.collection)
4446
.await
@@ -60,8 +62,6 @@ impl Db for MysqlDb {
6062
return Ok(());
6163
}
6264

63-
// Lock the db
64-
self.begin(false).await?;
6565
let modified = user_collections::table
6666
.select(user_collections::modified)
6767
.filter(user_collections::user_id.eq(user_id))
@@ -82,6 +82,8 @@ impl Db for MysqlDb {
8282
}
8383

8484
async fn lock_for_write(&mut self, params: params::LockCollection) -> DbResult<()> {
85+
// Lock the db
86+
self.begin(true).await?;
8587
let collection_id = self.get_or_create_collection_id(&params.collection).await?;
8688
let user_id = params.user_id.legacy_id as i64;
8789
let key = (params.user_id, collection_id);
@@ -92,8 +94,6 @@ impl Db for MysqlDb {
9294
));
9395
}
9496

95-
// Lock the db
96-
self.begin(true).await?;
9797
let modified = user_collections::table
9898
.select(user_collections::modified)
9999
.filter(user_collections::user_id.eq(user_id))
@@ -754,7 +754,7 @@ impl Db for MysqlDb {
754754

755755
#[cfg(debug_assertions)]
756756
async fn create_collection(&mut self, name: &str) -> Result<i32, Self::Error> {
757-
self.get_or_create_collection_id(name).await
757+
self._create_collection(name).await
758758
}
759759

760760
#[cfg(debug_assertions)]

syncstorage-mysql/src/db/mod.rs

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,17 @@ use diesel::{
88
};
99
use diesel_async::RunQueryDsl;
1010
use syncserver_common::Metrics;
11-
use syncstorage_db_common::{results, util::SyncTimestamp, UserIdentifier};
11+
use syncstorage_db_common::{
12+
error::DbErrorIntrospect, results, util::SyncTimestamp, Db, UserIdentifier,
13+
FIRST_CUSTOM_COLLECTION_ID,
14+
};
1215
use syncstorage_settings::Quota;
1316

1417
use crate::{
1518
pool::{CollectionCache, Conn},
1619
DbError, DbResult,
1720
};
18-
use schema::{bso, collections};
21+
use schema::{bso, collections, last_insert_id};
1922

2023
mod batch_impl;
2124
mod db_impl;
@@ -110,45 +113,36 @@ impl MysqlDb {
110113
}
111114

112115
pub(super) async fn get_or_create_collection_id(&mut self, name: &str) -> DbResult<i32> {
113-
if let Some(id) = self.coll_cache.get_id(name)? {
114-
return Ok(id);
116+
match self.get_collection_id(name).await {
117+
Err(e) if e.is_collection_not_found() => self._create_collection(name).await,
118+
result => result,
115119
}
120+
}
116121

117-
diesel::insert_or_ignore_into(collections::table)
122+
async fn _create_collection(&mut self, name: &str) -> DbResult<i32> {
123+
if !cfg!(debug_assertions) && !self.session.in_write_transaction {
124+
return Err(DbError::internal(
125+
"Can't escalate read-lock to write-lock".to_owned(),
126+
));
127+
}
128+
diesel::insert_into(collections::table)
118129
.values(collections::name.eq(name))
119130
.execute(&mut self.conn)
120131
.await?;
121-
122-
let id = collections::table
123-
.select(collections::id)
124-
.filter(collections::name.eq(name))
125-
.first(&mut self.conn)
126-
.await?;
127-
128-
if !self.session.in_write_transaction {
129-
self.coll_cache.put(id, name.to_owned())?;
132+
let collection_id = diesel::select(last_insert_id())
133+
.first::<u64>(&mut self.conn)
134+
.await?
135+
.try_into()
136+
.map_err(|e| {
137+
DbError::internal(format!("Couldn't convert last_insert_id() to i32: {e}"))
138+
})?;
139+
if collection_id < FIRST_CUSTOM_COLLECTION_ID {
140+
// DDL ensures this should never occur
141+
return Err(DbError::internal(
142+
"create_collection < {FIRST_CUSTOM_COLLECTION_ID}".to_owned(),
143+
));
130144
}
131-
132-
Ok(id)
133-
}
134-
135-
async fn _get_collection_name(&mut self, id: i32) -> DbResult<String> {
136-
let name = if let Some(name) = self.coll_cache.get_name(id)? {
137-
name
138-
} else {
139-
sql_query(
140-
"SELECT name
141-
FROM collections
142-
WHERE id = ?",
143-
)
144-
.bind::<Integer, _>(&id)
145-
.get_result::<NameResult>(&mut self.conn)
146-
.await
147-
.optional()?
148-
.ok_or_else(DbError::collection_not_found)?
149-
.name
150-
};
151-
Ok(name)
145+
Ok(collection_id)
152146
}
153147

154148
async fn map_collection_names<T>(

syncstorage-mysql/src/db/schema.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,8 @@ allow_tables_to_appear_in_same_query!(
6969
collections,
7070
user_collections,
7171
);
72+
73+
define_sql_function! {
74+
/// MySQL's LAST_INSERT_ID()
75+
fn last_insert_id() -> Unsigned<BigInt>
76+
}

syncstorage-postgres/src/db/db_impl.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ impl Db for PgDb {
7272
&mut self,
7373
params: params::LockCollection,
7474
) -> DbResult<results::LockCollection> {
75+
self.begin(false).await?;
7576
let user_id = params.user_id.legacy_id as i64;
7677
let collection_id = self
7778
.get_collection_id(&params.collection)
@@ -95,8 +96,6 @@ impl Db for PgDb {
9596

9697
// `FOR SHARE`
9798
// Obtains shared lock, allowing multiple transactions to read rows simultaneously.
98-
self.begin(false).await?;
99-
10099
let modified = user_collections::table
101100
.select(user_collections::modified)
102101
.filter(user_collections::user_id.eq(user_id))
@@ -116,6 +115,7 @@ impl Db for PgDb {
116115
}
117116

118117
async fn lock_for_write(&mut self, params: params::LockCollection) -> DbResult<()> {
118+
self.begin(true).await?;
119119
let user_id = params.user_id.legacy_id as i64;
120120
let collection_id = self.get_or_create_collection_id(&params.collection).await?;
121121
let key = (params.user_id, collection_id);
@@ -130,7 +130,6 @@ impl Db for PgDb {
130130
// Acquires exclusive lock on select rows, prohibits other transactions from modifying
131131
// until complete.
132132
let nowtz = now.into_sql::<Timestamptz>();
133-
self.begin(true).await?;
134133
let row = user_collections::table
135134
.select((user_collections::modified, nowtz))
136135
.filter(user_collections::user_id.eq(user_id))
@@ -549,7 +548,7 @@ impl Db for PgDb {
549548

550549
#[cfg(debug_assertions)]
551550
async fn create_collection(&mut self, name: &str) -> DbResult<i32> {
552-
self.get_or_create_collection_id(name).await
551+
self._create_collection(name).await
553552
}
554553

555554
#[cfg(debug_assertions)]

syncstorage-postgres/src/db/mod.rs

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ use std::{collections::HashMap, fmt, sync::Arc};
1010

1111
use syncserver_common::Metrics;
1212
use syncstorage_db_common::{
13-
diesel::DbError, params, results, util::SyncTimestamp, Db, UserIdentifier,
13+
diesel::DbError, error::DbErrorIntrospect, params, results, util::SyncTimestamp, Db,
14+
UserIdentifier, FIRST_CUSTOM_COLLECTION_ID,
1415
};
1516
use syncstorage_settings::Quota;
1617

@@ -98,30 +99,31 @@ impl PgDb {
9899
/// Gets the provided collection by name and creates it if not present.
99100
/// Checks collection cache first to see if matching collection stored.
100101
/// Uses logic to not make change sif there is a conflict during insert.
101-
pub(super) async fn get_or_create_collection_id(&mut self, name: &str) -> DbResult<i32> {
102-
if let Some(id) = self.coll_cache.get_id(name)? {
103-
return Ok(id);
102+
async fn get_or_create_collection_id(&mut self, name: &str) -> DbResult<i32> {
103+
match self.get_collection_id(name).await {
104+
Err(e) if e.is_collection_not_found() => self._create_collection(name).await,
105+
result => result,
104106
}
107+
}
105108

106-
// Postgres specific ON CONFLICT DO NOTHING statement.
107-
// https://docs.diesel.rs/2.0.x/diesel/query_builder/struct.InsertStatement.html#method.on_conflict_do_nothing
108-
diesel::insert_into(collections::table)
109+
async fn _create_collection(&mut self, name: &str) -> DbResult<i32> {
110+
if !cfg!(debug_assertions) && !self.session.in_write_transaction {
111+
return Err(DbError::internal(
112+
"Can't escalate read-lock to write-lock".to_owned(),
113+
));
114+
}
115+
let collection_id = diesel::insert_into(collections::table)
109116
.values(collections::name.eq(name))
110-
.on_conflict_do_nothing()
111-
.execute(&mut self.conn)
112-
.await?;
113-
114-
let id = collections::table
115-
.select(collections::collection_id)
116-
.filter(collections::name.eq(name))
117-
.first(&mut self.conn)
117+
.returning(collections::collection_id)
118+
.get_result(&mut self.conn)
118119
.await?;
119-
120-
if !self.session.in_write_transaction {
121-
self.coll_cache.put(id, name.to_owned())?;
120+
if collection_id < FIRST_CUSTOM_COLLECTION_ID {
121+
// DDL ensures this should never occur
122+
return Err(DbError::internal(
123+
"create_collection < {FIRST_CUSTOM_COLLECTION_ID}".to_owned(),
124+
));
122125
}
123-
124-
Ok(id)
126+
Ok(collection_id)
125127
}
126128

127129
/// Given a set of collection_ids, return a HashMap of collection_id's

syncstorage-spanner/src/db/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ impl SpannerDb {
113113

114114
async fn get_or_create_collection_id(&mut self, name: &str) -> DbResult<i32> {
115115
match self.get_collection_id(name).await {
116-
Err(err) if err.is_collection_not_found() => self._create_collection(name).await,
116+
Err(e) if e.is_collection_not_found() => self._create_collection(name).await,
117117
result => result,
118118
}
119119
}

0 commit comments

Comments
 (0)