Skip to content

Commit 8a4ed1e

Browse files
committed
sinks: drop the arrangement's trace reader, pass only batches
SinkRender::render_sink now takes a Stream<Vec<SinkBatch>> instead of an Arranged<SinkTrace>. Sinks only ever walk incoming batches (via for_each_diff_pair) — they never use the TraceAgent for random access — so the reader handle is dead weight. arrange_sink_input still calls arrange_named, but immediately extracts arranged.stream and lets the surrounding Arranged (and its TraceAgent) drop. With no reader holding compaction frontiers, those frontiers advance to the empty antichain, so the arrange operator's spine can fully compact as batches flow, releasing historical batch state instead of accumulating it. Mirrors the pattern used by DD's consolidate_named, which builds an arrangement only to call as_collection on it and drop the trace.
1 parent ade492d commit 8a4ed1e

3 files changed

Lines changed: 58 additions & 41 deletions

File tree

src/storage/src/render/sinks.rs

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
use std::sync::Arc;
1313
use std::time::{Duration, Instant};
1414

15-
use differential_dataflow::operators::arrange::{Arrange, Arranged, TraceAgent};
15+
use differential_dataflow::operators::arrange::{Arrange, TraceAgent};
16+
use differential_dataflow::trace::TraceReader;
1617
use differential_dataflow::trace::implementations::ord_neu::{
1718
OrdValBatcher, OrdValSpine, RcOrdValBuilder,
1819
};
@@ -31,10 +32,21 @@ use tracing::warn;
3132
use crate::healthcheck::HealthStatusMessage;
3233
use crate::storage_state::StorageState;
3334

34-
/// The concrete trace type used to hand arranged sink input to
35-
/// [`SinkRender::render_sink`].
35+
/// The concrete trace type produced internally when arranging a sink's input.
36+
/// The sink never sees this directly — only the batches flowing through it —
37+
/// but it's the anchor for the batch type in [`SinkBatchStream`].
3638
pub(crate) type SinkTrace = TraceAgent<OrdValSpine<Option<Row>, Row, Timestamp, Diff>>;
3739

40+
/// Stream of arrangement batches handed to [`SinkRender::render_sink`].
41+
///
42+
/// This is `Arranged::stream` with the trace reader dropped: sinks only need
43+
/// batch-level access (no random-access reads via a cursor), so we don't keep
44+
/// a `TraceAgent` alive. Dropping the reader lets the spine's compaction
45+
/// frontiers advance to the empty antichain, so the arrange operator can
46+
/// aggressively compact / release batch state as updates flow through.
47+
pub(crate) type SinkBatchStream<'scope> =
48+
StreamVec<'scope, Timestamp, <SinkTrace as TraceReader>::Batch>;
49+
3850
/// _Renders_ complete _differential_ collections
3951
/// that represent the sink and its errors as requested
4052
/// by the original `CREATE SINK` statement.
@@ -79,15 +91,15 @@ pub(crate) fn render_sink<'scope>(
7991
);
8092
tokens.extend(persist_tokens);
8193

82-
let arranged = arrange_sink_input(&*sink_render, ok_collection.as_collection());
94+
let batches = arrange_sink_input(&*sink_render, ok_collection.as_collection());
8395
let key_is_synthetic = sink_render.get_key_indices().is_none()
8496
&& sink_render.get_relation_key_indices().is_none();
8597

8698
let (health, sink_tokens) = sink_render.render_sink(
8799
storage_state,
88100
sink,
89101
sink_id,
90-
arranged,
102+
batches,
91103
key_is_synthetic,
92104
err_collection.as_collection(),
93105
);
@@ -96,17 +108,22 @@ pub(crate) fn render_sink<'scope>(
96108
})
97109
}
98110

