diff --git a/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py b/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py index e2ab9ea961927..ffedaeff6e020 100644 --- a/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py +++ b/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py @@ -1520,6 +1520,119 @@ def benchmark(self) -> MeasurementSource: """) +class IcebergSnapshot(Sink): + """Measure time for a freshly-created Iceberg sink to snapshot 1M pre-existing records. + + Mirrors ExactlyOnce's shape: data pre-exists in a source table; `benchmark()` then + creates a brand-new sink that must emit the whole snapshot from as_of. This + exercises the sink's snapshot path (arrangement build, batch emission) rather + than the steady-state streaming path that IcebergSink measures. + """ + + FIXED_SCALE = True + + def __init__( + self, scale: float, mz_version: MzVersion, default_size: int, seed: int + ) -> None: + super().__init__(scale, mz_version, default_size, seed) + self._run_counter = 0 + self._invocation_id = uuid.uuid4() + self._current_table_name: str | None = None + + @classmethod + def can_run(cls, version: MzVersion) -> bool: + return version > MzVersion.create(26, 9, 0) + + def version(self) -> ScenarioVersion: + return ScenarioVersion.create(1, 0, 0) + + def init(self) -> Action: + # Set up connections, a dedicated sink cluster, and a source table + # pre-populated with the records the sink will snapshot on each + # iteration. Population happens once — `benchmark()` only measures sink + # creation + snapshot emission. 
+ return TdAction(f""" +> CREATE SECRET iceberg_secret AS '${{arg.s3-access-key}}'; + +> CREATE CONNECTION aws_conn TO AWS ( + ACCESS KEY ID = 'tduser', + SECRET ACCESS KEY = SECRET iceberg_secret, + ENDPOINT = '${{arg.aws-iceberg-endpoint}}', + REGION = 'us-east-1' + ); + +> CREATE CONNECTION polaris_conn TO ICEBERG CATALOG ( + CATALOG TYPE = 'REST', + URL = 'http://polaris:8181/api/catalog', + CREDENTIAL = 'root:root', + WAREHOUSE = 'default_catalog', + SCOPE = 'PRINCIPAL_ROLE:ALL' + ); + +> DROP CLUSTER IF EXISTS sink_cluster CASCADE + +> CREATE CLUSTER sink_cluster SIZE 'scale={self._default_size},workers=1', REPLICATION FACTOR 1; + +> DROP TABLE IF EXISTS snapshot_source_tbl CASCADE + +> CREATE TABLE snapshot_source_tbl (pk BIGINT, data BIGINT); + +> INSERT INTO snapshot_source_tbl SELECT x, x * 2 FROM generate_series(1, {self.n()}) AS x; + +> SELECT COUNT(*) FROM snapshot_source_tbl; +{self.n()} +""") + + def before(self) -> Action: + # Use a unique Iceberg table name per run so that each snapshot writes + # to a fresh destination — no catalog conflicts, no lingering state + # from previous iterations. + self._run_counter += 1 + version_str = f"v{self._mz_version.major}_{self._mz_version.minor}_{self._mz_version.patch}" + self._current_table_name = f"snapshot_${{testdrive.seed}}_{self._invocation_id}_{version_str}_{self._run_counter}" + return TdAction(""" +> DROP SINK IF EXISTS iceberg_sink; +""") + + def benchmark(self) -> MeasurementSource: + # Measure creation of a fresh sink and completion of its snapshot emission. + # Total rows the sink must emit = n (pre-populated at init()). 
+ table_name = self._current_table_name + n = self.n() + return Td(f""" +> SELECT 1; + /* A */ +1 + +> CREATE SINK iceberg_sink + IN CLUSTER sink_cluster + FROM snapshot_source_tbl + INTO ICEBERG CATALOG CONNECTION polaris_conn ( + NAMESPACE 'default_namespace', + TABLE '{table_name}' + ) + USING AWS CONNECTION aws_conn + KEY (pk) NOT ENFORCED + MODE UPSERT + WITH (COMMIT INTERVAL '1s'); + +> SELECT messages_committed >= {n} + FROM mz_internal.mz_sink_statistics + JOIN mz_sinks ON mz_sink_statistics.id = mz_sinks.id + WHERE mz_sinks.name = 'iceberg_sink'; + /* B */ +true + +$ duckdb-execute name=iceberg +CREATE SECRET s3_secret (TYPE S3, KEY_ID '${{arg.s3-access-user}}', SECRET '${{arg.s3-access-key}}', ENDPOINT '${{arg.aws-iceberg-endpoint}}', URL_STYLE 'path', USE_SSL false, REGION 'minio'); +SET unsafe_enable_version_guessing = true; + +$ duckdb-query name=iceberg +SELECT COUNT(*) FROM iceberg_scan('s3://test-bucket/default_namespace/{table_name}') +{n} +""") + + class ManyKafkaSourcesOnSameCluster(Scenario): """Measure the time it takes to ingest data from many Kafka sources""" diff --git a/src/interchange/src/envelopes.rs b/src/interchange/src/envelopes.rs index 1c1ef95dedfc5..70154c642bf67 100644 --- a/src/interchange/src/envelopes.rs +++ b/src/interchange/src/envelopes.rs @@ -11,124 +11,87 @@ use std::collections::BTreeMap; use std::iter; use std::sync::LazyLock; -use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::implementations::BatchContainer; -use differential_dataflow::trace::{BatchReader, Cursor, TraceReader}; -use differential_dataflow::{AsCollection, VecCollection}; -use itertools::{EitherOrBoth, Itertools}; +use differential_dataflow::trace::{BatchReader, Cursor}; +use itertools::EitherOrBoth; use maplit::btreemap; use mz_ore::cast::CastFrom; use mz_repr::{ CatalogItemId, ColumnName, Datum, Diff, Row, RowPacker, SqlColumnType, SqlScalarType, }; -use timely::dataflow::channels::pact::Pipeline; -use 
timely::dataflow::operators::Operator; use crate::avro::DiffPair; -/// Given a stream of batches, produce a stream of groups of DiffPairs, grouped -/// by key, at each timestamp. +/// Walks `batch` and invokes `on_diff_pair` for each `DiffPair` at each +/// `(key, timestamp)`. /// -// This is useful for some sink envelopes (e.g., Debezium and Upsert), which -// need to do specific logic based on the _entire_ set of before/after diffs for -// a given key at each timestamp. -pub fn combine_at_timestamp<'scope, Tr>( - arranged: Arranged<'scope, Tr>, -) -> VecCollection< - 'scope, - Tr::Time, - ( - ::Owned, - Vec>, - ), - Diff, -> +/// Within a key, diffs are partitioned by sign into retractions (befores) and +/// insertions (afters), sorted by timestamp, and zipped into `DiffPair`s via a +/// merge-join. Pairs are emitted in ascending timestamp order for a given key; +/// no ordering is guaranteed across keys. Callers are responsible for tracking +/// `(key, timestamp)` boundaries themselves if they need to detect groups +/// with more than one pair (e.g., for primary-key violation checks). +pub fn for_each_diff_pair(batch: &B, mut on_diff_pair: F) where - Tr: Clone + TraceReader, - ::Owned: Clone + 'static, + B: BatchReader, + B::Time: Copy, + B::ValOwn: 'static, + F: FnMut(&::Owned, B::Time, DiffPair), { - arranged - .stream - .unary(Pipeline, "combine_at_timestamp", move |_, _| { - move |input, output| { - input.for_each_time(|time, batches| { - let mut session = output.session(&time); - for batch in batches.flat_map(IntoIterator::into_iter) { - let mut befores = vec![]; - let mut afters = vec![]; - - let mut cursor = batch.cursor(); - while cursor.key_valid(batch) { - let k = cursor.key(batch); - - // Partition updates into retractions (befores) - // and insertions (afters). 
- while cursor.val_valid(batch) { - let v = cursor.val(batch); - cursor.map_times(batch, |t, diff| { - let diff = Tr::owned_diff(diff); - let update = ( - Tr::owned_time(t), - Tr::owned_val(v), - usize::cast_from(diff.unsigned_abs()), - ); - if diff < Diff::ZERO { - befores.push(update); - } else { - afters.push(update); - } - }); - cursor.step_val(batch); - } - - // Sort by timestamp. - befores.sort_by_key(|(t, _v, _diff)| *t); - afters.sort_by_key(|(t, _v, _diff)| *t); - - // Convert diff into unary representation. - let befores = befores - .drain(..) - .flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt)); - let afters = afters - .drain(..) - .flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt)); - - // At each timestamp, zip together the insertions - // and retractions into diff pairs. - let groups = itertools::merge_join_by( - befores, - afters, - |(t1, _v1), (t2, _v2)| t1.cmp(t2), - ) - .map(|pair| match pair { - EitherOrBoth::Both((t, before), (_t, after)) => { - (t, Some(before.clone()), Some(after.clone())) - } - EitherOrBoth::Left((t, before)) => (t, Some(before.clone()), None), - EitherOrBoth::Right((t, after)) => (t, None, Some(after.clone())), - }) - .chunk_by(|(t, _before, _after)| *t); - - // For each timestamp, emit the group of - // `DiffPair`s. - for (t, group) in &groups { - let group = group - .map(|(_t, before, after)| DiffPair { before, after }) - .collect(); - session.give(( - (::into_owned(k), group), - t, - Diff::ONE, - )); - } - - cursor.step_key(batch); - } - } - }); - } - }) - .as_collection() + let mut befores: Vec<(B::Time, B::ValOwn, usize)> = vec![]; + let mut afters: Vec<(B::Time, B::ValOwn, usize)> = vec![]; + + let mut cursor = batch.cursor(); + while cursor.key_valid(batch) { + let k = cursor.key(batch); + + // Partition updates at this key into retractions (befores) and + // insertions (afters). 
+ while cursor.val_valid(batch) { + let v = cursor.val(batch); + cursor.map_times(batch, |t, diff| { + let diff = B::owned_diff(diff); + let update = ( + B::owned_time(t), + B::owned_val(v), + usize::cast_from(diff.unsigned_abs()), + ); + if diff < Diff::ZERO { + befores.push(update); + } else { + afters.push(update); + } + }); + cursor.step_val(batch); + } + + befores.sort_by_key(|(t, _v, _diff)| *t); + afters.sort_by_key(|(t, _v, _diff)| *t); + + // The use of `repeat_n()` here is a bit subtle, but load bearing. + // In the common case, cnt = 1, and we want to avoid cloning the value if possible. In the naive + // implementation, we might use `iter::repeat((t, v)).take(cnt)`, but that would clone `v` `cnt` times even + // when `cnt = 1`. By contrast, `repeat_n((t, v), cnt)` will return the original `(t, v)` when `cnt = 1`, + // and only clone when `cnt > 1`. + let fan_out = |(t, v, cnt): (B::Time, B::ValOwn, usize)| iter::repeat_n((t, v), cnt); + let befores_iter = befores.drain(..).flat_map(fan_out); + let afters_iter = afters.drain(..).flat_map(fan_out); + + let key_owned = ::into_owned(k); + + for pair in + itertools::merge_join_by(befores_iter, afters_iter, |(t1, _v1), (t2, _v2)| t1.cmp(t2)) + { + let (t, before, after) = match pair { + EitherOrBoth::Both((t, before), (_t, after)) => (t, Some(before), Some(after)), + EitherOrBoth::Left((t, before)) => (t, Some(before), None), + EitherOrBoth::Right((t, after)) => (t, None, Some(after)), + }; + on_diff_pair(&key_owned, t, DiffPair { before, after }); + } + + cursor.step_key(batch); + } } // NOTE(benesch): statically allocating transient IDs for the @@ -173,3 +136,154 @@ pub fn dbz_format(rp: &mut RowPacker, dp: DiffPair) { rp.push(Datum::Null); } } + +#[cfg(test)] +mod tests { + use differential_dataflow::trace::Batcher; + use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder}; + use timely::progress::Antichain; + + use super::*; + + /// Seals a single batch from an unordered list of 
`((key, val), time, diff)` + /// tuples upper-bounded at `upper`. + fn batch_from_tuples( + mut tuples: Vec<((String, String), u64, Diff)>, + upper: u64, + ) -> as differential_dataflow::trace::Builder>::Output + { + let mut batcher = ValBatcher::::new(None, 0); + batcher.push_container(&mut tuples); + batcher.seal::>(Antichain::from_elem(upper)) + } + + /// Collects `for_each_diff_pair` invocations into a flat, deterministically + /// sorted list for easy assertion. + fn collect_diff_pairs(batch: &B) -> Vec<(String, u64, Option, Option)> + where + B: BatchReader, + B::Time: Copy + Into, + B::ValOwn: 'static + Into, + ::Owned: Into + Clone, + { + let mut out = vec![]; + for_each_diff_pair(batch, |k, t, dp| { + out.push(( + k.clone().into(), + t.into(), + dp.before.map(Into::into), + dp.after.map(Into::into), + )); + }); + out.sort(); + out + } + + #[mz_ore::test] + fn single_insertion() { + let batch = batch_from_tuples(vec![(("k1".into(), "v1".into()), 5, Diff::ONE)], 6); + let pairs = collect_diff_pairs(&batch); + assert_eq!(pairs, vec![("k1".into(), 5, None, Some("v1".into()))]); + } + + #[mz_ore::test] + fn single_retraction() { + let batch = batch_from_tuples(vec![(("k1".into(), "v1".into()), 5, -Diff::ONE)], 6); + let pairs = collect_diff_pairs(&batch); + assert_eq!(pairs, vec![("k1".into(), 5, Some("v1".into()), None)]); + } + + #[mz_ore::test] + fn update_at_same_timestamp() { + // Retract v1 and insert v2 at the same timestamp → paired into a single + // DiffPair with both before and after populated. + let batch = batch_from_tuples( + vec![ + (("k1".into(), "v1".into()), 5, -Diff::ONE), + (("k1".into(), "v2".into()), 5, Diff::ONE), + ], + 6, + ); + let pairs = collect_diff_pairs(&batch); + assert_eq!( + pairs, + vec![("k1".into(), 5, Some("v1".into()), Some("v2".into()))] + ); + } + + #[mz_ore::test] + fn update_across_timestamps() { + // Insert v1 at t=5, then replace with v2 at t=10. 
+ let batch = batch_from_tuples( + vec![ + (("k1".into(), "v1".into()), 5, Diff::ONE), + (("k1".into(), "v1".into()), 10, -Diff::ONE), + (("k1".into(), "v2".into()), 10, Diff::ONE), + ], + 11, + ); + let pairs = collect_diff_pairs(&batch); + assert_eq!( + pairs, + vec![ + ("k1".into(), 5, None, Some("v1".into())), + ("k1".into(), 10, Some("v1".into()), Some("v2".into())), + ] + ); + } + + #[mz_ore::test] + fn diff_greater_than_one_fans_out() { + // Diff=3 becomes three independent `DiffPair`s at the same timestamp. + let batch = batch_from_tuples(vec![(("k1".into(), "v1".into()), 5, Diff::from(3))], 6); + let pairs = collect_diff_pairs(&batch); + assert_eq!( + pairs, + vec![ + ("k1".into(), 5, None, Some("v1".into())), + ("k1".into(), 5, None, Some("v1".into())), + ("k1".into(), 5, None, Some("v1".into())), + ] + ); + } + + #[mz_ore::test] + fn multiple_keys_are_independent() { + let batch = batch_from_tuples( + vec![ + (("k1".into(), "v1".into()), 5, Diff::ONE), + (("k2".into(), "v2".into()), 5, Diff::ONE), + ], + 6, + ); + let pairs = collect_diff_pairs(&batch); + assert_eq!( + pairs, + vec![ + ("k1".into(), 5, None, Some("v1".into())), + ("k2".into(), 5, None, Some("v2".into())), + ] + ); + } + + #[mz_ore::test] + fn unpaired_before_and_after_at_different_timestamps() { + // Retraction at t=5, insertion at t=10 — they do NOT pair because they + // live at different timestamps. 
+ let batch = batch_from_tuples( + vec![ + (("k1".into(), "v1".into()), 5, -Diff::ONE), + (("k1".into(), "v2".into()), 10, Diff::ONE), + ], + 11, + ); + let pairs = collect_diff_pairs(&batch); + assert_eq!( + pairs, + vec![ + ("k1".into(), 5, Some("v1".into()), None), + ("k1".into(), 10, None, Some("v2".into())), + ] + ); + } +} diff --git a/src/storage/src/render/sinks.rs b/src/storage/src/render/sinks.rs index 82c61eac71e3b..e27212700e331 100644 --- a/src/storage/src/render/sinks.rs +++ b/src/storage/src/render/sinks.rs @@ -12,13 +12,12 @@ use std::sync::Arc; use std::time::{Duration, Instant}; -use differential_dataflow::operators::arrange::Arrange; +use differential_dataflow::operators::arrange::{Arrange, Arranged, TraceAgent}; +use differential_dataflow::trace::TraceReader; use differential_dataflow::trace::implementations::ord_neu::{ OrdValBatcher, OrdValSpine, RcOrdValBuilder, }; use differential_dataflow::{AsCollection, Hashable, VecCollection}; -use mz_interchange::avro::DiffPair; -use mz_interchange::envelopes::combine_at_timestamp; use mz_persist_client::operators::shard_source::SnapshotMode; use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp}; use mz_storage_operators::persist_source; @@ -33,6 +32,21 @@ use tracing::warn; use crate::healthcheck::HealthStatusMessage; use crate::storage_state::StorageState; +/// The concrete trace type produced internally when arranging a sink's input. +/// The sink never sees this directly — only the batches flowing through it — +/// but it's the anchor for the batch type in [`SinkBatchStream`]. +pub(crate) type SinkTrace = TraceAgent, Row, Timestamp, Diff>>; + +/// Stream of arrangement batches handed to [`SinkRender::render_sink`]. +/// +/// This is `Arranged::stream` with the trace reader dropped: sinks only need +/// batch-level access (no random-access reads via a cursor), so we don't keep +/// a `TraceAgent` alive. 
Dropping the reader lets the spine's compaction +/// frontiers advance to the empty antichain, so the arrange operator can +/// aggressively compact / release batch state as updates flow through. +pub(crate) type SinkBatchStream<'scope> = + StreamVec<'scope, Timestamp, ::Batch>; + /// _Renders_ complete _differential_ collections /// that represent the sink and its errors as requested /// by the original `CREATE SINK` statement. @@ -77,14 +91,16 @@ pub(crate) fn render_sink<'scope>( ); tokens.extend(persist_tokens); - let ok_collection = - zip_into_diff_pairs(sink_id, sink, &*sink_render, ok_collection.as_collection()); + let batches = arrange_sink_input(&*sink_render, ok_collection.as_collection()); + let key_is_synthetic = sink_render.get_key_indices().is_none() + && sink_render.get_relation_key_indices().is_none(); let (health, sink_tokens) = sink_render.render_sink( storage_state, sink, sink_id, - ok_collection, + batches, + key_is_synthetic, err_collection.as_collection(), ); tokens.extend(sink_tokens); @@ -92,31 +108,28 @@ pub(crate) fn render_sink<'scope>( }) } -/// Zip the input to a sink so that updates to the same key appear as -/// `DiffPair`s. -fn zip_into_diff_pairs<'scope>( - sink_id: GlobalId, - sink: &StorageSinkDesc, +/// Extract the sink's key column(s) from each row, arrange the resulting +/// `(Option, Row)` collection by key, and return just the stream of +/// batches — dropping the trace reader. +/// +/// Prefers the user-specified sink key, falling back to any natural key of the +/// underlying relation. When neither exists, a synthetic per-row hash is used +/// purely to distribute work across workers — in that case the sink should +/// treat the key as absent (`key_is_synthetic`). +/// +/// Partial-moving `arranged.stream` lets the surrounding `Arranged` (and the +/// `TraceAgent` it holds) drop, releasing the spine's compaction holds so the +/// arrange operator can compact batch state as it's emitted. 
+fn arrange_sink_input<'scope>( sink_render: &dyn SinkRender<'scope>, collection: VecCollection<'scope, Timestamp, Row, Diff>, -) -> VecCollection<'scope, Timestamp, (Option, DiffPair), Diff> { - // We need to consolidate the collection and group records by their key. - // We'll first attempt to use the explicitly declared key when the sink was - // created. If no such key exists, we'll use a key of the sink's underlying - // relation, if one exists. - // - // If no such key exists, we'll generate a synthetic key based on the hash - // of the row, just for purposes of distributing work among workers. In this - // case the key offers no uniqueness guarantee. - - let user_key_indices = sink_render.get_key_indices(); - let relation_key_indices = sink_render.get_relation_key_indices(); - let key_indices = user_key_indices - .or(relation_key_indices) +) -> SinkBatchStream<'scope> { + let key_indices = sink_render + .get_key_indices() + .or_else(|| sink_render.get_relation_key_indices()) .map(|k| k.to_vec()); - let key_is_synthetic = key_indices.is_none(); - let collection = match key_indices { + let keyed = match key_indices { None => collection.map(|row| (Some(Row::pack(Some(Datum::UInt64(row.hashed())))), row)), Some(key_indices) => { let mut datum_vec = mz_repr::DatumVec::new(); @@ -132,55 +145,78 @@ fn zip_into_diff_pairs<'scope>( } }; - // Group messages by key at each timestamp. - // // Allow access to `arrange_named` because we cannot access Mz's wrapper // from here. TODO(database-issues#5046): Revisit with cluster unification. #[allow(clippy::disallowed_methods)] - let mut collection = - combine_at_timestamp(collection.arrange_named::, RcOrdValBuilder<_,_,_,_>, OrdValSpine<_, _, _, _>>("Arrange Sink")); - - // If there is no user-specified key, remove the synthetic key. - // - // We don't want the synthetic key to appear in the sink's actual output; we - // just needed a value to use to distribute work. 
- if user_key_indices.is_none() { - collection = collection.map(|(_key, value)| (None, value)) + let Arranged {stream, trace: _} = keyed.arrange_named::, RcOrdValBuilder<_, _, _, _>, OrdValSpine<_, _, _, _>>("Arrange Sink"); + stream +} + +/// Rate-limited detector for primary-key uniqueness violations as a sink's +/// cursor walk observes `(key, timestamp)` groups. +/// +/// Call [`PkViolationWarner::observe`] once per emitted `DiffPair`. When the +/// current `(key, timestamp)` group changes — or when input batches finish — +/// call [`PkViolationWarner::flush`] so the accumulated count is evaluated. +/// +/// Keys are identified by their `Hashable::hashed()` value rather than held +/// by value, so the hot observe path does no `Row` clones. A hash collision +/// can mask a PK violation but this is a purely diagnostic check, so the +/// trade-off is acceptable. +pub(crate) struct PkViolationWarner { + sink_id: GlobalId, + from_id: GlobalId, + last_warning: Instant, + current: Option<(u64, Timestamp)>, + count: usize, +} + +impl PkViolationWarner { + pub fn new(sink_id: GlobalId, from_id: GlobalId) -> Self { + Self { + sink_id, + from_id, + last_warning: Instant::now(), + current: None, + count: 0, + } } - collection.flat_map({ - let mut last_warning = Instant::now(); - let from_id = sink.from; - move |(mut k, vs)| { - // If the key is not synthetic, emit a warning to internal logs if - // we discover a primary key violation. - // - // TODO: put the sink in a user-visible errored state instead of - // only logging internally. See: - // https://github.com/MaterializeInc/database-issues/issues/5099. - if !key_is_synthetic && vs.len() > 1 { - // We rate limit how often we emit this warning to avoid - // flooding logs. 
- let now = Instant::now(); - if now.duration_since(last_warning) >= Duration::from_secs(10) { - last_warning = now; - warn!( - ?sink_id, - ?from_id, - "primary key error: expected at most one update per key and timestamp; \ - this can happen when the configured sink key is not a primary key of \ - the sinked relation" - ) - } - } + /// Record that a `DiffPair` was observed at `(key, time)`. If this starts + /// a new group, the previous group's count is flushed (and warned about + /// if the count was > 1). + pub fn observe(&mut self, key: &Option, time: Timestamp) { + // `None` keys map to a fixed sentinel. It may collide with a real + // `Row::hashed()` value, which is acceptable for this diagnostic check; + // the constant only needs to be stable. + let hash = key.as_ref().map(|k| k.hashed()).unwrap_or(u64::MAX); + let same = self.current == Some((hash, time)); + if !same { + self.flush(); + self.current = Some((hash, time)); + } + self.count += 1; + } - let max_idx = vs.len() - 1; - vs.into_iter().enumerate().map(move |(idx, dp)| { - let k = if idx == max_idx { k.take() } else { k.clone() }; - (k, dp) - }) + /// Flush the pending `(key, timestamp)` group count. Emits a + /// rate-limited warning if more than one `DiffPair` was observed. + pub fn flush(&mut self) { + if self.count > 1 { + let now = Instant::now(); + if now.duration_since(self.last_warning) >= Duration::from_secs(10) { + self.last_warning = now; + warn!( + sink_id = ?self.sink_id, + from_id = ?self.from_id, + "primary key error: expected at most one update per key and timestamp; \ + this can happen when the configured sink key is not a primary key of \ + the sinked relation" + ); + } } - }) + self.current = None; + self.count = 0; + } } /// A type that can be rendered as a dataflow sink. @@ -194,12 +230,21 @@ pub(crate) trait SinkRender<'scope> { fn get_relation_key_indices(&self) -> Option<&[usize]>; /// Renders the sink's dataflow. + /// + /// The sink receives a stream of arrangement batches keyed on `Option`. 
+ /// The sink is responsible for walking each batch (typically via + /// [`mz_interchange::envelopes::for_each_diff_pair`]) and handling any + /// envelope-specific diff-pair construction. When `key_is_synthetic` is + /// true the arrangement's key is a per-row hash used only for worker + /// distribution — the sink should treat the key as absent when producing + /// output. fn render_sink( &self, storage_state: &mut StorageState, sink: &StorageSinkDesc, sink_id: GlobalId, - sinked_collection: VecCollection<'scope, Timestamp, (Option, DiffPair), Diff>, + batches: SinkBatchStream<'scope>, + key_is_synthetic: bool, err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>, ) -> ( StreamVec<'scope, Timestamp, HealthStatusMessage>, diff --git a/src/storage/src/sink/iceberg.rs b/src/storage/src/sink/iceberg.rs index 6e0c815bda37a..7e678d6cb7cae 100644 --- a/src/storage/src/sink/iceberg.rs +++ b/src/storage/src/sink/iceberg.rs @@ -10,14 +10,25 @@ //! Iceberg sink implementation. //! //! This code renders a [`IcebergSinkConnection`] into a dataflow that writes -//! data to an Iceberg table. The dataflow consists of three operators: +//! data to an Iceberg table. `SinkRender::render_sink` hands the sink a stream +//! of arrangement batches keyed on the sink key (the upstream arrangement's +//! trace reader is already dropped, so the spine is free to compact as +//! batches flow). A small `walk_sink_arrangement` operator consumes that +//! stream and emits one `DiffPair` per `(key, timestamp)` update into the +//! pipeline below. //! //! ```text //! ┏━━━━━━━━━━━━━━┓ //! ┃ persist ┃ //! ┃ source ┃ //! ┗━━━━━━┯━━━━━━━┛ -//! │ row data, the input to this module +//! │ stream of arrangement batches (trace reader dropped) +//! │ +//! ┏━━━━━━v━━━━━━━┓ +//! ┃ walk ┃ +//! ┃ arrangement ┃ yields individual DiffPairs per (key, timestamp) +//! ┗━━━━━━┯━━━━━━━┛ +//! │ (Option, DiffPair) rows //! │ //! ┏━━━━━━v━━━━━━━┓ //! 
┃ mint ┃ (single worker) @@ -111,6 +122,7 @@ use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent}; use itertools::Itertools; use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder}; use mz_interchange::avro::DiffPair; +use mz_interchange::envelopes::for_each_diff_pair; use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; use mz_ore::future::InTask; @@ -142,7 +154,7 @@ use tracing::debug; use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace}; use crate::metrics::sink::iceberg::IcebergSinkMetrics; -use crate::render::sinks::SinkRender; +use crate::render::sinks::{PkViolationWarner, SinkBatchStream, SinkRender}; use crate::statistics::SinkStatistics; use crate::storage_state::StorageState; @@ -2240,13 +2252,22 @@ impl<'scope> SinkRender<'scope> for IcebergSinkConnection { storage_state: &mut StorageState, sink: &StorageSinkDesc, sink_id: GlobalId, - input: VecCollection<'scope, Timestamp, (Option, DiffPair), Diff>, + batches: SinkBatchStream<'scope>, + key_is_synthetic: bool, _err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>, ) -> ( StreamVec<'scope, Timestamp, HealthStatusMessage>, Vec, ) { - let scope = input.scope(); + let scope = batches.scope(); + + let (input, walker_button) = walk_sink_arrangement( + format!("{sink_id}-iceberg-walker"), + batches, + sink_id, + sink.from, + key_is_synthetic, + ); let write_handle = { let persist = Arc::clone(&storage_state.persist_clients); @@ -2386,6 +2407,58 @@ impl<'scope> SinkRender<'scope> for IcebergSinkConnection { let statuses = scope.concatenate([running_status, mint_status, write_status, commit_status]); - (statuses, vec![mint_button, write_button, commit_button]) + ( + statuses, + vec![walker_button, mint_button, write_button, commit_button], + ) } } + +/// Walks each arrangement batch and emits a stream of individual +/// `(key, DiffPair)` records that feeds the rest of the Iceberg sink pipeline. 
+/// +/// Tracks per-`(key, timestamp)` group sizes and rate-limits a warning when a +/// non-synthetic key has more than one `DiffPair`. The arrangement key is always +/// stripped (emitted as `None`); `key_is_synthetic` only disables the PK check. +fn walk_sink_arrangement<'scope>( + name: String, + batches: SinkBatchStream<'scope>, + sink_id: GlobalId, + from_id: GlobalId, + key_is_synthetic: bool, +) -> ( + VecCollection<'scope, Timestamp, (Option, DiffPair), Diff>, + PressOnDropButton, +) { + let mut builder = OperatorBuilder::new(name, batches.scope()); + let (output, stream) = builder.new_output::>>(); + let mut input = builder.new_input_for(batches, Pipeline, &output); + + let button = builder.build(move |_caps| async move { + let mut pk_warner = (!key_is_synthetic).then(|| PkViolationWarner::new(sink_id, from_id)); + + while let Some(event) = input.next().await { + if let Event::Data(cap, mut batches) = event { + for batch in batches.drain(..) { + for_each_diff_pair(&batch, |key, time, diff_pair| { + if let Some(warner) = pk_warner.as_mut() { + warner.observe(key, time); + } + // The arrangement key is only used downstream for grouping and PK checks; + // `write_data_files` discards it on both the stash and drain paths. Emit + // None unconditionally to avoid per-`DiffPair` `Row` clones on this hot path. + output.give(&cap, ((None, diff_pair), time, Diff::ONE)); + }); + // Flush after each batch so the final `(key, time)` group of the walk is + // resolved immediately — a PK violation in the last group is otherwise held + // until more data arrives or the operator shuts down. + if let Some(warner) = pk_warner.as_mut() { + warner.flush(); + } + } + } + } + }); + + (stream.as_collection(), button.press_on_drop()) +} diff --git a/src/storage/src/sink/kafka.rs b/src/storage/src/sink/kafka.rs index 54077a1ea2379..27058e3adcba7 100644 --- a/src/storage/src/sink/kafka.rs +++ b/src/storage/src/sink/kafka.rs @@ -15,11 +15,11 @@ //! ┃ persist ┃ //! ┃ source ┃ //! ┗━━━━━━┯━━━━━━━┛ -//! 
│ row data, the input to this module +//! │ stream of arrangement batches (trace reader dropped) //! │ //! ┏━━━━━━v━━━━━━┓ -//! ┃ row ┃ -//! ┃ encoder ┃ +//! ┃ row ┃ walks each batch's cursor and emits one +//! ┃ encoder ┃ encoded `KafkaMessage` per DiffPair //! ┗━━━━━━┯━━━━━━┛ //! │ encoded data //! │ @@ -36,10 +36,11 @@ //! //! # Encoding //! -//! One part of the dataflow deals with encoding the rows that we read from persist. There isn't -//! anything surprizing here, it is *almost* just a `Collection::map` with the exception of an -//! initialization step that makes sure the schemas are published to the Schema Registry. After -//! that step the operator just encodes each batch it receives record by record. +//! One part of the dataflow deals with encoding the rows that we read from persist. The encoder +//! walks the input arrangement's batches via +//! [`mz_interchange::envelopes::for_each_diff_pair`], producing one encoded `KafkaMessage` per +//! `DiffPair` observed at each `(key, timestamp)`. An initialization step first ensures that the +//! schemas are published to the Schema Registry. //! //! # Sinking //! 
@@ -81,7 +82,7 @@ use std::time::Duration; use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace}; use crate::metrics::sink::kafka::KafkaSinkMetrics; -use crate::render::sinks::SinkRender; +use crate::render::sinks::{PkViolationWarner, SinkBatchStream, SinkRender}; use crate::statistics::SinkStatistics; use crate::storage_state::StorageState; use anyhow::{Context, anyhow, bail}; @@ -89,9 +90,9 @@ use differential_dataflow::{AsCollection, Hashable, VecCollection}; use futures::StreamExt; use maplit::btreemap; use mz_expr::MirScalarExpr; -use mz_interchange::avro::{AvroEncoder, DiffPair}; +use mz_interchange::avro::AvroEncoder; use mz_interchange::encode::Encode; -use mz_interchange::envelopes::dbz_format; +use mz_interchange::envelopes::{dbz_format, for_each_diff_pair}; use mz_interchange::json::JsonEncoder; use mz_interchange::text_binary::{BinaryEncoder, TextEncoder}; use mz_kafka_util::admin::EnsureTopicConfig; @@ -159,7 +160,8 @@ impl<'scope> SinkRender<'scope> for KafkaSinkConnection { storage_state: &mut StorageState, sink: &StorageSinkDesc, sink_id: GlobalId, - input: VecCollection<'scope, Timestamp, (Option, DiffPair), Diff>, + batches: SinkBatchStream<'scope>, + key_is_synthetic: bool, // TODO(benesch): errors should stream out through the sink, // if we figure out a protocol for that. 
_err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>, @@ -167,7 +169,7 @@ impl<'scope> SinkRender<'scope> for KafkaSinkConnection { StreamVec<'scope, Timestamp, HealthStatusMessage>, Vec, ) { - let scope = input.scope(); + let scope = batches.scope(); let write_handle = { let persist = Arc::clone(&storage_state.persist_clients); @@ -193,10 +195,13 @@ impl<'scope> SinkRender<'scope> for KafkaSinkConnection { let (encoded, encode_status, encode_token) = encode_collection( format!("kafka-{sink_id}-{}-encode", self.format.get_format_name()), - input, + batches, sink.envelope, self.clone(), storage_state.storage_configuration.clone(), + sink_id, + sink.from, + key_is_synthetic, ); let metrics = storage_state.metrics.get_kafka_sink_metrics(sink_id); @@ -1359,24 +1364,29 @@ async fn fetch_partition_count_loop( } } -/// Encodes a stream of `(Option, Option)` updates using the specified encoder. +/// Walks each arrangement batch and emits encoded Kafka messages, one per +/// `DiffPair` observed at each `(key, timestamp)`. /// -/// Input [`Row`] updates must me compatible with the given implementor of [`Encode`]. -fn encode_collection<'scope, T: timely::progress::Timestamp>( +/// When `key_is_synthetic`, the batch keys are per-row hashes used only for +/// worker distribution; the emitted `KafkaMessage` uses no key in that case. 
+fn encode_collection<'scope>( name: String, - input: VecCollection<'scope, T, (Option, DiffPair), Diff>, + batches: SinkBatchStream<'scope>, envelope: SinkEnvelope, connection: KafkaSinkConnection, storage_configuration: StorageConfiguration, + sink_id: GlobalId, + from_id: GlobalId, + key_is_synthetic: bool, ) -> ( - VecCollection<'scope, T, KafkaMessage, Diff>, - StreamVec<'scope, T, HealthStatusMessage>, + VecCollection<'scope, Timestamp, KafkaMessage, Diff>, + StreamVec<'scope, Timestamp, HealthStatusMessage>, PressOnDropButton, ) { - let mut builder = AsyncOperatorBuilder::new(name, input.inner.scope()); + let mut builder = AsyncOperatorBuilder::new(name, batches.scope()); let (output, stream) = builder.new_output::>>(); - let mut input = builder.new_input_for(input.inner, Pipeline, &output); + let mut input = builder.new_input_for(batches, Pipeline, &output); let (button, errors) = builder.build_fallible(move |caps| { Box::pin(async move { @@ -1476,70 +1486,94 @@ fn encode_collection<'scope, T: timely::progress::Timestamp>( let mut row_buf = Row::default(); let mut datums = DatumVec::new(); + let mut pk_warner = + (!key_is_synthetic).then(|| PkViolationWarner::new(sink_id, from_id)); while let Some(event) = input.next().await { - if let Event::Data(cap, rows) = event { - for ((key, value), time, diff) in rows { - let mut hash = None; - let mut headers = vec![]; - if connection.headers_index.is_some() || connection.partition_by.is_some() { - // Header values and partition by values are derived from the row that - // produces an event. But it is ambiguous whether to use the `before` or - // `after` from the event. The rule applied here is simple: use `after` - // if it exists (insertions and updates), otherwise fall back to `before` - // (deletions). - // - // It is up to the SQL planner to ensure this produces sensible results. 
- // (When using the upsert envelope and both `before` and `after` are - // present, it's always unambiguous to use `after` because that's all - // that will be present in the Kafka message; when using the Debezium - // envelope, it's okay to refer to columns in the key because those - // are guaranteed to be the same in both `before` and `after`.) - let row = value - .after - .as_ref() - .or(value.before.as_ref()) - .expect("one of before or after must be set"); - let row = datums.borrow_with(row); - - if let Some(i) = connection.headers_index { - headers = encode_headers(row[i]); + if let Event::Data(cap, mut batches) = event { + for batch in batches.drain(..) { + for_each_diff_pair(&batch, |key, time, value| { + if let Some(warner) = pk_warner.as_mut() { + warner.observe(key, time); } + // Only emit the arrangement key when the user configured one; relation-key + // and synthetic-hash arrangements exist purely for grouping / worker + // distribution and have no corresponding key encoder. + let key_for_message = if key_encoder.is_some() { key } else { &None }; + + let mut hash = None; + let mut headers = vec![]; + if connection.headers_index.is_some() + || connection.partition_by.is_some() + { + // Header values and partition by values are derived from the row + // that produces an event. But it is ambiguous whether to use the + // `before` or `after` from the event. The rule applied here is + // simple: use `after` if it exists (insertions and updates), + // otherwise fall back to `before` (deletions). + // + // It is up to the SQL planner to ensure this produces sensible + // results. (When using the upsert envelope and both `before` and + // `after` are present, it's always unambiguous to use `after` + // because that's all that will be present in the Kafka message; + // when using the Debezium envelope, it's okay to refer to columns + // in the key because those are guaranteed to be the same in both + // `before` and `after`.) 
+ let row = value + .after + .as_ref() + .or(value.before.as_ref()) + .expect("one of before or after must be set"); + let row = datums.borrow_with(row); + + if let Some(i) = connection.headers_index { + headers = encode_headers(row[i]); + } - if let Some(partition_by) = &connection.partition_by { - hash = Some(evaluate_partition_by(partition_by, &row)); + if let Some(partition_by) = &connection.partition_by { + hash = Some(evaluate_partition_by(partition_by, &row)); + } } + let (encoded_key, hash) = match key_for_message { + Some(key) => { + let key_encoder = + key_encoder.as_ref().expect("key present"); + let encoded = key_encoder.encode_unchecked(key.clone()); + let hash = + hash.unwrap_or_else(|| key_encoder.hash(&encoded)); + (Some(encoded), hash) + } + None => (None, hash.unwrap_or(0)), + }; + let value = match envelope { + SinkEnvelope::Upsert => value.after, + SinkEnvelope::Debezium => { + dbz_format(&mut row_buf.packer(), value); + Some(row_buf.clone()) + } + SinkEnvelope::Append => { + unreachable!("Append envelope is not valid for Kafka sinks") + } + }; + let value = value.map(|value| value_encoder.encode_unchecked(value)); + let message = KafkaMessage { + hash, + key: encoded_key, + value, + headers, + }; + output.give(&cap, (message, time, Diff::ONE)); + }); + // Flush after each batch so the final `(key, time)` group of the walk is + // resolved immediately — a PK violation in the last group is otherwise + // held until more data arrives or the operator shuts down. 
+ if let Some(warner) = pk_warner.as_mut() { + warner.flush(); } - let (key, hash) = match key { - Some(key) => { - let key_encoder = key_encoder.as_ref().expect("key present"); - let key = key_encoder.encode_unchecked(key); - let hash = hash.unwrap_or_else(|| key_encoder.hash(&key)); - (Some(key), hash) - } - None => (None, hash.unwrap_or(0)) - }; - let value = match envelope { - SinkEnvelope::Upsert => value.after, - SinkEnvelope::Debezium => { - dbz_format(&mut row_buf.packer(), value); - Some(row_buf.clone()) - } - SinkEnvelope::Append => { - unreachable!("Append envelope is not valid for Kafka sinks") - } - }; - let value = value.map(|value| value_encoder.encode_unchecked(value)); - let message = KafkaMessage { - hash, - key, - value, - headers, - }; - output.give(&cap, (message, time, diff)); } } } + Ok::<(), anyhow::Error>(()) }) });