Skip to content

Commit ade492d

Browse files
committed
sinks: pass arrangements directly to SinkRender impls
Replaces the pre-grouped VecCollection<(Option<Row>, DiffPair<Row>)> that render_sink previously handed sinks with an Arranged<SinkTrace> — each sink now walks cursors itself via for_each_diff_pair. - The shared key-extraction + arrangement live in render_sink in sinks.rs; zip_into_diff_pairs and combine_at_timestamp are gone. - Kafka's encode_collection walks the arrangement inside its async operator, emitting a KafkaMessage per DiffPair. - Iceberg gains a small walker operator at its input that walks the arrangement into the same (Option<Row>, DiffPair<Row>) stream shape its mint / write / commit pipeline already consumed. - Per-(key, timestamp) primary-key-violation detection is lifted into a new PkViolationWarner in sinks.rs, observed incrementally as each sink walks the arrangement instead of after a Vec<DiffPair> materialization. The immediate win is skipping the Vec<DiffPair> allocation per (key, time) in combine_at_timestamp; the larger architectural win is that sinks can now make envelope- and batch-specific decisions about cursor navigation (e.g., snapshot fast paths) without touching shared plumbing.
1 parent 5816f2f commit ade492d

4 files changed

Lines changed: 292 additions & 268 deletions

File tree

src/interchange/src/envelopes.rs

Lines changed: 5 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,14 @@ use std::collections::BTreeMap;
1111
use std::iter;
1212
use std::sync::LazyLock;
1313

14-
use differential_dataflow::operators::arrange::Arranged;
1514
use differential_dataflow::trace::implementations::BatchContainer;
16-
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
17-
use differential_dataflow::{AsCollection, VecCollection};
18-
use itertools::{Either, EitherOrBoth, Itertools};
15+
use differential_dataflow::trace::{BatchReader, Cursor};
16+
use itertools::{Either, EitherOrBoth};
1917
use maplit::btreemap;
2018
use mz_ore::cast::CastFrom;
2119
use mz_repr::{
2220
CatalogItemId, ColumnName, Datum, Diff, Row, RowPacker, SqlColumnType, SqlScalarType,
2321
};
24-
use timely::dataflow::channels::pact::Pipeline;
25-
use timely::dataflow::operators::Operator;
2622

2723
use crate::avro::DiffPair;
2824

@@ -32,11 +28,9 @@ use crate::avro::DiffPair;
3228
/// Within a key, diffs are partitioned by sign into retractions (befores) and
3329
/// insertions (afters), sorted by timestamp, and zipped into `DiffPair`s via a
3430
/// merge-join. Pairs are emitted in ascending timestamp order for a given key;
35-
/// no ordering is guaranteed across keys.
36-
///
37-
/// This is the batch-level cursor walk underlying [`combine_at_timestamp`].
38-
/// Sinks that consume an arrangement directly can call this inside their own
39-
/// operator to stream pairs without materializing a `Vec<DiffPair>` per group.
31+
/// no ordering is guaranteed across keys. Callers are responsible for tracking
32+
/// `(key, timestamp)` boundaries themselves if they need to detect groups
33+
/// with more than one pair (e.g., for primary-key violation checks).
4034
pub fn for_each_diff_pair<B, F>(batch: &B, mut on_diff_pair: F)
4135
where
4236
B: BatchReader<Diff = Diff>,
@@ -106,111 +100,6 @@ where
106100
}
107101
}
108102

