Skip to content

Commit 84b2975

Browse files
authored
storage: make reclock to latest the only reclock variant (#35046)
The old strategy of reclocking to whatever upper we have consumed is no longer used. This PR deletes the relevant code paths and simplifies the `SourceRender` trait.
1 parent 1b6e3df commit 84b2975

12 files changed

Lines changed: 124 additions & 292 deletions

File tree

src/storage-types/src/dyncfgs.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -290,14 +290,6 @@ pub const STORAGE_SUSPEND_AND_RESTART_DELAY: Config<Duration> = Config::new(
290290
"Delay interval when reconnecting to a source / sink after halt.",
291291
);
292292

293-
/// Whether to mint reclock bindings based on the latest probed frontier or the currently ingested
294-
/// frontier.
295-
pub const STORAGE_RECLOCK_TO_LATEST: Config<bool> = Config::new(
296-
"storage_reclock_to_latest",
297-
true,
298-
"Whether to mint reclock bindings based on the latest probed offset or the latest ingested offset.",
299-
);
300-
301293
/// Whether to use the new continual feedback upsert operator.
302294
pub const STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT: Config<bool> = Config::new(
303295
"storage_use_continual_feedback_upsert",
@@ -373,7 +365,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
373365
.add(&SINK_PROGRESS_SEARCH)
374366
.add(&SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY)
375367
.add(&STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION)
376-
.add(&STORAGE_RECLOCK_TO_LATEST)
377368
.add(&STORAGE_ROCKSDB_CLEANUP_TRIES)
378369
.add(&STORAGE_ROCKSDB_USE_MERGE_OPERATOR)
379370
.add(&STORAGE_SERVER_MAINTENANCE_INTERVAL)

src/storage/src/source/generator.rs

Lines changed: 82 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,28 @@ use std::ops::Rem;
1313
use std::sync::Arc;
1414
use std::time::Duration;
1515

16-
use differential_dataflow::AsCollection;
16+
use differential_dataflow::{AsCollection, Hashable};
1717
use futures::StreamExt;
1818
use itertools::Itertools;
1919
use mz_ore::cast::CastFrom;
2020
use mz_ore::iter::IteratorExt;
21+
use mz_ore::now::NowFn;
2122
use mz_repr::{Diff, GlobalId, Row};
2223
use mz_storage_types::errors::DataflowError;
2324
use mz_storage_types::sources::load_generator::{
2425
Event, Generator, KeyValueLoadGenerator, LoadGenerator, LoadGeneratorOutput,
2526
LoadGeneratorSourceConnection,
2627
};
2728
use mz_storage_types::sources::{MzOffset, SourceExportDetails, SourceTimestamp};
28-
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
29+
use mz_timely_util::builder_async::{
30+
Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
31+
};
2932
use mz_timely_util::containers::stack::AccountedStackBuilder;
3033
use timely::container::CapacityContainerBuilder;
34+
use timely::dataflow::channels::pact::Pipeline;
3135
use timely::dataflow::operators::core::Partition;
3236
use timely::dataflow::{Scope, Stream};
33-
use timely::progress::Antichain;
37+
use timely::progress::{Antichain, Timestamp};
3438
use tokio::time::{Instant, interval_at};
3539

3640
use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
@@ -207,9 +211,8 @@ impl SourceRender for LoadGeneratorSourceConnection {
207211
start_signal: impl std::future::Future<Output = ()> + 'static,
208212
) -> (
209213
BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
210-
Stream<G, Infallible>,
211214
Stream<G, HealthStatusMessage>,
212-
Option<Stream<G, Probe<MzOffset>>>,
215+
Stream<G, Probe<MzOffset>>,
213216
Vec<PressOnDropButton>,
214217
) {
215218
let generator_kind = GeneratorKind::new(
@@ -218,10 +221,17 @@ impl SourceRender for LoadGeneratorSourceConnection {
218221
self.as_of,
219222
self.up_to,
220223
);
221-
let (updates, uppers, health, button) =
224+
let (updates, progress, health, button) =
222225
generator_kind.render(scope, config, committed_uppers, start_signal);
223226

224-
(updates, uppers, health, None, button)
227+
let probe_stream = synthesize_probes(
228+
config.id,
229+
&progress,
230+
config.timestamp_interval,
231+
config.now_fn.clone(),
232+
);
233+
234+
(updates, health, probe_stream, button)
225235
}
226236
}
227237

@@ -450,3 +460,68 @@ fn render_simple_generator<G: Scope<Timestamp = MzOffset>>(
450460
vec![button.press_on_drop()],
451461
)
452462
}
463+
464+
/// Synthesizes a probe stream that produces the frontier of the given progress stream at the given
465+
/// interval.
466+
///
467+
/// This is used as a fallback for sources that don't support probing the frontier of the upstream
468+
/// system.
469+
fn synthesize_probes<G>(
470+
source_id: GlobalId,
471+
progress: &Stream<G, Infallible>,
472+
interval: Duration,
473+
now_fn: NowFn,
474+
) -> Stream<G, Probe<G::Timestamp>>
475+
where
476+
G: Scope,
477+
{
478+
let scope = progress.scope();
479+
480+
let active_worker = usize::cast_from(source_id.hashed()) % scope.peers();
481+
let is_active_worker = active_worker == scope.index();
482+
483+
let mut op = AsyncOperatorBuilder::new("synthesize_probes".into(), scope);
484+
let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
485+
let mut input = op.new_input_for(progress, Pipeline, &output);
486+
487+
op.build(|caps| async move {
488+
if !is_active_worker {
489+
return;
490+
}
491+
492+
let [cap] = caps.try_into().expect("one capability per output");
493+
494+
let mut ticker = super::probe::Ticker::new(move || interval, now_fn.clone());
495+
496+
let minimum_frontier = Antichain::from_elem(Timestamp::minimum());
497+
let mut frontier = minimum_frontier.clone();
498+
loop {
499+
tokio::select! {
500+
event = input.next() => match event {
501+
Some(AsyncEvent::Progress(progress)) => frontier = progress,
502+
Some(AsyncEvent::Data(..)) => unreachable!(),
503+
None => break,
504+
},
505+
// We only report a probe if the source upper frontier is not the minimum frontier.
506+
// This makes it so the first remap binding corresponds to the snapshot of the
507+
// source, and because the first binding always maps to the minimum *target*
508+
// frontier we guarantee that the source will never appear empty.
509+
probe_ts = ticker.tick(), if frontier != minimum_frontier => {
510+
let probe = Probe {
511+
probe_ts,
512+
upstream_frontier: frontier.clone(),
513+
};
514+
output.give(&cap, probe);
515+
}
516+
}
517+
}
518+
519+
let probe = Probe {
520+
probe_ts: now_fn().into(),
521+
upstream_frontier: Antichain::new(),
522+
};
523+
output.give(&cap, probe);
524+
});
525+
526+
output_stream
527+
}

src/storage/src/source/kafka.rs

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
use std::collections::BTreeMap;
1111
use std::collections::btree_map::Entry;
12-
use std::convert::Infallible;
1312
use std::str::{self};
1413
use std::sync::Arc;
1514
use std::thread;
@@ -136,8 +135,6 @@ pub struct KafkaSourceReader {
136135
struct PartitionCapability {
137136
/// The capability of the data produced
138137
data: Capability<KafkaTimestamp>,
139-
/// The capability of the progress stream
140-
progress: Capability<KafkaTimestamp>,
141138
}
142139

143140
/// The high watermark offsets of a Kafka partition.
@@ -186,14 +183,13 @@ impl SourceRender for KafkaSourceConnection {
186183
start_signal: impl std::future::Future<Output = ()> + 'static,
187184
) -> (
188185
BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
189-
Stream<G, Infallible>,
190186
Stream<G, HealthStatusMessage>,
191-
Option<Stream<G, Probe<KafkaTimestamp>>>,
187+
Stream<G, Probe<KafkaTimestamp>>,
192188
Vec<PressOnDropButton>,
193189
) {
194190
let (metadata, probes, metadata_token) =
195191
render_metadata_fetcher(scope, self.clone(), config.clone());
196-
let (data, progress, health, reader_token) = render_reader(
192+
let (data, health, reader_token) = render_reader(
197193
scope,
198194
self,
199195
config.clone(),
@@ -221,9 +217,8 @@ impl SourceRender for KafkaSourceConnection {
221217

222218
(
223219
data_collections,
224-
progress,
225220
health,
226-
Some(probes),
221+
probes,
227222
vec![metadata_token, reader_token],
228223
)
229224
}
@@ -242,15 +237,13 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
242237
start_signal: impl std::future::Future<Output = ()> + 'static,
243238
) -> (
244239
StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
245-
Stream<G, Infallible>,
246240
Stream<G, HealthStatusMessage>,
247241
PressOnDropButton,
248242
) {
249243
let name = format!("KafkaReader({})", config.id);
250244
let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
251245

252246
let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
253-
let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
254247
let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
255248

256249
let mut metadata_input = builder.new_disconnected_input(&metadata_stream.broadcast(), Pipeline);
@@ -307,7 +300,7 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
307300
let busy_signal = Arc::clone(&config.busy_signal);
308301
let button = builder.build(move |caps| {
309302
SignaledFuture::new(busy_signal, async move {
310-
let [mut data_cap, mut progress_cap, health_cap] = caps.try_into().unwrap();
303+
let [mut data_cap, health_cap] = caps.try_into().unwrap();
311304

312305
let client_id = connection.client_id(
313306
config.config.config_set(),
@@ -363,7 +356,6 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
363356
);
364357
let part_cap = PartitionCapability {
365358
data: data_cap.delayed(&part_ts),
366-
progress: progress_cap.delayed(&part_ts),
367359
};
368360
partition_capabilities.insert(*pid, part_cap);
369361
}
@@ -375,7 +367,6 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
375367
let future_ts =
376368
Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
377369
data_cap.downgrade(&future_ts);
378-
progress_cap.downgrade(&future_ts);
379370

380371
info!(
381372
source_id = config.id.to_string(),
@@ -620,23 +611,9 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
620611
RangeBound::exact(pid),
621612
MzOffset::from(start_offset),
622613
);
623-
let part_upper_ts = Partitioned::new_singleton(
624-
RangeBound::exact(pid),
625-
MzOffset::from(high_watermark),
626-
);
627614

628-
// This is the moment at which we have discovered a new partition
629-
// and we need to make sure we produce its initial snapshot at a,
630-
// single timestamp so that the source transitions from no data
631-
// from this partition to all the data of this partition. We do
632-
// this by initializing the data capability to the starting offset
633-
// and, importantly, the progress capability directly to the high
634-
// watermark. This jump of the progress capability ensures that
635-
// everything until the high watermark will be reclocked to a
636-
// single point.
637615
entry.insert(PartitionCapability {
638616
data: data_cap.delayed(&part_since_ts),
639-
progress: progress_cap.delayed(&part_upper_ts),
640617
});
641618
}
642619
}
@@ -682,7 +659,6 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
682659
}
683660

684661
data_cap.downgrade(&future_ts);
685-
progress_cap.downgrade(&future_ts);
686662
}
687663
Some(MetadataUpdate::TransientError(status)) => {
688664
if let Some(update) = status.kafka {
@@ -955,11 +931,6 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
955931
}
956932
};
957933

958-
// We use try_downgrade here because during the initial snapshot phase the
959-
// data capability is not beyond the progress capability and therefore a
960-
// normal downgrade would panic. Once it catches up though the data
961-
// capbility is what's pushing the progress capability forward.
962-
let _ = part_cap.progress.try_downgrade(&upper);
963934
}
964935
}
965936

@@ -980,7 +951,6 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
980951

981952
(
982953
stream.as_collection(),
983-
progress_stream,
984954
health_stream,
985955
button.press_on_drop(),
986956
)

src/storage/src/source/mysql.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
//! trigger a restart of the dataflow.
5252
5353
use std::collections::BTreeMap;
54-
use std::convert::Infallible;
5554
use std::fmt;
5655
use std::io;
5756
use std::rc::Rc;
@@ -110,9 +109,8 @@ impl SourceRender for MySqlSourceConnection {
110109
_start_signal: impl std::future::Future<Output = ()> + 'static,
111110
) -> (
112111
BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
113-
Stream<G, Infallible>,
114112
Stream<G, HealthStatusMessage>,
115-
Option<Stream<G, Probe<GtidPartition>>>,
113+
Stream<G, Probe<GtidPartition>>,
116114
Vec<PressOnDropButton>,
117115
) {
118116
// Collect the source outputs that we will be exporting.
@@ -163,7 +161,7 @@ impl SourceRender for MySqlSourceConnection {
163161
metrics.snapshot_metrics.clone(),
164162
);
165163

166-
let (repl_updates, uppers, repl_err, repl_token) = replication::render(
164+
let (repl_updates, repl_err, repl_token) = replication::render(
167165
scope.clone(),
168166
config.clone(),
169167
self.clone(),
@@ -234,9 +232,8 @@ impl SourceRender for MySqlSourceConnection {
234232

235233
(
236234
data_collections,
237-
uppers,
238235
health,
239-
Some(probe_stream),
236+
probe_stream,
240237
vec![snapshot_token, repl_token, stats_token],
241238
)
242239
}

0 commit comments

Comments
 (0)