Skip to content

Commit 264cfea

Browse files
committed
sinks: skip gratuitous key clones on the hot DiffPair path
Two changes to remove per-DiffPair Row clones that showed up as an Iceberg perf regression in the feature benchmark: 1. Iceberg walker emits None for the key unconditionally. write_data_files destructures the key as _row on both the stash and drain paths and never uses it; the PK warner still sees the real key before we replace it with None, so violation detection is unchanged. Removes one Row clone per DiffPair. 2. PkViolationWarner stores Row::hashed() (u64) instead of a cloned Option<Row> to identify the current group. A hash collision could mask a single PK violation but this is a purely diagnostic code path, so the trade-off is acceptable. Removes another Row clone per group transition — which for snapshot workloads with unique keys is every DiffPair.
1 parent 6f8b1e4 commit 264cfea

2 files changed

Lines changed: 16 additions & 8 deletions

File tree

src/storage/src/render/sinks.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,16 @@ fn arrange_sink_input<'scope>(
158158
/// Call [`PkViolationWarner::observe`] once per emitted `DiffPair`. When the
159159
/// current `(key, timestamp)` group changes — or when input batches finish —
160160
/// call [`PkViolationWarner::flush`] so the accumulated count is evaluated.
161+
///
162+
/// Keys are identified by their `Hashable::hashed()` value rather than held
163+
/// by value, so the hot observe path does no `Row` clones. A hash collision
164+
/// can mask a PK violation but this is a purely diagnostic check, so the
165+
/// trade-off is acceptable.
161166
pub(crate) struct PkViolationWarner {
162167
sink_id: GlobalId,
163168
from_id: GlobalId,
164169
last_warning: Instant,
165-
current: Option<(Option<Row>, Timestamp)>,
170+
current: Option<(u64, Timestamp)>,
166171
count: usize,
167172
}
168173

@@ -181,13 +186,14 @@ impl PkViolationWarner {
181186
/// a new group, the previous group's count is flushed (and warned about
182187
/// if the count was > 1).
183188
pub fn observe(&mut self, key: &Option<Row>, time: Timestamp) {
184-
let same = match &self.current {
185-
Some((k, t)) => k == key && *t == time,
186-
None => false,
187-
};
189+
// `None` keys hash to a distinct sentinel from any `Row::hashed()`;
190+
// the exact constant doesn't matter for correctness (it just needs
191+
// to be stable).
192+
let hash = key.as_ref().map(|k| k.hashed()).unwrap_or(u64::MAX);
193+
let same = self.current == Some((hash, time));
188194
if !same {
189195
self.flush();
190-
self.current = Some((key.clone(), time));
196+
self.current = Some((hash, time));
191197
}
192198
self.count += 1;
193199
}

src/storage/src/sink/iceberg.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2444,8 +2444,10 @@ fn walk_sink_arrangement<'scope>(
24442444
if let Some(warner) = pk_warner.as_mut() {
24452445
warner.observe(key, time);
24462446
}
2447-
let emitted_key = if key_is_synthetic { None } else { key.clone() };
2448-
output.give(&cap, ((emitted_key, diff_pair), time, Diff::ONE));
2447+
// The arrangement key is only used downstream for grouping and PK checks;
2448+
// `write_data_files` discards it on both the stash and drain paths. Emit
2449+
// None unconditionally to avoid per-`DiffPair` `Row` clones on this hot path.
2450+
output.give(&cap, ((None, diff_pair), time, Diff::ONE));
24492451
});
24502452
}
24512453
}

0 commit comments

Comments
 (0)