109-
/// Given a stream of batches, produce a stream of groups of DiffPairs, grouped
110-
/// by key, at each timestamp.
111-
///
112-
// This is useful for some sink envelopes (e.g., Debezium and Upsert), which
113-
// need to do specific logic based on the _entire_ set of before/after diffs for
114-
// a given key at each timestamp.
115-
pub fn combine_at_timestamp<'scope, Tr>(
116-
arranged: Arranged<'scope, Tr>,
117-
) -> VecCollection<
118-
'scope,
119-
Tr::Time,
120-
(
121-
<Tr::KeyContainer as BatchContainer>::Owned,
122-
Vec<DiffPair<Tr::ValOwn>>,
123-
),
124-
Diff,
125-
>
126-
where
127-
Tr: Clone + TraceReader<Diff = Diff, ValOwn: 'static, Time: Copy>,
128-
<Tr::KeyContainer as BatchContainer>::Owned: Clone + 'static,
129-
{
130-
arranged
131-
.stream
132-
.unary(Pipeline, "combine_at_timestamp", move |_, _| {
133-
move |input, output| {
134-
input.for_each_time(|time, batches| {
135-
let mut session = output.session(&time);
136-
for batch in batches.flat_map(IntoIterator::into_iter) {
137-
let mut befores = vec![];
138-
let mut afters = vec![];
139-
140-
let mut cursor = batch.cursor();
141-
while cursor.key_valid(batch) {
142-
let k = cursor.key(batch);
143-
144-
// Partition updates into retractions (befores)
145-
// and insertions (afters).
146-
while cursor.val_valid(batch) {
147-
let v = cursor.val(batch);
148-
cursor.map_times(batch, |t, diff| {
149-
let diff = Tr::owned_diff(diff);
150-
let update = (
151-
Tr::owned_time(t),
152-
Tr::owned_val(v),
153-
usize::cast_from(diff.unsigned_abs()),
154-
);
155-
if diff < Diff::ZERO {
156-
befores.push(update);
157-
} else {
158-
afters.push(update);
159-
}
160-
});
161-
cursor.step_val(batch);
162-
}
163-
164-
// Sort by timestamp.
165-
befores.sort_by_key(|(t, _v, _diff)| *t);
166-
afters.sort_by_key(|(t, _v, _diff)| *t);
167-
168-
// Convert diff into unary representation.
169-
let befores = befores
170-
.drain(..)
171-
.flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt));
172-
let afters = afters
173-
.drain(..)
174-
.flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt));
175-
176-
// At each timestamp, zip together the insertions
177-
// and retractions into diff pairs.
178-
let groups = itertools::merge_join_by(
179-
befores,
180-
afters,
181-
|(t1, _v1), (t2, _v2)| t1.cmp(t2),
182-
)
183-
.map(|pair| match pair {
184-
EitherOrBoth::Both((t, before), (_t, after)) => {
185-
(t, Some(before.clone()), Some(after.clone()))
186-
}
187-
EitherOrBoth::Left((t, before)) => (t, Some(before.clone()), None),
188-
EitherOrBoth::Right((t, after)) => (t, None, Some(after.clone())),
189-
})
190-
.chunk_by(|(t, _before, _after)| *t);
191-
192-
// For each timestamp, emit the group of
193-
// `DiffPair`s.
194-
for (t, group) in &groups {
195-
let group = group
196-
.map(|(_t, before, after)| DiffPair { before, after })
197-
.collect();
198-
session.give((
199-
(<Tr::KeyContainer as BatchContainer>::into_owned(k), group),
200-
t,
201-
Diff::ONE,
202-
));
203-
}
204-
205-
cursor.step_key(batch);
206-
}
207-
}
208-
});
209-
}
210-
})
211-
.as_collection()
212-
}
213-
214103
// NOTE(benesch): statically allocating transient IDs for the
215104
// transaction and row types is a bit of a hack to allow us to attach
216105
// custom names to these types in the generated Avro schema. In the

src/storage/src/render/sinks.rs

Lines changed: 97 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,11 @@
1212
use std::sync::Arc;
1313
use std::time::{Duration, Instant};
1414

15-
use differential_dataflow::operators::arrange::Arrange;
15+
use differential_dataflow::operators::arrange::{Arrange, Arranged, TraceAgent};
1616
use differential_dataflow::trace::implementations::ord_neu::{
1717
OrdValBatcher, OrdValSpine, RcOrdValBuilder,
1818
};
1919
use differential_dataflow::{AsCollection, Hashable, VecCollection};
20-
use mz_interchange::avro::DiffPair;
21-
use mz_interchange::envelopes::combine_at_timestamp;
2220
use mz_persist_client::operators::shard_source::SnapshotMode;
2321
use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
2422
use mz_storage_operators::persist_source;
@@ -33,6 +31,10 @@ use tracing::warn;
3331
use crate::healthcheck::HealthStatusMessage;
3432
use crate::storage_state::StorageState;
3533

34+
/// The concrete trace type used to hand arranged sink input to
35+
/// [`SinkRender::render_sink`].
36+
pub(crate) type SinkTrace = TraceAgent<OrdValSpine<Option<Row>, Row, Timestamp, Diff>>;
37+
3638
/// _Renders_ complete _differential_ collections
3739
/// that represent the sink and its errors as requested
3840
/// by the original `CREATE SINK` statement.
@@ -77,46 +79,40 @@ pub(crate) fn render_sink<'scope>(
7779
);
7880
tokens.extend(persist_tokens);
7981

