fix library sync backfill O(N^2) hotspots on both sides of the protocol#3061
Conversation
Addresses #3058 (shared models, content_identity) and #3060 (device-owned entries). The progressive slowdown on initial pair came from several independent hotspots that compound: Sender side - PeerLog::get_since now pushes LIMIT into SQL; callers fetch limit + 1 to derive has_more. Previously every SharedChangeRequest reloaded and parsed the entire remaining shared_changes log in memory before truncating, making sender work O(N * batches). - Entry and content_identity query_for_sync batch FK to UUID conversion across the whole batch via a new convert_fks_to_uuids_batch helper — one DB round trip per FK type instead of per record per FK. Schema - New index idx_entries_indexed_at_uuid backing the (indexed_at, uuid) cursor in Entry::query_for_sync. Without it every batch request fell back to a full-table scan. Receiver side - New task-local in_backfill scope wraps the backfill apply phases. Entry::apply_state_change uses it to skip per-entry entry_closure rebuild; the existing post_backfill_rebuild pass does a single bulk rebuild at the end. emit_batch_resource_events short-circuits during backfill and the coordinator emits one Event::Refresh after post- backfill rebuild so the UI invalidates cached views. - Entry FK resolution in backfill splits mappings into self-referential (parent_id) and the rest. Non-self FKs resolve in one batch across the whole batch; only parent_id still runs per-entry so children can see just-inserted parents. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
PR SummaryMedium Risk Overview On the sender, On the receiver, phases 2–3 of backfill run inside a new task-local Adds a migration creating Reviewed by Cursor Bugbot for commit 17fdc0a. Configure here. |
WalkthroughAdds a task-local backfill marker and uses it to: skip per-record resource event fan-out and per-entry closure rebuilds during backfill; batch FK→UUID resolution on sender and receiver; add SQL LIMIT to peer log pagination and a composite cursor index for entries; and adjust backfill orchestration to run inside the backfill scope. Changes
Sequence DiagramsequenceDiagram
autonumber
actor BackfillLoop as Backfill Loop
participant DB as Database
participant FKBatch as FK Batch Resolver
participant Applier as State Applier
participant Closure as Closure Rebuilder
participant EventBus as Event Bus
BackfillLoop->>BackfillLoop: in_backfill(...) (set IN_BACKFILL)
BackfillLoop->>DB: Fetch next batch (cursor + LIMIT)
DB-->>BackfillLoop: Batch of rows (JSON payloads)
BackfillLoop->>FKBatch: Batch-resolve non-self FKs
FKBatch->>DB: Lookup UUIDs for collected local IDs
DB-->>FKBatch: UUID mappings
FKBatch-->>BackfillLoop: Resolved payloads
BackfillLoop->>Applier: Apply batch (per-entry for self-FKs as needed)
Applier->>DB: Insert/update rows
DB-->>Applier: OK
Applier->>Closure: check is_in_backfill()
Closure-->>Applier: true (skip per-entry rebuild)
Applier->>EventBus: emit_batch_resource_events()
EventBus-->>Applier: trace-only (no per-record fan-out)
BackfillLoop->>Closure: run_post_backfill_rebuild (bulk)
Closure->>DB: Rebuild all closures
DB-->>Closure: Done
BackfillLoop->>EventBus: emit Event::Refresh
BackfillLoop->>BackfillLoop: end in_backfill (clear IN_BACKFILL)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
core/src/service/sync/backfill.rs (2)
212-247:⚠️ Potential issue | 🟠 MajorDo not continue to Ready if deferred rebuilds fail.
Now that
in_backfillsuppresses per-record hooks,run_post_backfill_rebuildsis the only path that rebuilds derived tables for those records. The surrounding warn-and-continue path can leaveentry_closureincomplete, then emitRefreshand transition the device to ready with stale hierarchy data.Proposed change to make post-backfill rebuild mandatory
- if let Err(e) = - crate::infra::sync::registry::run_post_backfill_rebuilds(self.peer_sync.db().clone()) - .await - { - tracing::warn!("Post-backfill rebuild had errors: {}", e); - // Don't fail backfill, just warn - } + crate::infra::sync::registry::run_post_backfill_rebuilds(self.peer_sync.db().clone()) + .await + .map_err(|e| anyhow::anyhow!("Post-backfill rebuild failed: {}", e))?;🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/src/service/sync/backfill.rs` around lines 212 - 247, The post-backfill rebuild errors are currently only warned about, which lets the device continue to Ready with incomplete derived tables; change the code so failures in crate::infra::sync::registry::run_post_backfill_rebuilds(self.peer_sync.db().clone()).await are propagated to abort the backfill: replace the if-let-warn block with a fallible call (use ? or return Err(anyhow::Error::new(e))) so the function returns the error, and only emit self.peer_sync.event_bus().emit(crate::infra::event::Event::Refresh) after successful rebuilds; ensure the in_backfill scope, run_post_backfill_rebuilds, and event emission lines are updated accordingly.
627-690:⚠️ Potential issue | 🟠 MajorInterleave self-FK resolution with applying parents.
This loop resolves every
parent_uuidbefore any entry frompre_resolvedis inserted by the later apply loop. Same-batch children therefore cannot “see” just-inserted parents; they get routed through the dependency tracker instead, which reintroduces extra per-record retry work and can strand deeper same-batch descendants if dependency resolution is not cascaded.Consider applying entries immediately after successful self-FK resolution, or topologically processing parent chains so each parent is inserted before resolving its children.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/src/service/sync/backfill.rs` around lines 627 - 690, The loop currently resolves self-referential FKs for all pre_resolved entries before any inserts, causing same-batch children to miss newly-inserted parents; change it so that after crate::infra::sync::batch_map_sync_json_to_local(vec![data.clone()], self_ref_mappings.clone(), &db).await returns a non-empty result.succeeded you immediately apply/insert those succeeded entries (i.e., invoke the same per-entry "apply" logic used later in the file) instead of deferring to the later apply loop or routing to self.peer_sync.dependency_tracker().add_dependency; alternatively implement a simple topological parent-first ordering of pre_resolved chains so parents (identified by parent_id/parent_uuid in the JSON) are inserted before their children, ensuring super::state::StateChangeMessage is only sent to the dependency tracker for genuinely missing external parents.
🧹 Nitpick comments (1)
core/src/infra/db/migration/m20260417_000001_add_entries_sync_cursor_index.rs (1)
1-36: LGTM — index matches the cursor shape.Composite
(indexed_at, uuid)matchesEntry::query_for_sync’sORDER BY indexed_at ASC, uuid ASC+ tie-breaker filter, so SQLite can serve pagination and the range predicate directly from the index.IF NOT EXISTS/IF EXISTSmakeup/downidempotent, which is appropriate here.One optional consideration: on very large
entriestables the initialCREATE INDEXcan hold a write lock for a noticeable duration during migration. If that's a concern for upgrade UX, surface it in release notes — no code change needed.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/src/infra/db/migration/m20260417_000001_add_entries_sync_cursor_index.rs` around lines 1 - 36, No code changes required: the migration in Migration::up/Down creates/drops the composite index idx_entries_indexed_at_uuid that matches Entry::query_for_sync's ORDER BY indexed_at ASC, uuid ASC and uses IF NOT EXISTS/IF EXISTS for idempotence; proceed to approve/merge this migration, and optionally add a release note warning that CREATE INDEX on very large entries tables can hold a brief write lock during upgrades.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@core/src/infra/db/entities/content_identity.rs`:
- Around line 268-298: The batch FK conversion in query_for_sync (seen in
content_identity.rs: variables fk_mappings, payloads, sync_results) blindly
replaces per-record JSON with the converted payloads from
convert_fks_to_uuids_batch, which leaves unresolved rows unchanged (local FK
still present) and leads map_sync_json_to_local to write sender-local FK ints
into the DB; fix by detecting unresolved conversions after the batch call (for
each fk: if resolved_payload lacks the uuid_field but still has local_field) and
either skip those records when zipping back into sync_results or return an error
— i.e., only overwrite *json in the zip-back loop for records that have the
uuid_field present (and remove or mark others), and apply the same change to the
identical logic in entry.rs (or alternatively change convert_fks_to_uuids_batch
to return failed indices and handle them here).
In `@core/src/infra/db/entities/entry.rs`:
- Around line 337-359: After calling convert_fks_to_uuids_batch for each fk
mapping, verify that no payload still contains the source/local FK field
(fk.local_field) and fail the whole sender batch if any remain; specifically,
after the call that produces payloads (before the assignment into staged)
iterate the payloads' serde_json::Value items and check for the presence of the
fk.local_field key (or nested path used), and if found log a warning including
fk.local_field and return Err(sea_orm::DbErr::Custom(...)) to stop the send;
update the block around convert_fks_to_uuids_batch / payloads -> staged
assignment to perform this guard for each fk in fk_mappings.
In `@core/src/infra/sync/fk_mapper.rs`:
- Around line 111-182: convert_fks_to_uuids_batch currently leaves
fk.local_field intact when a target ID cannot be resolved, making unresolved
records indistinguishable from resolved ones; change its behavior so that for
any unresolved case (missing mapping or non-i64 local value) the function sets
fk.uuid_field to Value::Null and removes fk.local_field from the record (and
still emit the existing tracing::warn), so downstream code cannot accidentally
treat a stale integer as a resolved FK; locate and update the match arms in
convert_fks_to_uuids_batch (referencing FKMapping::local_field,
FKMapping::uuid_field_name, and batch_lookup_uuids_for_local_ids) to remove the
local field and set the uuid field to Null for both the None branch of
id_to_uuid.get and for the Some(v) but non-i64 case.
---
Outside diff comments:
In `@core/src/service/sync/backfill.rs`:
- Around line 212-247: The post-backfill rebuild errors are currently only
warned about, which lets the device continue to Ready with incomplete derived
tables; change the code so failures in
crate::infra::sync::registry::run_post_backfill_rebuilds(self.peer_sync.db().clone()).await
are propagated to abort the backfill: replace the if-let-warn block with a
fallible call (use ? or return Err(anyhow::Error::new(e))) so the function
returns the error, and only emit
self.peer_sync.event_bus().emit(crate::infra::event::Event::Refresh) after
successful rebuilds; ensure the in_backfill scope, run_post_backfill_rebuilds,
and event emission lines are updated accordingly.
- Around line 627-690: The loop currently resolves self-referential FKs for all
pre_resolved entries before any inserts, causing same-batch children to miss
newly-inserted parents; change it so that after
crate::infra::sync::batch_map_sync_json_to_local(vec![data.clone()],
self_ref_mappings.clone(), &db).await returns a non-empty result.succeeded you
immediately apply/insert those succeeded entries (i.e., invoke the same
per-entry "apply" logic used later in the file) instead of deferring to the
later apply loop or routing to
self.peer_sync.dependency_tracker().add_dependency; alternatively implement a
simple topological parent-first ordering of pre_resolved chains so parents
(identified by parent_id/parent_uuid in the JSON) are inserted before their
children, ensuring super::state::StateChangeMessage is only sent to the
dependency tracker for genuinely missing external parents.
---
Nitpick comments:
In
`@core/src/infra/db/migration/m20260417_000001_add_entries_sync_cursor_index.rs`:
- Around line 1-36: No code changes required: the migration in
Migration::up/Down creates/drops the composite index idx_entries_indexed_at_uuid
that matches Entry::query_for_sync's ORDER BY indexed_at ASC, uuid ASC and uses
IF NOT EXISTS/IF EXISTS for idempotence; proceed to approve/merge this
migration, and optionally add a release note warning that CREATE INDEX on very
large entries tables can hold a brief write lock during upgrades.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 8711006e-011f-489d-bf89-36bce7cd25f9
📒 Files selected for processing (12)
core/src/domain/resource_manager.rscore/src/infra/db/entities/content_identity.rscore/src/infra/db/entities/entry.rscore/src/infra/db/migration/m20260417_000001_add_entries_sync_cursor_index.rscore/src/infra/db/migration/mod.rscore/src/infra/sync/backfill_context.rscore/src/infra/sync/fk_mapper.rscore/src/infra/sync/mod.rscore/src/infra/sync/peer_log.rscore/src/service/sync/backfill.rscore/src/service/sync/peer.rscore/src/service/sync/protocol_handler.rs
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 17fdc0a. Configure here.
convert_fks_to_uuids_batch now returns the set of record indices whose FK could not be resolved (missing target row or non-integer value). Callers in entry::query_for_sync and content_identity::query_for_sync drop those records from the outgoing sync batch instead of zipping the partially converted payload back. Previously the sender would ship a record with its local int field intact and no *_uuid field; the receiver's map_sync_json_to_local interpreted that as already-resolved and wrote the sender-local integer directly to the local DB, corrupting the FK. Also: reject limit == 0 on SharedChangeRequest / get_shared_changes up front (would have returned 0 rows with has_more = true and spun), and clamp the SQL LIMIT bind with i64::try_from(..).unwrap_or(i64::MAX) instead of a wrapping `as i64`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@core/src/infra/db/entities/content_identity.rs`:
- Around line 302-317: The current post-filtering of sync_results in
query_for_sync (the sync_results.zip(payloads).filter_map block) can shrink a
DB-limited page when unresolved FKs are dropped, causing empty/short pages and
no cursor to advance; instead either: 1) treat any presence of failed_indices as
a batch-level error and return an Err so the caller can advance/fix, or 2)
implement over-fetching/scanning: keep reading rows from the raw cursor beyond
the initial DB-limited page until you accumulate batch_size valid entries (using
failed_indices to skip invalid rows) or the cursor is exhausted. Update the
logic in query_for_sync (and the variables sync_results, payloads, batch_size,
failed_indices handling) to choose one approach and ensure the returned page
length is never silently reduced by dropping FK-failed rows.
In `@core/src/infra/db/entities/entry.rs`:
- Around line 326-327: The returned cursor uses
entry.indexed_at.unwrap_or(entry.modified_at) (assigned to timestamp and pushed
via staged.push((uuid, json, timestamp))) but the DB query filters/orders by
IndexedAt only, causing cursor mismatch for NULL indexed_at rows; fix by making
the DB query use the same COALESCE(indexed_at, modified_at) expression in its
SELECT/WHERE/ORDER BY so the returned timestamp matches the query cursor, or
alternatively change the code to skip rows with NULL entry.indexed_at (i.e.,
only use entry.indexed_at and do not fall back to modified_at) so the cursor
aligns with the IndexedAt predicate. Ensure the unique symbols entry.indexed_at,
entry.modified_at, timestamp, and staged.push are updated consistently.
- Around line 364-379: The current staged processing zips staged and payloads
and silently drops rows whose indices are in failed_indices, which can shrink a
SQL-limited page and cause the caller to stop scanning prematurely; instead,
when failed_indices is non-empty (the symbols to check are failed_indices,
staged, payloads, and the zip/enumerate/filter_map/collect block), either return
an explicit error for the batch (propagate Err) or implement over-fetching by
abandoning the filter_map approach and continuing to fetch/scan until the
returned page reaches the requested size or the raw cursor is exhausted; update
the block that currently does .zip(...).enumerate().filter_map(...) to detect
failed_indices first and handle it by returning Err or by looping to fetch
additional rows rather than dropping unresolved FK rows.
In `@core/src/infra/sync/fk_mapper.rs`:
- Around line 130-169: The None arm for local_field_value should mark the record
as failed instead of writing a null UUID; update the loop in fk_mapper.rs (the
block iterating records.iter_mut() that matches local_field_value) so that when
local_field_value is None you call failed.insert(idx) (and optionally
tracing::warn! with fk.local_field and idx) and continue, mirroring
convert_fk_to_uuid's behavior, rather than setting json[&uuid_field] =
Value::Null.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: e72066e9-25a0-43c7-ace9-171e218fb4bd
📒 Files selected for processing (6)
core/src/infra/db/entities/content_identity.rscore/src/infra/db/entities/entry.rscore/src/infra/sync/fk_mapper.rscore/src/infra/sync/peer_log.rscore/src/service/sync/peer.rscore/src/service/sync/protocol_handler.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- core/src/service/sync/protocol_handler.rs
- core/src/infra/sync/peer_log.rs
…exed_at convert_fks_to_uuids_batch previously treated a missing local_field as null. convert_fk_to_uuid errors in that case — only an explicit JSON null is a legitimate null FK — so the batch helper now flags absent fields as failed to match the per-record contract. Entry::query_for_sync no longer falls back to modified_at when indexed_at is NULL. The cursor filter/order uses indexed_at exclusively, so a returned cursor derived from modified_at wouldn't match the next query's predicate. The indexed_at backfill migration populated existing rows; any NULL here is a data bug, logged and skipped. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Pushed
Not addressing the two "SQL-limited page shrinkage" comments in this PR: The concern is legitimate (tail-of-batch FK failures can stall the cursor) but the fix is out of scope here:
Filed as a follow-up rather than blocking this PR. Open to doing the over-fetch refactor as a separate change if reviewers prefer. |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
core/src/service/sync/backfill.rs (1)
624-687: Optional: avoid unnecessary clones in the per-entry self-ref loop.Minor:
datais already owned by thefor data in pre_resolvediteration, sovec![data.clone()]allocates twice.self_ref_mappings.clone()per iteration is also avoidable if you lift it once (e.g., pre-clone a reusableVecor take&[FKMapping]if the signature allows). None of this is correctness-affecting — it's a tight inner loop over potentially 100k entries, so every copy counts in a perf PR.♻️ Proposed tweak
- if self_ref_mappings.is_empty() { - succeeded.extend(pre_resolved); - } else { - for data in pre_resolved { - let result = crate::infra::sync::batch_map_sync_json_to_local( - vec![data.clone()], - self_ref_mappings.clone(), - &db, - ) + if self_ref_mappings.is_empty() { + succeeded.extend(pre_resolved); + } else { + for data in pre_resolved { + let result = crate::infra::sync::batch_map_sync_json_to_local( + vec![data], + self_ref_mappings.clone(), + &db, + )Also note: the
if !result.succeeded.is_empty() { ... } else if !result.failed.is_empty() { ... }structure implicitly assumes a single-record input (exactly one of the two is non-empty). That holds today, but a plainif/ifwould be more robust if someone later lifts this to multi-record batching.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/src/service/sync/backfill.rs` around lines 624 - 687, The per-entry loop currently does unnecessary cloning: when iterating `for data in pre_resolved` move `data` directly into the call to `crate::infra::sync::batch_map_sync_json_to_local` instead of `vec![data.clone()]` (use `vec![data]` or construct the one-item Vec by moving), and hoist a single clone of `self_ref_mappings` (or pass a `&[FKMapping]` if `batch_map_sync_json_to_local` can accept a slice) outside the loop so you don't clone it each iteration; also change the `if !result.succeeded.is_empty() { ... } else if !result.failed.is_empty() { ... }` into two independent `if` checks to avoid assuming single-record input.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@core/src/service/sync/backfill.rs`:
- Around line 624-687: The per-entry loop currently does unnecessary cloning:
when iterating `for data in pre_resolved` move `data` directly into the call to
`crate::infra::sync::batch_map_sync_json_to_local` instead of
`vec![data.clone()]` (use `vec![data]` or construct the one-item Vec by moving),
and hoist a single clone of `self_ref_mappings` (or pass a `&[FKMapping]` if
`batch_map_sync_json_to_local` can accept a slice) outside the loop so you don't
clone it each iteration; also change the `if !result.succeeded.is_empty() { ...
} else if !result.failed.is_empty() { ... }` into two independent `if` checks to
avoid assuming single-record input.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 75b075f0-0f9d-4d2c-a766-a9ac93eed003
📒 Files selected for processing (4)
core/src/infra/db/entities/content_identity.rscore/src/infra/db/entities/entry.rscore/src/infra/sync/fk_mapper.rscore/src/service/sync/backfill.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- core/src/infra/sync/fk_mapper.rs

Summary
content_identity) and Entries (device-owned) sync backfill is slow: missing cursor index, per-record FK resolution, redundant per-entry closure rebuild #3060 (device-ownedentries).backfill_batch_size.Changes
Sender
PeerLog::get_sincenow takeslimit: Option<usize>and pushesLIMIT ?into SQL. Callers fetchlimit + 1to derivehas_more. Previously everySharedChangeRequestreloaded and parsed the entire remainingshared_changeslog before truncating in memory.Entry::query_for_syncandContentIdentity::query_for_syncbatch FK→UUID conversion across the whole batch via a newconvert_fks_to_uuids_batchhelper — one DB round trip per FK type instead of per record × FK.Schema
m20260417_000001_add_entries_sync_cursor_indexaddingidx_entries_indexed_at_uuidto back the cursor used byEntry::query_for_sync. Without it each batch request fell back to a full-table scan ordered byindexed_at.Receiver
infra::sync::backfill_contextmodule exposes a task-localin_backfillscope. The backfill coordinator wraps phases 2 + 3 in it; per-record hooks checkis_in_backfill()to skip work that's redundant with the post-backfill pass.Entry::apply_state_changeskips per-entryentry_closurerebuild during backfill. The existingpost_backfill_rebuildpass already does a single bulk rebuild at the end.ResourceManager::emit_batch_resource_eventsshort-circuits during backfill (avoids the per-UUID fan-out queries infile.rs::route_from_dependency). The backfill coordinator emits oneEvent::Refreshafter the post-backfill rebuild so the frontend drops stale cached views.backfill.rssplits mappings into self-referential (parent_id) and the rest. Non-self FKs resolve in one batch across the whole batch; onlyparent_idstill runs per-entry so children can see just-inserted parents.Test plan
content_identityrecords — batch throughput holds at configuredbackfill_batch_sizeend to endentries— sameentry_closurecorrectly (catch-up path is outsidein_backfill)Event::Refreshfires and invalidates caches)cargo test -p sd-core --lib sync)Closes #3058
Closes #3060
Note
Performance fix for library sync backfill: eliminates O(N²) query patterns on both sender and receiver sides. Sender-side: implements database-level pagination in
PeerLog::get_sinceand batches FK→UUID resolution. Receiver-side: introducesbackfill_contextmodule to skip redundant per-record work during bulk sync, defersentry_closurerebuilds and resource event emissions to post-backfill pass. Result: batch throughput remains constant even with 100k+ records instead of degrading to 100 records/batch.Written by Tembo for commit 17fdc0a.