Skip to content

Commit da21bc5

Browse files
authored
db: prefer batched deletes (#41)
Our invocation output table is huge and deleting so many rows causes major lock contention. Delete them a few at a time instead to decrease lock contention. A batch size of 10 was intentionally chosen to deal with large write amplification from cascading deletes.
1 parent 159bb33 commit da21bc5

4 files changed

Lines changed: 263 additions & 5 deletions

File tree

blade/db/postgres/mod.rs

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,30 @@ impl state::DB for Postgres {
344344
.context(format!("failed to delete invocation since {ot:#?}"))
345345
}
346346

347+
fn delete_invocations_batch(
348+
&mut self,
349+
ts: &std::time::SystemTime,
350+
limit: i64,
351+
) -> anyhow::Result<usize> {
352+
let ot: time::OffsetDateTime = (*ts).into();
353+
let ids_to_delete: Vec<String> = schema::invocations::table
354+
.select(schema::invocations::id)
355+
.filter(schema::invocations::start.le(ot))
356+
.limit(limit)
357+
.load(&mut self.conn)
358+
.context("failed to query invocations for batch deletion")?;
359+
360+
if ids_to_delete.is_empty() {
361+
return Ok(0);
362+
}
363+
364+
diesel::delete(
365+
schema::invocations::table.filter(schema::invocations::id.eq_any(&ids_to_delete)),
366+
)
367+
.execute(&mut self.conn)
368+
.context("failed to delete invocation batch")
369+
}
370+
347371
fn update_invocation_heartbeat(&mut self, invocation_id: &str) -> anyhow::Result<()> {
348372
use schema::invocations::dsl::*;
349373
let now: time::OffsetDateTime = std::time::SystemTime::now().into();
@@ -1048,6 +1072,99 @@ mod tests {
10481072
}
10491073
}
10501074

1075+
#[test]
fn test_delete_batch() {
    let tmp = tempdir::TempDir::new("test_delete_batch").unwrap();
    let harness = harness::new(tmp.path().to_str().unwrap()).unwrap();
    let uri = harness.uri();
    super::init_db(&uri).unwrap();
    let mgr = crate::manager::PostgresManager::new(&uri).unwrap();
    // `conn` is a second, independent connection used only to observe the table.
    let mut conn = PgConnection::establish(&uri).unwrap();
    let mut db = mgr.get().unwrap();

    let start = UNIX_EPOCH;
    let day = Duration::from_secs(60 * 60 * 24);

    // Create 15 invocations, one per day, starting one day after the epoch.
    for i in 0..15u32 {
        db.upsert_shallow_invocation(&state::InvocationResults {
            id: format!("id{i}"),
            start: start.checked_add(day * (i + 1)).unwrap(),
            ..Default::default()
        })
        .unwrap();
    }

    // Counts the invocation rows currently in the table.
    let mut remaining = || {
        let rows = super::schema::invocations::table
            .select(super::models::Invocation::as_select())
            .get_results(&mut conn)
            .unwrap();
        rows.len()
    };

    // Verify all 15 exist.
    assert_eq!(remaining(), 15);

    // Delete in batches of 3; the cutoff covers the first 10 invocations.
    let cutoff = start.checked_add(day * 10).unwrap();

    // (rows deleted by the batch, rows left in the table afterwards)
    let expectations = [
        (3, 12), // first batch
        (3, 9),  // second batch
        (3, 6),  // third batch
        (1, 5),  // fourth batch: only 1 left before the cutoff
        (0, 5),  // fifth batch: none left before the cutoff
    ];
    for (want_deleted, want_left) in expectations {
        let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap();
        assert_eq!(deleted, want_deleted);
        assert_eq!(remaining(), want_left);
    }
}
1167+
10511168
#[test]
10521169
fn test_options() {
10531170
let tmp = tempdir::TempDir::new("test_test").unwrap();

blade/db/sqlite/mod.rs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,30 @@ impl state::DB for Sqlite {
362362
.context(format!("failed to delete invocation since {ot:#?}"))
363363
}
364364

365+
fn delete_invocations_batch(
366+
&mut self,
367+
ts: &std::time::SystemTime,
368+
limit: i64,
369+
) -> anyhow::Result<usize> {
370+
let ot: time::OffsetDateTime = (*ts).into();
371+
let ids_to_delete: Vec<String> = schema::Invocations::table
372+
.select(schema::Invocations::id)
373+
.filter(unixepoch(schema::Invocations::start).le(unixepoch(ot)))
374+
.limit(limit)
375+
.load(&mut self.conn)
376+
.context("failed to query invocations for batch deletion")?;
377+
378+
if ids_to_delete.is_empty() {
379+
return Ok(0);
380+
}
381+
382+
diesel::delete(
383+
schema::Invocations::table.filter(schema::Invocations::id.eq_any(&ids_to_delete)),
384+
)
385+
.execute(&mut self.conn)
386+
.context("failed to delete invocation batch")
387+
}
388+
365389
fn update_invocation_heartbeat(&mut self, invocation_id: &str) -> anyhow::Result<()> {
366390
use schema::Invocations::dsl::*;
367391
let now: time::OffsetDateTime = std::time::SystemTime::now().into();
@@ -1069,6 +1093,98 @@ mod tests {
10691093
}
10701094
}
10711095

1096+
#[test]
fn test_delete_batch() {
    let tmp = tempdir::TempDir::new("test_delete_batch").unwrap();
    let db_path = tmp.path().join("test.db");
    super::init_db(db_path.to_str().unwrap()).unwrap();
    // `conn` is a second, independent connection used only to observe the table.
    let mut conn = SqliteConnection::establish(db_path.to_str().unwrap()).unwrap();
    let mgr = crate::manager::SqliteManager::new(db_path.to_str().unwrap()).unwrap();
    let mut db = mgr.get().unwrap();

    let start = UNIX_EPOCH;
    let day = Duration::from_secs(60 * 60 * 24);

    // Create 15 invocations, one per day, starting one day after the epoch.
    for i in 0..15u32 {
        db.upsert_shallow_invocation(&state::InvocationResults {
            id: format!("id{i}"),
            start: start.checked_add(day * (i + 1)).unwrap(),
            ..Default::default()
        })
        .unwrap();
    }

    // Counts the invocation rows currently in the table.
    let mut remaining = || {
        let rows = super::schema::Invocations::table
            .select(super::models::Invocation::as_select())
            .get_results(&mut conn)
            .unwrap();
        rows.len()
    };

    // Verify all 15 exist.
    assert_eq!(remaining(), 15);

    // Delete in batches of 3; the cutoff covers the first 10 invocations.
    let cutoff = start.checked_add(day * 10).unwrap();

    // (rows deleted by the batch, rows left in the table afterwards)
    let expectations = [
        (3, 12), // first batch
        (3, 9),  // second batch
        (3, 6),  // third batch
        (1, 5),  // fourth batch: only 1 left before the cutoff
        (0, 5),  // fifth batch: none left before the cutoff
    ];
    for (want_deleted, want_left) in expectations {
        let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap();
        assert_eq!(deleted, want_deleted);
        assert_eq!(remaining(), want_left);
    }
}
1187+
10721188
#[test]
10731189
fn test_options() {
10741190
let tmp = tempdir::TempDir::new("test_target").unwrap();

blade/main.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,9 @@ cfg_if! {
340340
let day = std::time::Duration::from_secs(60 * 60 * 24);
341341
let interval = global.retention.unwrap_or(day);
342342
let check_interval = std::cmp::min(day, interval/7);
343+
const BATCH_SIZE: i64 = 10;
344+
const BATCH_DELAY: std::time::Duration = std::time::Duration::from_millis(100);
345+
343346
loop {
344347
tokio::time::sleep(check_interval).await;
345348
if global.retention.is_none() {
@@ -349,11 +352,32 @@ cfg_if! {
349352
tracing::warn!("Overflow when clean up time");
350353
continue;
351354
};
352-
db::run(global.db_manager.clone(), move |db_mgr| db_mgr.delete_invocations_since(&since)).await.inspect_err(|e| {
353-
tracing::warn!("Failed to mark old invocations for deletion: {e:#?}");
354-
}).ok().inspect(|count| {
355-
tracing::info!("Marked {} invocations for deletion", count);
356-
});
355+
356+
let mut total_deleted = 0;
357+
loop {
358+
let deleted = db::run(global.db_manager.clone(), move |db_mgr| {
359+
db_mgr.delete_invocations_batch(&since, BATCH_SIZE)
360+
}).await;
361+
362+
match deleted {
363+
Ok(count) => {
364+
if count == 0 {
365+
break;
366+
}
367+
total_deleted += count;
368+
tracing::debug!("Deleted batch of {} invocations", count);
369+
tokio::time::sleep(BATCH_DELAY).await;
370+
},
371+
Err(e) => {
372+
tracing::warn!("Failed to delete invocation batch: {e:#?}");
373+
break;
374+
}
375+
}
376+
}
377+
378+
if total_deleted > 0 {
379+
tracing::info!("Deleted {} total invocations during cleanup", total_deleted);
380+
}
357381
}
358382
}
359383

blade/state/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ pub trait DB {
212212
fn get_shallow_invocation(&mut self, id: &str) -> anyhow::Result<InvocationResults>;
213213
fn delete_invocation(&mut self, id: &str) -> anyhow::Result<()>;
214214
fn delete_invocations_since(&mut self, ts: &std::time::SystemTime) -> anyhow::Result<usize>;
215+
fn delete_invocations_batch(&mut self, ts: &std::time::SystemTime, limit: i64) -> anyhow::Result<usize>;
215216
fn update_invocation_heartbeat(&mut self, invocation_id: &str) -> anyhow::Result<()>;
216217
fn insert_options(&mut self, id: &str, options: &BuildOptions) -> anyhow::Result<()>;
217218
fn get_options(&mut self, id: &str) -> anyhow::Result<BuildOptions>;

0 commit comments

Comments
 (0)