80-
let ok_collection =
81-
zip_into_diff_pairs(sink_id, sink, &*sink_render, ok_collection.as_collection());
82+
let arranged = arrange_sink_input(&*sink_render, ok_collection.as_collection());
83+
let key_is_synthetic = sink_render.get_key_indices().is_none()
84+
&& sink_render.get_relation_key_indices().is_none();
8285

8386
let (health, sink_tokens) = sink_render.render_sink(
8487
storage_state,
8588
sink,
8689
sink_id,
87-
ok_collection,
90+
arranged,
91+
key_is_synthetic,
8892
err_collection.as_collection(),
8993
);
9094
tokens.extend(sink_tokens);
9195
(health.leave(outer_scope), tokens)
9296
})
9397
}
9498

95-
/// Zip the input to a sink so that updates to the same key appear as
96-
/// `DiffPair`s.
97-
fn zip_into_diff_pairs<'scope>(
98-
sink_id: GlobalId,
99-
sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
99+
/// Extract the sink's key column(s) from each row and arrange the resulting
100+
/// `(Option<Row>, Row)` collection by key.
101+
///
102+
/// Prefers the user-specified sink key, falling back to any natural key of the
103+
/// underlying relation. When neither exists, a synthetic per-row hash is used
104+
/// purely to distribute work across workers — in that case the sink should
105+
/// treat the key as absent (`key_is_synthetic`).
106+
fn arrange_sink_input<'scope>(
100107
sink_render: &dyn SinkRender<'scope>,
101108
collection: VecCollection<'scope, Timestamp, Row, Diff>,
102-
) -> VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff> {
103-
// We need to consolidate the collection and group records by their key.
104-
// We'll first attempt to use the explicitly declared key when the sink was
105-
// created. If no such key exists, we'll use a key of the sink's underlying
106-
// relation, if one exists.
107-
//
108-
// If no such key exists, we'll generate a synthetic key based on the hash
109-
// of the row, just for purposes of distributing work among workers. In this
110-
// case the key offers no uniqueness guarantee.
111-
112-
let user_key_indices = sink_render.get_key_indices();
113-
let relation_key_indices = sink_render.get_relation_key_indices();
114-
let key_indices = user_key_indices
115-
.or(relation_key_indices)
109+
) -> Arranged<'scope, SinkTrace> {
110+
let key_indices = sink_render
111+
.get_key_indices()
112+
.or_else(|| sink_render.get_relation_key_indices())
116113
.map(|k| k.to_vec());
117-
let key_is_synthetic = key_indices.is_none();
118114

