Skip to content

Commit 17fdc0a

Browse files
jamiepine and claude committed
fix library sync backfill O(N^2) hotspots on both sides of the protocol
Addresses #3058 (shared models, content_identity) and #3060 (device-owned entries). The progressive slowdown on initial pair came from several independent hotspots that compound:

Sender side
- PeerLog::get_since now pushes LIMIT into SQL; callers fetch limit + 1 to derive has_more. Previously every SharedChangeRequest reloaded and parsed the entire remaining shared_changes log in memory before truncating, making sender work O(N * batches).
- Entry and content_identity query_for_sync batch FK to UUID conversion across the whole batch via a new convert_fks_to_uuids_batch helper — one DB round trip per FK type instead of per record per FK.

Schema
- New index idx_entries_indexed_at_uuid backing the (indexed_at, uuid) cursor in Entry::query_for_sync. Without it every batch request fell back to a full-table scan.

Receiver side
- New task-local in_backfill scope wraps the backfill apply phases. Entry::apply_state_change uses it to skip per-entry entry_closure rebuild; the existing post_backfill_rebuild pass does a single bulk rebuild at the end. emit_batch_resource_events short-circuits during backfill and the coordinator emits one Event::Refresh after post-backfill rebuild so the UI invalidates cached views.
- Entry FK resolution in backfill splits mappings into self-referential (parent_id) and the rest. Non-self FKs resolve in one batch across the whole batch; only parent_id still runs per-entry so children can see just-inserted parents.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9945639 commit 17fdc0a

12 files changed

Lines changed: 419 additions & 117 deletions

File tree