99-
/// Extract the sink's key column(s) from each row and arrange the resulting
100-
/// `(Option<Row>, Row)` collection by key.
111+
/// Extract the sink's key column(s) from each row, arrange the resulting
112+
/// `(Option<Row>, Row)` collection by key, and return just the stream of
113+
/// batches — dropping the trace reader.
101114
///
102115
/// Prefers the user-specified sink key, falling back to any natural key of the
103116
/// underlying relation. When neither exists, a synthetic per-row hash is used
104117
/// purely to distribute work across workers — in that case the sink should
105118
/// treat the key as absent (`key_is_synthetic`).
119+
///
120+
/// Partially moving `arranged.stream` out lets the surrounding `Arranged` (and the
121+
/// `TraceAgent` it holds) drop, releasing the spine's compaction holds so the
122+
/// arrange operator can compact batch state as it's emitted.
106123
fn arrange_sink_input<'scope>(
107124
sink_render: &dyn SinkRender<'scope>,
108125
collection: VecCollection<'scope, Timestamp, Row, Diff>,
109-
) -> Arranged<'scope, SinkTrace> {
126+
) -> SinkBatchStream<'scope> {
110127
let key_indices = sink_render
111128
.get_key_indices()
112129
.or_else(|| sink_render.get_relation_key_indices())
@@ -131,7 +148,8 @@ fn arrange_sink_input<'scope>(
131148
// Allow access to `arrange_named` because we cannot access Mz's wrapper
132149
// from here. TODO(database-issues#5046): Revisit with cluster unification.
133150
#[allow(clippy::disallowed_methods)]
134-
keyed.arrange_named::<OrdValBatcher<_, _, _, _>, RcOrdValBuilder<_, _, _, _>, OrdValSpine<_, _, _, _>>("Arrange Sink")
151+
let arranged = keyed.arrange_named::<OrdValBatcher<_, _, _, _>, RcOrdValBuilder<_, _, _, _>, OrdValSpine<_, _, _, _>>("Arrange Sink");
152+
arranged.stream
135153
}
136154

137155
/// Rate-limited detector for primary-key uniqueness violations as a sink's
@@ -213,8 +231,8 @@ pub(crate) trait SinkRender<'scope> {
213231

214232
/// Renders the sink's dataflow.
215233
///
216-
/// The sink receives the input as an arrangement keyed on `Option<Row>`.
217-
/// The sink is responsible for walking the arrangement (typically via
234+
/// The sink receives a stream of arrangement batches keyed on `Option<Row>`.
235+
/// The sink is responsible for walking each batch (typically via
218236
/// [`mz_interchange::envelopes::for_each_diff_pair`]) and handling any
219237
/// envelope-specific diff-pair construction. When `key_is_synthetic` is
220238
/// true the arrangement's key is a per-row hash used only for worker
@@ -225,7 +243,7 @@ pub(crate) trait SinkRender<'scope> {
225243
storage_state: &mut StorageState,
226244
sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
227245
sink_id: GlobalId,
228-
arranged: Arranged<'scope, SinkTrace>,
246+
batches: SinkBatchStream<'scope>,
229247
key_is_synthetic: bool,
230248
err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
231249
) -> (

src/storage/src/sink/iceberg.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,19 @@
1010
//! Iceberg sink implementation.
1111
//!
1212
//! This code renders a [`IcebergSinkConnection`] into a dataflow that writes
13-
//! data to an Iceberg table. `SinkRender::render_sink` hands the sink an
14-
//! arranged collection keyed on the sink key; a small `walk_sink_arrangement`
15-
//! operator at the input walks the arrangement and emits one `DiffPair` per
16-
//! `(key, timestamp)` update into the pipeline below.
13+
//! data to an Iceberg table. `SinkRender::render_sink` hands the sink a stream
14+
//! of arrangement batches keyed on the sink key (the upstream arrangement's
15+
//! trace reader is already dropped, so the spine is free to compact as
16+
//! batches flow). A small `walk_sink_arrangement` operator consumes that
17+
//! stream and emits one `DiffPair` per `(key, timestamp)` update into the
18+
//! pipeline below.
1719
//!
1820
//! ```text
1921
//! ┏━━━━━━━━━━━━━━┓
2022
//! ┃ persist ┃
2123
//! ┃ source ┃
2224
//! ┗━━━━━━┯━━━━━━━┛
23-
//! │ arranged (Option<Row>, Row) keyed by sink key
25+
//! │ stream of arrangement batches (trace reader dropped)
2426
//! │
2527
//! ┏━━━━━━v━━━━━━━┓
2628
//! ┃ walk ┃
@@ -91,7 +93,6 @@ use anyhow::{Context, anyhow};
9193
use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch};
9294
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
9395
use differential_dataflow::lattice::Lattice;
94-
use differential_dataflow::operators::arrange::Arranged;
9596
use differential_dataflow::{AsCollection, Hashable, VecCollection};
9697
use futures::StreamExt;
9798
use iceberg::ErrorKind;
@@ -153,7 +154,7 @@ use tracing::debug;
153154

