Skip to content

Commit 96debbd

Browse files
jamiepineclaude
andcommitted
address PR review feedback: drop unresolved FKs, guard limits
convert_fks_to_uuids_batch now returns the set of record indices whose FK could not be resolved (missing target row or non-integer value). Callers in entry::query_for_sync and content_identity::query_for_sync drop those records from the outgoing sync batch instead of zipping the partially converted payload back. Previously the sender would ship a record with its local int field intact and no *_uuid field; the receiver's map_sync_json_to_local interpreted that as already-resolved and wrote the sender-local integer directly to the local DB, corrupting the FK. Also: reject limit == 0 on SharedChangeRequest / get_shared_changes up front (would have returned 0 rows with has_more = true and spun), and clamp the SQL LIMIT bind with i64::try_from(..).unwrap_or(i64::MAX) instead of a wrapping `as i64`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 17fdc0a commit 96debbd

6 files changed

Lines changed: 141 additions & 66 deletions

File tree

core/src/infra/db/entities/content_identity.rs

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -266,35 +266,55 @@ impl Syncable for Model {
266266
}
267267

268268
// Batch FK → UUID conversion across the whole batch: one DB round trip
269-
// per FK type instead of one per (record × FK).
269+
// per FK type instead of one per (record × FK). Records that fail
270+
// resolution are dropped so peers never see a sender-local int in the
271+
// payload.
270272
let fk_mappings = Self::foreign_key_mappings();
271273
if !fk_mappings.is_empty() && !sync_results.is_empty() {
272274
let mut payloads: Vec<serde_json::Value> =
273275
sync_results.iter().map(|(_, json, _)| json.clone()).collect();
276+
let mut failed_indices: std::collections::HashSet<usize> =
277+
std::collections::HashSet::new();
274278

275279
for fk in &fk_mappings {
276-
if let Err(e) = crate::infra::sync::fk_mapper::convert_fks_to_uuids_batch(
280+
match crate::infra::sync::fk_mapper::convert_fks_to_uuids_batch(
277281
&mut payloads,
278282
fk,
279283
db,
280284
)
281285
.await
282286
{
283-
tracing::warn!(
284-
error = %e,
285-
fk_field = fk.local_field,
286-
"Batch FK conversion failed for content_identity"
287-
);
288-
return Err(sea_orm::DbErr::Custom(format!(
289-
"ContentIdentity FK batch conversion failed: {}",
290-
e
291-
)));
287+
Ok(failed) => failed_indices.extend(failed),
288+
Err(e) => {
289+
tracing::warn!(
290+
error = %e,
291+
fk_field = fk.local_field,
292+
"Batch FK conversion failed for content_identity"
293+
);
294+
return Err(sea_orm::DbErr::Custom(format!(
295+
"ContentIdentity FK batch conversion failed: {}",
296+
e
297+
)));
298+
}
292299
}
293300
}
294301

295-
for ((_, json, _), resolved) in sync_results.iter_mut().zip(payloads.into_iter()) {
296-
*json = resolved;
297-
}
302+
sync_results = sync_results
303+
.into_iter()
304+
.zip(payloads.into_iter())
305+
.enumerate()
306+
.filter_map(|(idx, ((uuid, _, ts), resolved))| {
307+
if failed_indices.contains(&idx) {
308+
tracing::warn!(
309+
uuid = %uuid,
310+
"Dropping content_identity with unresolved FK from sync batch"
311+
);
312+
None
313+
} else {
314+
Some((uuid, resolved, ts))
315+
}
316+
})
317+
.collect();
298318
}
299319

300320
Ok(sync_results)

core/src/infra/db/entities/entry.rs

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -329,34 +329,54 @@ impl crate::infra::sync::Syncable for Model {
329329

330330
// Batch-convert FK integer IDs to UUIDs one FK type at a time across
331331
// the whole batch — single DB round trip per FK, not per record × FK.
332+
// Any record that fails resolution (missing target, bad value) is
333+
// dropped before we return so peers never see a sender-local int.
332334
let fk_mappings = <Model as Syncable>::foreign_key_mappings();
333335
if !fk_mappings.is_empty() && !staged.is_empty() {
334336
let mut payloads: Vec<serde_json::Value> =
335337
staged.iter().map(|(_, json, _)| json.clone()).collect();
338+
let mut failed_indices: std::collections::HashSet<usize> =
339+
std::collections::HashSet::new();
336340

337341
for fk in &fk_mappings {
338-
if let Err(e) = crate::infra::sync::fk_mapper::convert_fks_to_uuids_batch(
342+
match crate::infra::sync::fk_mapper::convert_fks_to_uuids_batch(
339343
&mut payloads,
340344
fk,
341345
db,
342346
)
343347
.await
344348
{
345-
tracing::warn!(
346-
error = %e,
347-
fk_field = fk.local_field,
348-
"Batch FK conversion failed for entries"
349-
);
350-
return Err(sea_orm::DbErr::Custom(format!(
351-
"Entry FK batch conversion failed: {}",
352-
e
353-
)));
349+
Ok(failed) => failed_indices.extend(failed),
350+
Err(e) => {
351+
tracing::warn!(
352+
error = %e,
353+
fk_field = fk.local_field,
354+
"Batch FK conversion failed for entries"
355+
);
356+
return Err(sea_orm::DbErr::Custom(format!(
357+
"Entry FK batch conversion failed: {}",
358+
e
359+
)));
360+
}
354361
}
355362
}
356363

357-
for ((_, json, _), resolved) in staged.iter_mut().zip(payloads.into_iter()) {
358-
*json = resolved;
359-
}
364+
staged = staged
365+
.into_iter()
366+
.zip(payloads.into_iter())
367+
.enumerate()
368+
.filter_map(|(idx, ((uuid, _, ts), resolved))| {
369+
if failed_indices.contains(&idx) {
370+
tracing::warn!(
371+
uuid = %uuid,
372+
"Dropping entry with unresolved FK from sync batch"
373+
);
374+
None
375+
} else {
376+
Some((uuid, resolved, ts))
377+
}
378+
})
379+
.collect();
360380
}
361381

362382
Ok(staged)

core/src/infra/sync/fk_mapper.rs

Lines changed: 55 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -104,32 +104,49 @@ pub async fn convert_fk_to_uuid(
104104

105105
/// Batch convert local integer FKs to UUIDs across multiple records.
106106
///
107-
/// Same contract as [`convert_fk_to_uuid`] but one DB round trip per FK type
108-
/// instead of one per (record × FK). Records that reference a missing target
109-
/// are left with their original local-id field intact; callers log/skip as
110-
/// needed. Called from `query_for_sync` implementations on the sync sender.
107+
/// Same per-record semantics as [`convert_fk_to_uuid`] but one DB round trip
108+
/// per FK type instead of one per (record × FK). Records whose target could
109+
/// not be resolved (missing target row, non-integer value) are reported by
110+
/// index — callers MUST drop those records rather than ship them. Silently
111+
/// leaving `local_field` in place would let the receiver's
112+
/// `map_sync_json_to_local` interpret the sender-local integer as
113+
/// already-resolved and write it straight into the receiver's DB, corrupting
114+
/// FKs across devices.
115+
///
116+
/// Successful records are mutated in place (`local_field` removed,
117+
/// `uuid_field` added). The indices of failed records are returned.
111118
pub async fn convert_fks_to_uuids_batch(
112119
records: &mut [Value],
113120
fk: &FKMapping,
114121
db: &DatabaseConnection,
115-
) -> Result<()> {
122+
) -> Result<HashSet<usize>> {
123+
let mut failed: HashSet<usize> = HashSet::new();
116124
if records.is_empty() {
117-
return Ok(());
125+
return Ok(failed);
118126
}
119127

120128
let uuid_field = fk.uuid_field_name();
121129

122-
// Collect all local IDs we need to resolve.
130+
// First pass: collect IDs and flag records whose local_field isn't a valid
131+
// integer so they can be dropped by the caller.
123132
let mut ids_to_lookup: HashSet<i32> = HashSet::new();
124-
for json in records.iter() {
133+
for (idx, json) in records.iter().enumerate() {
125134
match json.get(fk.local_field) {
126-
Some(v) if v.is_null() => { /* null FK, handled below */ }
127-
Some(v) => {
128-
if let Some(id) = v.as_i64() {
135+
None => { /* absent — treated as null below */ }
136+
Some(v) if v.is_null() => { /* null — treated as null below */ }
137+
Some(v) => match v.as_i64() {
138+
Some(id) => {
129139
ids_to_lookup.insert(id as i32);
130140
}
131-
}
132-
None => { /* field absent, handled below */ }
141+
None => {
142+
tracing::warn!(
143+
fk_field = fk.local_field,
144+
value = %v,
145+
"FK field has non-integer value in sync payload; dropping record"
146+
);
147+
failed.insert(idx);
148+
}
149+
},
133150
}
134151
}
135152

@@ -139,46 +156,48 @@ pub async fn convert_fks_to_uuids_batch(
139156
batch_lookup_uuids_for_local_ids(fk.target_table, ids_to_lookup, db).await?
140157
};
141158

142-
for json in records.iter_mut() {
159+
for (idx, json) in records.iter_mut().enumerate() {
160+
if failed.contains(&idx) {
161+
continue;
162+
}
163+
143164
let local_field_value = json.get(fk.local_field).cloned();
144165

145166
match local_field_value {
167+
None => {
168+
json[&uuid_field] = Value::Null;
169+
}
146170
Some(v) if v.is_null() => {
147171
json[&uuid_field] = Value::Null;
148172
if let Some(obj) = json.as_object_mut() {
149173
obj.remove(fk.local_field);
150174
}
151175
}
152176
Some(v) => {
153-
if let Some(id) = v.as_i64() {
154-
match id_to_uuid.get(&(id as i32)) {
155-
Some(uuid) => {
156-
json[&uuid_field] = json!(uuid.to_string());
157-
if let Some(obj) = json.as_object_mut() {
158-
obj.remove(fk.local_field);
159-
}
160-
}
161-
None => {
162-
// Target record missing locally — leave FK field intact
163-
// so the caller can decide whether to skip or warn.
164-
tracing::warn!(
165-
fk_field = fk.local_field,
166-
target_table = fk.target_table,
167-
id = id,
168-
"FK target not found during batch conversion; skipping record"
169-
);
177+
// Unwrap is safe: first pass validated i64 or flagged as failed.
178+
let id = v.as_i64().expect("validated in first pass") as i32;
179+
match id_to_uuid.get(&id) {
180+
Some(uuid) => {
181+
json[&uuid_field] = json!(uuid.to_string());
182+
if let Some(obj) = json.as_object_mut() {
183+
obj.remove(fk.local_field);
170184
}
171185
}
186+
None => {
187+
tracing::warn!(
188+
fk_field = fk.local_field,
189+
target_table = fk.target_table,
190+
id = id,
191+
"FK target not found during batch conversion; dropping record"
192+
);
193+
failed.insert(idx);
194+
}
172195
}
173196
}
174-
None => {
175-
// Field absent — treat as null for sync payloads.
176-
json[&uuid_field] = Value::Null;
177-
}
178197
}
179198
}
180199

181-
Ok(())
200+
Ok(failed)
182201
}
183202

184203
/// Look up UUID for a local integer ID via the registry

core/src/infra/sync/peer_log.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,11 +221,14 @@ impl PeerLog {
221221
since: Option<HLC>,
222222
limit: Option<usize>,
223223
) -> Result<Vec<SharedChangeEntry>, PeerLogError> {
224-
let query = match (since, limit) {
224+
// Clamp to i64::MAX so a ludicrously large usize never wraps into a
225+
// negative value when SQLite binds the parameter.
226+
let sql_limit = limit.map(|lim| i64::try_from(lim).unwrap_or(i64::MAX));
227+
let query = match (since, sql_limit) {
225228
(Some(hlc), Some(lim)) => Statement::from_sql_and_values(
226229
DbBackend::Sqlite,
227230
"SELECT hlc, model_type, record_uuid, change_type, data FROM shared_changes WHERE hlc > ? ORDER BY hlc ASC LIMIT ?",
228-
vec![hlc.to_string().into(), (lim as i64).into()],
231+
vec![hlc.to_string().into(), lim.into()],
229232
),
230233
(Some(hlc), None) => Statement::from_sql_and_values(
231234
DbBackend::Sqlite,
@@ -235,7 +238,7 @@ impl PeerLog {
235238
(None, Some(lim)) => Statement::from_sql_and_values(
236239
DbBackend::Sqlite,
237240
"SELECT hlc, model_type, record_uuid, change_type, data FROM shared_changes ORDER BY hlc ASC LIMIT ?",
238-
vec![(lim as i64).into()],
241+
vec![lim.into()],
239242
),
240243
(None, None) => Statement::from_string(
241244
DbBackend::Sqlite,

core/src/service/sync/peer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2715,6 +2715,12 @@ impl PeerSync {
27152715
"Querying shared changes from peer log"
27162716
);
27172717

2718+
// limit == 0 with our `len > limit` rule would return 0 rows plus
2719+
// has_more = true, so a naive caller would spin forever.
2720+
if limit == 0 {
2721+
return Err(anyhow::anyhow!("shared changes limit must be > 0"));
2722+
}
2723+
27182724
// Query peer log with SQL-side LIMIT, fetching one extra row to detect has_more
27192725
let fetch_limit = limit.saturating_add(1);
27202726
let mut entries = self

core/src/service/sync/protocol_handler.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,13 @@ impl LogSyncHandler {
114114
since_hlc: Option<HLC>,
115115
limit: usize,
116116
) -> Result<SyncMessage> {
117+
// Reject limit == 0 up front: with our `has_more = entries.len() > limit`
118+
// rule it would return 0 rows with has_more = true and the caller would
119+
// spin forever reissuing the same request.
120+
if limit == 0 {
121+
anyhow::bail!("SharedChangeRequest limit must be > 0");
122+
}
123+
117124
// Get changes from our peer log, fetching one extra row so we can derive has_more
118125
// without reloading the entire log on every batch request.
119126
let fetch_limit = limit.saturating_add(1);

0 commit comments

Comments
 (0)