Skip to content

Commit bda54df

Browse files
committed
Respond to review feedback!
1 parent 46f9ca4 commit bda54df

2 files changed

Lines changed: 10 additions & 16 deletions

File tree

src/interchange/src/envelopes.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::sync::LazyLock;
1313

1414
use differential_dataflow::trace::implementations::BatchContainer;
1515
use differential_dataflow::trace::{BatchReader, Cursor};
16-
use itertools::{Either, EitherOrBoth};
16+
use itertools::EitherOrBoth;
1717
use maplit::btreemap;
1818
use mz_ore::cast::CastFrom;
1919
use mz_repr::{
@@ -68,18 +68,12 @@ where
6868
befores.sort_by_key(|(t, _v, _diff)| *t);
6969
afters.sort_by_key(|(t, _v, _diff)| *t);
7070

71-
// Fan `(time, val, count)` out to `count` copies of `(time, val)`.
72-
// `iter::repeat(x).take(n)` clones on every `next()` — including the
73-
// first — so using it with `count == 1` (the snapshot common case)
74-
// does a gratuitous clone. `iter::once` moves the value; we only
75-
// fall back to `iter::repeat` when fan-out is actually needed.
76-
let fan_out = |(t, v, cnt): (B::Time, B::ValOwn, usize)| {
77-
if cnt == 1 {
78-
Either::Left(iter::once((t, v)))
79-
} else {
80-
Either::Right(iter::repeat((t, v)).take(cnt))
81-
}
82-
};
71+
// The use of `repeat_n()` here is a bit subtle, but load bearing.
72+
// In the common case, cnt = 1, and we want to avoid cloning the value if possible. In the naive
73+
// implementation, we might use `iter::repeat((t, v)).take(cnt)`, but that would clone `v` `cnt` times even
74+
// when `cnt = 1`. By contrast, `repeat_n((t, v), cnt)` will return the original `(t, v)` when `cnt = 1`,
75+
// and only clone when `cnt > 1`.
76+
let fan_out = |(t, v, cnt): (B::Time, B::ValOwn, usize)| iter::repeat_n((t, v), cnt);
8377
let befores_iter = befores.drain(..).flat_map(fan_out);
8478
let afters_iter = afters.drain(..).flat_map(fan_out);
8579

src/storage/src/render/sinks.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
use std::sync::Arc;
1313
use std::time::{Duration, Instant};
1414

15-
use differential_dataflow::operators::arrange::{Arrange, TraceAgent};
15+
use differential_dataflow::operators::arrange::{Arrange, Arranged, TraceAgent};
1616
use differential_dataflow::trace::TraceReader;
1717
use differential_dataflow::trace::implementations::ord_neu::{
1818
OrdValBatcher, OrdValSpine, RcOrdValBuilder,
@@ -148,8 +148,8 @@ fn arrange_sink_input<'scope>(
148148
// Allow access to `arrange_named` because we cannot access Mz's wrapper
149149
// from here. TODO(database-issues#5046): Revisit with cluster unification.
150150
#[allow(clippy::disallowed_methods)]
151-
let arranged = keyed.arrange_named::<OrdValBatcher<_, _, _, _>, RcOrdValBuilder<_, _, _, _>, OrdValSpine<_, _, _, _>>("Arrange Sink");
152-
arranged.stream
151+
let Arranged {stream, trace: _} = keyed.arrange_named::<OrdValBatcher<_, _, _, _>, RcOrdValBuilder<_, _, _, _>, OrdValSpine<_, _, _, _>>("Arrange Sink");
152+
stream
153153
}
154154

155155
/// Rate-limited detector for primary-key uniqueness violations as a sink's

0 commit comments

Comments
 (0)