154155
use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
155156
use crate::metrics::sink::iceberg::IcebergSinkMetrics;
156-
use crate::render::sinks::{PkViolationWarner, SinkRender, SinkTrace};
157+
use crate::render::sinks::{PkViolationWarner, SinkBatchStream, SinkRender};
157158
use crate::statistics::SinkStatistics;
158159
use crate::storage_state::StorageState;
159160

@@ -2251,18 +2252,18 @@ impl<'scope> SinkRender<'scope> for IcebergSinkConnection {
22512252
storage_state: &mut StorageState,
22522253
sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
22532254
sink_id: GlobalId,
2254-
arranged: Arranged<'scope, SinkTrace>,
2255+
batches: SinkBatchStream<'scope>,
22552256
key_is_synthetic: bool,
22562257
_err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
22572258
) -> (
22582259
StreamVec<'scope, Timestamp, HealthStatusMessage>,
22592260
Vec<PressOnDropButton>,
22602261
) {
2261-
let scope = arranged.stream.scope();
2262+
let scope = batches.scope();
22622263

22632264
let (input, walker_button) = walk_sink_arrangement(
22642265
format!("{sink_id}-iceberg-walker"),
2265-
arranged,
2266+
batches,
22662267
sink_id,
22672268
sink.from,
22682269
key_is_synthetic,
@@ -2413,25 +2414,25 @@ impl<'scope> SinkRender<'scope> for IcebergSinkConnection {
24132414
}
24142415
}
24152416

2416-
/// Walks `arranged` and emits a stream of individual `(key, DiffPair)` records
2417-
/// that feeds the rest of the Iceberg sink pipeline.
2417+
/// Walks each arrangement batch and emits a stream of individual
2418+
/// `(key, DiffPair)` records that feeds the rest of the Iceberg sink pipeline.
24182419
///
24192420
/// Tracks per-`(key, timestamp)` group sizes and rate-limits a warning when a
24202421
/// non-synthetic key has more than one `DiffPair`. When `key_is_synthetic` the
24212422
/// arrangement's hash-based key is stripped before emission.
24222423
fn walk_sink_arrangement<'scope>(
24232424
name: String,
2424-
arranged: Arranged<'scope, SinkTrace>,
2425+
batches: SinkBatchStream<'scope>,
24252426
sink_id: GlobalId,
24262427
from_id: GlobalId,
24272428
key_is_synthetic: bool,
24282429
) -> (
24292430
VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
24302431
PressOnDropButton,
24312432
) {
2432-
let mut builder = OperatorBuilder::new(name, arranged.stream.scope());
2433+
let mut builder = OperatorBuilder::new(name, batches.scope());
24332434
let (output, stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
2434-
let mut input = builder.new_input_for(arranged.stream, Pipeline, &output);
2435+
let mut input = builder.new_input_for(batches, Pipeline, &output);
24352436

24362437
let button = builder.build(move |_caps| async move {
24372438
let mut pk_warner = (!key_is_synthetic).then(|| PkViolationWarner::new(sink_id, from_id));

src/storage/src/sink/kafka.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
//! ┃ persist ┃
1616
//! ┃ source ┃
1717
//! ┗━━━━━━┯━━━━━━━┛
18-
//! │ arranged (Option<Row>, Row) keyed by sink key
18+
//! │ stream of arrangement batches (trace reader dropped)
1919
//! │
2020
//! ┏━━━━━━v━━━━━━┓
21-
//! ┃ row ┃ walks the arrangement and emits one
21+
//! ┃ row ┃ walks each batch's cursor and emits one
2222
//! ┃ encoder ┃ encoded `KafkaMessage` per DiffPair
2323
//! ┗━━━━━━┯━━━━━━┛
2424
//! │ encoded data
@@ -82,11 +82,10 @@ use std::time::Duration;
8282

8383
use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
8484
use crate::metrics::sink::kafka::KafkaSinkMetrics;
85-
use crate::render::sinks::{PkViolationWarner, SinkRender, SinkTrace};
85+
use crate::render::sinks::{PkViolationWarner, SinkBatchStream, SinkRender};
8686
use crate::statistics::SinkStatistics;
8787
use crate::storage_state::StorageState;
8888
use anyhow::{Context, anyhow, bail};
89-
use differential_dataflow::operators::arrange::Arranged;
9089
use differential_dataflow::{AsCollection, Hashable, VecCollection};
9190
use futures::StreamExt;
9291
use maplit::btreemap;
@@ -161,7 +160,7 @@ impl<'scope> SinkRender<'scope> for KafkaSinkConnection {
161160
storage_state: &mut StorageState,
162161
sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
163162
sink_id: GlobalId,
164-
arranged: Arranged<'scope, SinkTrace>,
163+
batches: SinkBatchStream<'scope>,
165164
key_is_synthetic: bool,
166165
// TODO(benesch): errors should stream out through the sink,
167166
// if we figure out a protocol for that.
@@ -170,7 +169,7 @@ impl<'scope> SinkRender<'scope> for KafkaSinkConnection {
170169
StreamVec<'scope, Timestamp, HealthStatusMessage>,
171170
Vec<PressOnDropButton>,
172171
) {
173-
let scope = arranged.stream.scope();
172+
let scope = batches.scope();
174173

175174
let write_handle = {
176175
let persist = Arc::clone(&storage_state.persist_clients);
@@ -196,7 +195,7 @@ impl<'scope> SinkRender<'scope> for KafkaSinkConnection {
196195

197196
let (encoded, encode_status, encode_token) = encode_collection(
198197
format!("kafka-{sink_id}-{}-encode", self.format.get_format_name()),
199-
arranged,
198+
batches,
200199
sink.envelope,
201200
self.clone(),
202201
storage_state.storage_configuration.clone(),
@@ -1365,15 +1364,14 @@ async fn fetch_partition_count_loop<F>(
13651364
}
13661365
}
13671366

1368-
/// Walks the sink's arrangement and emits encoded Kafka messages, one per
1367+
/// Walks each arrangement batch and emits encoded Kafka messages, one per
13691368
/// `DiffPair` observed at each `(key, timestamp)`.
13701369
///
1371-
/// When `key_is_synthetic`, the arrangement's key is a per-row hash used only
1372-
/// for worker distribution; the emitted `KafkaMessage` uses no key in that
1373-
/// case.
1370+
/// When `key_is_synthetic`, the batch keys are per-row hashes used only for
1371+
/// worker distribution; the emitted `KafkaMessage` uses no key in that case.
13741372
fn encode_collection<'scope>(
13751373
name: String,
1376-
arranged: Arranged<'scope, SinkTrace>,
1374+
batches: SinkBatchStream<'scope>,
13771375
envelope: SinkEnvelope,
13781376
connection: KafkaSinkConnection,
13791377
storage_configuration: StorageConfiguration,
@@ -1385,10 +1383,10 @@ fn encode_collection<'scope>(
13851383
StreamVec<'scope, Timestamp, HealthStatusMessage>,
13861384
PressOnDropButton,
13871385
) {
1388-
let mut builder = AsyncOperatorBuilder::new(name, arranged.stream.scope());
1386+
let mut builder = AsyncOperatorBuilder::new(name, batches.scope());
13891387

13901388
let (output, stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1391-
let mut input = builder.new_input_for(arranged.stream, Pipeline, &output);
1389+
let mut input = builder.new_input_for(batches, Pipeline, &output);
13921390

13931391
let (button, errors) = builder.build_fallible(move |caps| {
13941392
Box::pin(async move {

0 commit comments

Comments
 (0)