119-
let collection = match key_indices {
115+
let keyed = match key_indices {
120116
None => collection.map(|row| (Some(Row::pack(Some(Datum::UInt64(row.hashed())))), row)),
121117
Some(key_indices) => {
122118
let mut datum_vec = mz_repr::DatumVec::new();
@@ -132,55 +128,77 @@ fn zip_into_diff_pairs<'scope>(
132128
}
133129
};
134130

135-
// Group messages by key at each timestamp.
136-
//
137131
// Allow access to `arrange_named` because we cannot access Mz's wrapper
138132
// from here. TODO(database-issues#5046): Revisit with cluster unification.
139133
#[allow(clippy::disallowed_methods)]
140-
let mut collection =
141-
combine_at_timestamp(collection.arrange_named::<OrdValBatcher<_,_,_,_>, RcOrdValBuilder<_,_,_,_>, OrdValSpine<_, _, _, _>>("Arrange Sink"));
142-
143-
// If there is no user-specified key, remove the synthetic key.
144-
//
145-
// We don't want the synthetic key to appear in the sink's actual output; we
146-
// just needed a value to use to distribute work.
147-
if user_key_indices.is_none() {
148-
collection = collection.map(|(_key, value)| (None, value))
134+
keyed.arrange_named::<OrdValBatcher<_, _, _, _>, RcOrdValBuilder<_, _, _, _>, OrdValSpine<_, _, _, _>>("Arrange Sink")
135+
}
136+
137+
/// Rate-limited detector for primary-key uniqueness violations as a sink's
138+
/// cursor walk observes `(key, timestamp)` groups.
139+
///
140+
/// Call [`PkViolationWarner::observe`] once per emitted `DiffPair`. When the
141+
/// current `(key, timestamp)` group changes — or when input batches finish —
142+
/// call [`PkViolationWarner::flush`] so the accumulated count is evaluated.
143+
///
144+
/// Keys are identified by their `Hashable::hashed()` value rather than held
145+
/// by value, so the hot observe path does no `Row` clones. A hash collision
146+
/// can mask a PK violation but this is a purely diagnostic check, so the
147+
/// trade-off is acceptable.
148+
pub(crate) struct PkViolationWarner {
149+
sink_id: GlobalId,
150+
from_id: GlobalId,
151+
last_warning: Instant,
152+
current: Option<(u64, Timestamp)>,
153+
count: usize,
154+
}
155+
156+
impl PkViolationWarner {
157+
pub fn new(sink_id: GlobalId, from_id: GlobalId) -> Self {
158+
Self {
159+
sink_id,
160+
from_id,
161+
last_warning: Instant::now(),
162+
current: None,
163+
count: 0,
164+
}
149165
}
150166

151-
collection.flat_map({
152-
let mut last_warning = Instant::now();
153-
let from_id = sink.from;
154-
move |(mut k, vs)| {
155-
// If the key is not synthetic, emit a warning to internal logs if
156-
// we discover a primary key violation.
157-
//
158-
// TODO: put the sink in a user-visible errored state instead of
159-
// only logging internally. See:
160-
// https://github.com/MaterializeInc/database-issues/issues/5099.
161-
if !key_is_synthetic && vs.len() > 1 {
162-
// We rate limit how often we emit this warning to avoid
163-
// flooding logs.
164-
let now = Instant::now();
165-
if now.duration_since(last_warning) >= Duration::from_secs(10) {
166-
last_warning = now;
167-
warn!(
168-
?sink_id,
169-
?from_id,
170-
"primary key error: expected at most one update per key and timestamp; \
171-
this can happen when the configured sink key is not a primary key of \
172-
the sinked relation"
173-
)
174-
}
175-
}
167+
/// Record that a `DiffPair` was observed at `(key, time)`. If this starts
168+
/// a new group, the previous group's count is flushed (and warned about
169+
/// if the count was > 1).
170+
pub fn observe(&mut self, key: &Option<Row>, time: Timestamp) {
171+
// `None` keys hash to a distinct sentinel from any `Row::hashed()`;
172+
// the exact constant doesn't matter for correctness (it just needs
173+
// to be stable).
174+
let hash = key.as_ref().map(|k| k.hashed()).unwrap_or(u64::MAX);
175+
let same = self.current == Some((hash, time));
176+
if !same {
177+
self.flush();
178+
self.current = Some((hash, time));
179+
}
180+
self.count += 1;
181+
}
176182

177-
let max_idx = vs.len() - 1;
178-
vs.into_iter().enumerate().map(move |(idx, dp)| {
179-
let k = if idx == max_idx { k.take() } else { k.clone() };
180-
(k, dp)
181-
})
183+
/// Flush the pending `(key, timestamp)` group count. Emits a
184+
/// rate-limited warning if more than one `DiffPair` was observed.
185+
pub fn flush(&mut self) {
186+
if self.count > 1 {
187+
let now = Instant::now();
188+
if now.duration_since(self.last_warning) >= Duration::from_secs(10) {
189+
self.last_warning = now;
190+
warn!(
191+
sink_id = ?self.sink_id,
192+
from_id = ?self.from_id,
193+
"primary key error: expected at most one update per key and timestamp; \
194+
this can happen when the configured sink key is not a primary key of \
195+
the sinked relation"
196+
);
197+
}
182198
}
183-
})
199+
self.current = None;
200+
self.count = 0;
201+
}
184202
}
185203

186204
/// A type that can be rendered as a dataflow sink.
@@ -194,12 +212,21 @@ pub(crate) trait SinkRender<'scope> {
194212
fn get_relation_key_indices(&self) -> Option<&[usize]>;
195213

196214
/// Renders the sink's dataflow.
215+
///
216+
/// The sink receives the input as an arrangement keyed on `Option<Row>`.
217+
/// The sink is responsible for walking the arrangement (typically via
218+
/// [`mz_interchange::envelopes::for_each_diff_pair`]) and handling any
219+
/// envelope-specific diff-pair construction. When `key_is_synthetic` is
220+
/// true the arrangement's key is a per-row hash used only for worker
221+
/// distribution — the sink should treat the key as absent when producing
222+
/// output.
197223
fn render_sink(
198224
&self,
199225
storage_state: &mut StorageState,
200226
sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
201227
sink_id: GlobalId,
202-
sinked_collection: VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
228+
arranged: Arranged<'scope, SinkTrace>,
229+
key_is_synthetic: bool,
203230
err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
204231
) -> (
205232
StreamVec<'scope, Timestamp, HealthStatusMessage>,

0 commit comments

Comments
 (0)