core/src/domain/resource_manager.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,19 @@ impl ResourceManager {
269269
resource_type: &str,
270270
resource_ids: Vec<Uuid>,
271271
) -> Result<()> {
272+
// During backfill we skip per-record fan-out. For models like
273+
// content_identity each UUID triggers 2 DB queries via dependency
274+
// routing, which dominates apply time on large libraries. The backfill
275+
// coordinator emits a single coarse invalidation after the scope ends.
276+
if crate::infra::sync::is_in_backfill() {
277+
tracing::trace!(
278+
resource_type = %resource_type,
279+
count = resource_ids.len(),
280+
"Skipping per-record resource event emission during backfill"
281+
);
282+
return Ok(());
283+
}
284+
272285
// For now, delegate to single-resource handler
273286
// In future, could optimize by batching virtual resource construction
274287
self.emit_resource_events(resource_type, resource_ids).await

core/src/infra/db/entities/content_identity.rs

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -246,36 +246,55 @@ impl Syncable for Model {
246246

247247
let results = query.all(db).await?;
248248

249-
let mut sync_results = Vec::new();
249+
let mut sync_results: Vec<(Uuid, serde_json::Value, chrono::DateTime<chrono::Utc>)> =
250+
Vec::with_capacity(results.len());
250251
for content in results {
251-
if content.uuid.is_none() {
252-
continue;
253-
}
252+
let uuid = match content.uuid {
253+
Some(u) => u,
254+
None => continue,
255+
};
254256

255-
let mut json = match content.to_sync_json() {
257+
let json = match content.to_sync_json() {
256258
Ok(j) => j,
257259
Err(e) => {
258260
tracing::warn!(error = %e, content_hash = %content.content_hash, "Failed to serialize content_identity for sync");
259261
continue;
260262
}
261263
};
262264

263-
// Convert FK to UUID for cross-device compatibility
264-
for fk in Self::foreign_key_mappings() {
265-
if let Err(e) =
266-
crate::infra::sync::fk_mapper::convert_fk_to_uuid(&mut json, &fk, db).await
265+
sync_results.push((uuid, json, content.last_verified_at));
266+
}
267+
268+
// Batch FK → UUID conversion across the whole batch: one DB round trip
269+
// per FK type instead of one per (record × FK).
270+
let fk_mappings = Self::foreign_key_mappings();
271+
if !fk_mappings.is_empty() && !sync_results.is_empty() {
272+
let mut payloads: Vec<serde_json::Value> =
273+
sync_results.iter().map(|(_, json, _)| json.clone()).collect();
274+
275+
for fk in &fk_mappings {
276+
if let Err(e) = crate::infra::sync::fk_mapper::convert_fks_to_uuids_batch(
277+
&mut payloads,
278+
fk,
279+
db,
280+
)
281+
.await
267282
{
268283
tracing::warn!(
269284
error = %e,
270-
uuid = %content.uuid.unwrap(),
271285
fk_field = fk.local_field,
272-
"Failed to convert FK to UUID, skipping content_identity"
286+
"Batch FK conversion failed for content_identity"
273287
);
274-
continue;
288+
return Err(sea_orm::DbErr::Custom(format!(
289+
"ContentIdentity FK batch conversion failed: {}",
290+
e
291+
)));
275292
}
276293
}
277294

278-
sync_results.push((content.uuid.unwrap(), json, content.last_verified_at));
295+
for ((_, json, _), resolved) in sync_results.iter_mut().zip(payloads.into_iter()) {
296+
*json = resolved;
297+
}
279298
}
280299

281300
Ok(sync_results)

core/src/infra/db/entities/entry.rs

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -292,16 +292,16 @@ impl crate::infra::sync::Syncable for Model {
292292
std::collections::HashMap::new()
293293
};
294294

295-
// Convert to sync format with FK mapping
296-
let mut sync_results = Vec::new();
295+
// Serialize each row to JSON with its UUID and timestamp.
296+
let mut staged: Vec<(Uuid, serde_json::Value, chrono::DateTime<chrono::Utc>)> =
297+
Vec::with_capacity(results.len());
297298

298299
for entry in results {
299300
let uuid = match entry.uuid {
300301
Some(u) => u,
301-
None => continue, // Skip entries without UUIDs
302+
None => continue,
302303
};
303304

304-
// Serialize to JSON
305305
let mut json = match entry.to_sync_json() {
306306
Ok(j) => j,
307307
Err(e) => {
@@ -311,9 +311,8 @@ impl crate::infra::sync::Syncable for Model {
311311
};
312312

313313
// For directories, include the absolute path from directory_paths
314-
// This ensures receiving devices get identical paths for universal addressing
314+
// so receiving devices get identical paths for universal addressing.
315315
if entry.kind == 1 {
316-
// Directory
317316
if let Some(path) = directory_paths_map.get(&entry.id) {
318317
if let Some(obj) = json.as_object_mut() {
319318
obj.insert(
@@ -324,27 +323,43 @@ impl crate::infra::sync::Syncable for Model {
324323
}
325324
}
326325

327-
// Convert FK integer IDs to UUIDs
328-
for fk in <Model as Syncable>::foreign_key_mappings() {
329-
if let Err(e) =
330-
crate::infra::sync::fk_mapper::convert_fk_to_uuid(&mut json, &fk, db).await
326+
let timestamp = entry.indexed_at.unwrap_or(entry.modified_at);
327+
staged.push((uuid, json, timestamp));
328+
}
329+
330+
// Batch-convert FK integer IDs to UUIDs one FK type at a time across
331+
// the whole batch — single DB round trip per FK, not per record × FK.
332+
let fk_mappings = <Model as Syncable>::foreign_key_mappings();
333+
if !fk_mappings.is_empty() && !staged.is_empty() {
334+
let mut payloads: Vec<serde_json::Value> =
335+
staged.iter().map(|(_, json, _)| json.clone()).collect();
336+
337+
for fk in &fk_mappings {
338+
if let Err(e) = crate::infra::sync::fk_mapper::convert_fks_to_uuids_batch(
339+
&mut payloads,
340+
fk,
341+
db,
342+
)
343+
.await
331344
{
332345
tracing::warn!(
333346
error = %e,
334-
uuid = %uuid,
335347
fk_field = fk.local_field,
336-
"Failed to convert FK to UUID, skipping entry"
348+
"Batch FK conversion failed for entries"
337349
);
338-
continue;
350+
return Err(sea_orm::DbErr::Custom(format!(
351+
"Entry FK batch conversion failed: {}",
352+
e
353+
)));
339354
}
340355
}
341356

342-
// Use indexed_at for checkpoint/watermark tracking, fallback to modified_at if NULL
343-
let timestamp = entry.indexed_at.unwrap_or(entry.modified_at);
344-
sync_results.push((uuid, json, timestamp));
357+
for ((_, json, _), resolved) in staged.iter_mut().zip(payloads.into_iter()) {
358+
*json = resolved;
359+
}
345360
}
346361

347-
Ok(sync_results)
362+
Ok(staged)
348363
}
349364

350365
/// Apply state change - already implemented in Model impl block below
@@ -564,10 +579,12 @@ impl Model {
564579
inserted.id
565580
};
566581

567-
// Rebuild entry_closure for this synced entry
568-
// Without this, the entry only has a self-reference and cannot be queried
569-
// for descendants, breaking subtree operations, location scoping, etc.
570-
Self::rebuild_entry_closure(entry_id, parent_id, db).await?;
582+
// Rebuild entry_closure for this synced entry, unless we're inside a
583+
// backfill apply loop — in that case the post_backfill_rebuild hook
584+
// does a single bulk rebuild at the end, so per-entry work is wasted.
585+
if !crate::infra::sync::is_in_backfill() {
586+
Self::rebuild_entry_closure(entry_id, parent_id, db).await?;
587+
}
571588

572589
// If this is a directory, create or update its entry in the directory_paths table
573590
if EntryKind::from(kind) == EntryKind::Directory {
core/src/infra/db/migration/m20260417_000001_add_entries_sync_cursor_index.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
//! Add composite index on entries(indexed_at, uuid) to back the device-owned
2+
//! sync cursor.
3+
//!
4+
//! `Entry::query_for_sync` paginates by `ORDER BY indexed_at ASC, uuid ASC`
5+
//! with a tie-breaker filter of the same shape. Without this index, SQLite
6+
//! does a full table scan per batch request — O(N) per batch, O(N^2) across
7+
//! an initial backfill of a large library.
8+
9+
use sea_orm_migration::prelude::*;
10+
11+
#[derive(DeriveMigrationName)]
12+
pub struct Migration;
13+
14+
#[async_trait::async_trait]
15+
impl MigrationTrait for Migration {
16+
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
17+
manager
18+
.get_connection()
19+
.execute_unprepared(
20+
"CREATE INDEX IF NOT EXISTS idx_entries_indexed_at_uuid \
21+
ON entries(indexed_at, uuid)",
22+
)
23+
.await?;
24+
25+
Ok(())
26+
}
27+
28+
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
29+
manager
30+
.get_connection()
31+
.execute_unprepared("DROP INDEX IF EXISTS idx_entries_indexed_at_uuid")
32+
.await?;
33+
34+
Ok(())
35+
}
36+
}

core/src/infra/db/migration/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ mod m20260104_000001_replace_device_id_with_volume_id;
3838
mod m20260105_000001_add_volume_id_to_locations;
3939
mod m20260114_000001_fix_search_index_include_directories;
4040
mod m20260123_000001_remove_legacy_sync_columns;
41+
mod m20260417_000001_add_entries_sync_cursor_index;
4142

4243
pub struct Migrator;
4344

@@ -81,6 +82,7 @@ impl MigratorTrait for Migrator {
8182
Box::new(m20260105_000001_add_volume_id_to_locations::Migration),
8283
Box::new(m20260114_000001_fix_search_index_include_directories::Migration),
8384
Box::new(m20260123_000001_remove_legacy_sync_columns::Migration),
85+
Box::new(m20260417_000001_add_entries_sync_cursor_index::Migration),
8486
]
8587
}
8688
}
core/src/infra/sync/backfill_context.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
//! Task-local flag identifying code running inside a backfill apply loop.
2+
//!
3+
//! A few per-record hooks (closure table rebuild, resource event emission)
4+
//! are redundant during backfill because the coordinator does bulk work at
5+
//! the end. Models check this flag to skip that per-record work.
6+
7+
tokio::task_local! {
8+
static IN_BACKFILL: ();
9+
}
10+
11+
/// Run `fut` with the in-backfill flag set. Nested scopes are allowed.
12+
pub async fn in_backfill<F, T>(fut: F) -> T
13+
where
14+
F: std::future::Future<Output = T>,
15+
{
16+
if is_in_backfill() {
17+
fut.await
18+
} else {
19+
IN_BACKFILL.scope((), fut).await
20+
}
21+
}
22+
23+
/// True when the current task is inside an `in_backfill` scope.
24+
pub fn is_in_backfill() -> bool {
25+
IN_BACKFILL.try_with(|_| ()).is_ok()
26+
}

core/src/infra/sync/fk_mapper.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,85 @@ pub async fn convert_fk_to_uuid(
102102
Ok(())
103103
}
104104

105+
/// Batch convert local integer FKs to UUIDs across multiple records.
106+
///
107+
/// Same contract as [`convert_fk_to_uuid`] but one DB round trip per FK type
108+
/// instead of one per (record × FK). Records that reference a missing target
109+
/// are left with their original local-id field intact; callers log/skip as
110+
/// needed. Called from `query_for_sync` implementations on the sync sender.
111+
pub async fn convert_fks_to_uuids_batch(
112+
records: &mut [Value],
113+
fk: &FKMapping,
114+
db: &DatabaseConnection,
115+
) -> Result<()> {
116+
if records.is_empty() {
117+
return Ok(());
118+
}
119+
120+
let uuid_field = fk.uuid_field_name();
121+
122+
// Collect all local IDs we need to resolve.
123+
let mut ids_to_lookup: HashSet<i32> = HashSet::new();
124+
for json in records.iter() {
125+
match json.get(fk.local_field) {
126+
Some(v) if v.is_null() => { /* null FK, handled below */ }
127+
Some(v) => {
128+
if let Some(id) = v.as_i64() {
129+
ids_to_lookup.insert(id as i32);
130+
}
131+
}
132+
None => { /* field absent, handled below */ }
133+
}
134+
}
135+
136+
let id_to_uuid = if ids_to_lookup.is_empty() {
137+
HashMap::new()
138+
} else {
139+
batch_lookup_uuids_for_local_ids(fk.target_table, ids_to_lookup, db).await?
140+
};
141+
142+
for json in records.iter_mut() {
143+
let local_field_value = json.get(fk.local_field).cloned();
144+
145+
match local_field_value {
146+
Some(v) if v.is_null() => {
147+
json[&uuid_field] = Value::Null;
148+
if let Some(obj) = json.as_object_mut() {
149+
obj.remove(fk.local_field);
150+
}
151+
}
152+
Some(v) => {
153+
if let Some(id) = v.as_i64() {
154+
match id_to_uuid.get(&(id as i32)) {
155+
Some(uuid) => {
156+
json[&uuid_field] = json!(uuid.to_string());
157+
if let Some(obj) = json.as_object_mut() {
158+
obj.remove(fk.local_field);
159+
}
160+
}
161+
None => {
162+
// Target record missing locally — leave FK field intact
163+
// so the caller can decide whether to skip or warn.
164+
tracing::warn!(
165+
fk_field = fk.local_field,
166+
target_table = fk.target_table,
167+
id = id,
168+
"FK target not found during batch conversion; skipping record"
169+
);
170+
}
171+
}
172+
}
173+
}
174+
None => {
175+
// Field absent — treat as null for sync payloads.
176+
json[&uuid_field] = Value::Null;
177+
}
178+
}
179+
}
180+
181+
Ok(())
182+
}
183+
105184
/// Look up UUID for a local integer ID via the registry
106185
async fn lookup_uuid_for_local_id(
107186
table: &str,

core/src/infra/sync/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
//! - Checkpoint persistence for resumable backfill
1111
//!
1212
13+
pub mod backfill_context;
1314
pub mod checkpoints;
1415
pub mod config;
1516
pub mod dependency_graph;
@@ -27,6 +28,7 @@ pub mod transaction;
2728
pub mod transport;
2829
pub mod watermarks;
2930

31+
pub use backfill_context::{in_backfill, is_in_backfill};
3032
pub use checkpoints::{BackfillCheckpoint, BackfillCheckpointStore, CheckpointError};
3133
pub use config::{
3234
BatchingConfig, MonitoringConfig, NetworkConfig, PruningStrategy, RetentionConfig, SyncConfig,
@@ -42,8 +44,8 @@ pub use event_log::{
4244
SyncEventLogger, SyncEventQuery, SyncEventType,
4345
};
4446
pub use fk_mapper::{
45-
batch_map_sync_json_to_local, convert_fk_to_uuid, map_sync_json_to_local, BatchFkMapResult,
46-
FKMapping,
47+
batch_map_sync_json_to_local, convert_fk_to_uuid, convert_fks_to_uuids_batch,
48+
map_sync_json_to_local, BatchFkMapResult, FKMapping,
4749
};
4850
pub use hlc::{HLCGenerator, HLC};
4951
pub use peer_log::{ChangeType, PeerLog, PeerLogError, SharedChangeEntry};

0 commit comments

Comments
 (0)