Skip to content

Commit 75d3498

Browse files
committed
Some changes
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 6a4a839 commit 75d3498

25 files changed

Lines changed: 1181 additions & 575 deletions

File tree

vortex-datafusion/src/persistent/opener.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ use vortex::layout::LayoutReader;
4242
use vortex::metrics::Label;
4343
use vortex::metrics::MetricsRegistry;
4444
use vortex::scan::ScanBuilder;
45-
use vortex::scan::api::DEFAULT_TARGET_OUTPUT_BYTES_HINT;
46-
use vortex::scan::api::DEFAULT_TARGET_OUTPUT_ROWS_HINT;
4745
use vortex::session::VortexSession;
4846
use vortex_utils::aliases::dash_map::DashMap;
4947
use vortex_utils::aliases::dash_map::Entry;
@@ -298,7 +296,8 @@ impl FileOpener for VortexOpener {
298296
}
299297
};
300298

301-
let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader);
299+
let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader)
300+
.with_segment_source(vxf.segment_source());
302301

303302
if let Some(extensions) = file.extensions
304303
&& let Some(vortex_plan) = extensions.downcast_ref::<VortexAccessPlan>()
@@ -363,8 +362,6 @@ impl FileOpener for VortexOpener {
363362
.with_projection(scan_projection)
364363
.with_some_filter(filter)
365364
.with_ordered(has_output_ordering)
366-
.with_target_output_rows(DEFAULT_TARGET_OUTPUT_ROWS_HINT)
367-
.with_target_output_bytes(DEFAULT_TARGET_OUTPUT_BYTES_HINT)
368365
.map(move |chunk| {
369366
let mut ctx = session.create_execution_ctx();
370367
chunk.execute_record_batch(&stream_schema, &mut ctx)

vortex-file/src/file.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,18 +103,19 @@ impl VortexFile {
103103
self.session.clone(),
104104
));
105105
}
106-
Ok(Arc::new(LayoutReaderDataSource::new(
107-
reader,
108-
self.session.clone(),
109-
)))
106+
Ok(Arc::new(
107+
LayoutReaderDataSource::new(reader, self.session.clone())
108+
.with_segment_source(self.segment_source()),
109+
))
110110
}
111111

112112
/// Initiate a scan of the file, returning a builder for configuring the scan.
113113
pub fn scan(&self) -> VortexResult<ScanBuilder<ArrayRef>> {
114114
Ok(ScanBuilder::new(
115115
self.session.clone(),
116116
self.layout_reader()?,
117-
))
117+
)
118+
.with_segment_source(self.segment_source()))
118119
}
119120

120121
/// Returns true if the expression will never match any rows in the file.

vortex-file/src/read/driver.rs

Lines changed: 136 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ impl State {
140140
tracing::debug!(?req, "ReadRequest dropped before registration");
141141
return;
142142
}
143+
self.metrics.registered_requests.add(1);
143144
self.requests_by_offset.insert((req.offset, req.id));
144145
self.requests.insert(req.id, req);
145146
}
@@ -149,20 +150,43 @@ impl State {
149150
self.requests_by_offset.remove(&(req.offset, req_id));
150151
tracing::debug!(?req, "ReadRequest dropped before poll");
151152
} else {
153+
self.metrics.polled_requests.add(1);
152154
self.polled_requests.insert(req_id, req);
153155
}
154156
}
155157
}
156158
ReadEvent::Dropped(req_id) => {
157159
if let Some(req) = self.requests.remove(&req_id) {
160+
self.metrics.dropped_requests.add(1);
158161
self.requests_by_offset.remove(&(req.offset, req_id));
159162
tracing::debug!(?req, "ReadRequest dropped before poll");
160163
}
161164
if let Some(req) = self.polled_requests.remove(&req_id) {
165+
self.metrics.dropped_requests.add(1);
162166
self.requests_by_offset.remove(&(req.offset, req_id));
163167
tracing::debug!(?req, "ReadRequest dropped after poll");
164168
}
165169
}
170+
ReadEvent::BatchBoundary => {
171+
// Promote all registered-but-unpolled requests to polled status.
172+
// This tells the coalescer that the entire batch is needed now,
173+
// allowing it to form optimal coalesced reads.
174+
let promoted = self.requests.len();
175+
if promoted > 0 {
176+
tracing::debug!(
177+
promoted,
178+
"BatchBoundary: promoting registered requests to polled"
179+
);
180+
self.metrics.polled_requests.add(promoted as u64);
181+
for (req_id, req) in std::mem::take(&mut self.requests) {
182+
if req.callback.is_closed() {
183+
self.requests_by_offset.remove(&(req.offset, req_id));
184+
} else {
185+
self.polled_requests.insert(req_id, req);
186+
}
187+
}
188+
}
189+
}
166190
}
167191
}
168192

@@ -215,6 +239,9 @@ impl State {
215239
let first_req = self.next_uncoalesced()?;
216240

217241
let mut requests = vec![first_req];
242+
let mut payload_bytes = requests[0].length as u64;
243+
let mut registered_only_requests = 0usize;
244+
let mut polled_requests = 1usize;
218245
let mut current_start = requests[0].offset;
219246
let mut current_end = requests[0].offset + requests[0].length as u64;
220247
let align = *self.coalesced_buffer_alignment as u64;
@@ -269,18 +296,28 @@ impl State {
269296
let new_total_size = new_end - aligned_start;
270297

271298
if new_total_size > window.max_size {
299+
self.metrics.batched_skipped_max_size.add(1);
272300
// Skip it but keep it available for future coalescing operations.
273301
continue;
274302
}
275303

276304
current_start = new_start;
277305
current_end = new_end;
278-
let req = self
279-
.polled_requests
280-
.remove(&req_id)
281-
.or_else(|| self.requests.remove(&req_id))
282-
.vortex_expect("Missing request in requests_by_offset");
306+
let (req, was_polled) = if let Some(req) = self.polled_requests.remove(&req_id)
307+
{
308+
(req, true)
309+
} else if let Some(req) = self.requests.remove(&req_id) {
310+
(req, false)
311+
} else {
312+
unreachable!("Missing request in requests_by_offset");
313+
};
283314

315+
payload_bytes = payload_bytes.saturating_add(req.length as u64);
316+
if was_polled {
317+
polled_requests = polled_requests.saturating_add(1);
318+
} else {
319+
registered_only_requests = registered_only_requests.saturating_add(1);
320+
}
284321
requests.push(req);
285322
if ids_to_remove.insert(req_id) {
286323
keys_to_remove.push((req_offset, req_id));
@@ -302,6 +339,18 @@ impl State {
302339
requests.sort_unstable_by_key(|r| r.offset);
303340

304341
let aligned_start = current_start - (current_start % align);
342+
let range_bytes = current_end - aligned_start;
343+
344+
self.metrics.batched_range_bytes.update(range_bytes as f64);
345+
self.metrics
346+
.batched_payload_bytes
347+
.update(payload_bytes as f64);
348+
self.metrics
349+
.batched_registered_only_requests
350+
.update(registered_only_requests as f64);
351+
self.metrics
352+
.batched_polled_requests
353+
.update(polled_requests as f64);
305354

306355
tracing::debug!(
307356
"Coalesced {} requests into range {}..{} (len={})",
@@ -808,4 +857,86 @@ mod tests {
808857
assert_eq!(individual_count, 2, "Expected 2 individual requests");
809858
assert_eq!(coalesced_operations, 0, "Expected 0 coalesced operations");
810859
}
860+
861+
#[tokio::test]
862+
async fn test_metrics_record_registered_only_batch_members() {
863+
let (req1, _rx1) = create_request(1, 0, 10);
864+
let (req2, _rx2) = create_request(2, 50, 10);
865+
let (req3, _rx3) = create_request(3, 100, 10);
866+
867+
let events = vec![
868+
ReadEvent::Request(req1),
869+
ReadEvent::Request(req2),
870+
ReadEvent::Request(req3),
871+
ReadEvent::Polled(2),
872+
];
873+
874+
let event_stream = stream::iter(events);
875+
let metrics_registry = DefaultMetricsRegistry::default();
876+
let metrics = RequestMetrics::new(&metrics_registry, vec![]);
877+
let io_stream = IoRequestStream::new(
878+
event_stream,
879+
Some(CoalesceConfig {
880+
distance: 60,
881+
max_size: 1024,
882+
}),
883+
Alignment::none(),
884+
metrics,
885+
);
886+
887+
let outputs: Vec<IoRequest> = io_stream.collect().await;
888+
assert_eq!(outputs.len(), 1);
889+
890+
let snapshot = metrics_registry.snapshot();
891+
let mut registered = 0u64;
892+
let mut polled = 0u64;
893+
let mut coalesced = 0u64;
894+
let mut registered_only_count = 0usize;
895+
let mut registered_only_total = 0.0;
896+
let mut polled_in_batch_count = 0usize;
897+
let mut polled_in_batch_total = 0.0;
898+
899+
for metric in snapshot.iter() {
900+
match metric.value() {
901+
MetricValue::Counter(counter) => match metric.name().as_ref() {
902+
"io.requests.registered" => registered = counter.value(),
903+
"io.requests.polled" => polled = counter.value(),
904+
"io.requests.coalesced" => coalesced = counter.value(),
905+
_ => {}
906+
},
907+
MetricValue::Histogram(histogram) => match metric.name().as_ref() {
908+
"io.requests.batched.registered_only_requests" => {
909+
registered_only_count = histogram.count();
910+
registered_only_total = histogram.total();
911+
}
912+
"io.requests.batched.polled_requests" => {
913+
polled_in_batch_count = histogram.count();
914+
polled_in_batch_total = histogram.total();
915+
}
916+
_ => {}
917+
},
918+
_ => {}
919+
}
920+
}
921+
922+
assert_eq!(registered, 3, "Expected 3 registered requests");
923+
assert_eq!(polled, 1, "Expected 1 polled request");
924+
assert_eq!(coalesced, 1, "Expected 1 coalesced operation");
925+
assert_eq!(
926+
registered_only_count, 1,
927+
"Expected one histogram sample for registered-only requests"
928+
);
929+
assert_eq!(
930+
registered_only_total, 2.0,
931+
"Expected two registered-only requests in the coalesced batch"
932+
);
933+
assert_eq!(
934+
polled_in_batch_count, 1,
935+
"Expected one histogram sample for polled requests"
936+
);
937+
assert_eq!(
938+
polled_in_batch_total, 1.0,
939+
"Expected one polled request in the coalesced batch"
940+
);
941+
}
811942
}

vortex-file/src/segments/source.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ pub enum ReadEvent {
3939
Request(ReadRequest),
4040
Polled(RequestId),
4141
Dropped(RequestId),
42+
/// Signals that a logical batch of requests has been fully registered.
43+
/// The driver promotes all registered-but-unpolled requests to polled status,
44+
/// allowing the coalescer to form optimal reads over the entire batch.
45+
BatchBoundary,
4246
}
4347

4448
/// A [`SegmentSource`] for file-like IO.
@@ -134,6 +138,10 @@ impl FileSegmentSource {
134138
}
135139

136140
impl SegmentSource for FileSegmentSource {
141+
fn flush(&self) {
142+
drop(self.events.unbounded_send(ReadEvent::BatchBoundary));
143+
}
144+
137145
fn request(&self, id: SegmentId) -> SegmentFuture {
138146
// We eagerly register the read request here assuming the behaviour of [`FileRead`], where
139147
// coalescing becomes effective prior to the future being polled.
@@ -232,23 +240,55 @@ impl Drop for ReadFuture {
232240
}
233241

234242
pub struct RequestMetrics {
243+
pub registered_requests: Counter,
244+
pub polled_requests: Counter,
245+
pub dropped_requests: Counter,
235246
pub individual_requests: Counter,
236247
pub coalesced_requests: Counter,
237248
pub num_requests_coalesced: Histogram,
249+
pub batched_range_bytes: Histogram,
250+
pub batched_payload_bytes: Histogram,
251+
pub batched_registered_only_requests: Histogram,
252+
pub batched_polled_requests: Histogram,
253+
pub batched_skipped_max_size: Counter,
238254
}
239255

240256
impl RequestMetrics {
241257
pub fn new(metrics_registry: &dyn MetricsRegistry, labels: Vec<Label>) -> Self {
242258
Self {
259+
registered_requests: MetricBuilder::new(metrics_registry)
260+
.add_labels(labels.clone())
261+
.counter("io.requests.registered"),
262+
polled_requests: MetricBuilder::new(metrics_registry)
263+
.add_labels(labels.clone())
264+
.counter("io.requests.polled"),
265+
dropped_requests: MetricBuilder::new(metrics_registry)
266+
.add_labels(labels.clone())
267+
.counter("io.requests.dropped"),
243268
individual_requests: MetricBuilder::new(metrics_registry)
244269
.add_labels(labels.clone())
245270
.counter("io.requests.individual"),
246271
coalesced_requests: MetricBuilder::new(metrics_registry)
247272
.add_labels(labels.clone())
248273
.counter("io.requests.coalesced"),
249274
num_requests_coalesced: MetricBuilder::new(metrics_registry)
250-
.add_labels(labels)
275+
.add_labels(labels.clone())
251276
.histogram("io.requests.coalesced.num_coalesced"),
277+
batched_range_bytes: MetricBuilder::new(metrics_registry)
278+
.add_labels(labels.clone())
279+
.histogram("io.requests.batched.range_bytes"),
280+
batched_payload_bytes: MetricBuilder::new(metrics_registry)
281+
.add_labels(labels.clone())
282+
.histogram("io.requests.batched.payload_bytes"),
283+
batched_registered_only_requests: MetricBuilder::new(metrics_registry)
284+
.add_labels(labels.clone())
285+
.histogram("io.requests.batched.registered_only_requests"),
286+
batched_polled_requests: MetricBuilder::new(metrics_registry)
287+
.add_labels(labels.clone())
288+
.histogram("io.requests.batched.polled_requests"),
289+
batched_skipped_max_size: MetricBuilder::new(metrics_registry)
290+
.add_labels(labels)
291+
.counter("io.requests.batched.skipped_max_size"),
252292
}
253293
}
254294
}

vortex-layout/public-api.lock

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,8 @@ pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::name(&self) -> &allo
632632

633633
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::projection_evaluation(&self, row_range: &core::ops::range::Range<u64>, expr: &vortex_array::expr::expression::Expression, mask: vortex_array::mask_future::MaskFuture) -> vortex_error::VortexResult<futures_core::future::BoxFuture<'static, vortex_error::VortexResult<vortex_array::array::ArrayRef>>>
634634

635+
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::projection_fetch_hints(&self, field_mask: alloc::vec::Vec<vortex_array::dtype::field_mask::FieldMask>, row_range: core::ops::range::Range<u64>, row_bytes_hint: usize) -> vortex_error::VortexResult<alloc::vec::Vec<vortex_layout::ProjectionFetchHint>>
636+
635637
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::pruning_evaluation(&self, row_range: &core::ops::range::Range<u64>, expr: &vortex_array::expr::expression::Expression, mask: vortex_mask::Mask) -> vortex_error::VortexResult<vortex_array::mask_future::MaskFuture>
636638

637639
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::register_splits(&self, field_mask: &[vortex_array::dtype::field_mask::FieldMask], row_range: &core::ops::range::Range<u64>, splits: &mut alloc::collections::btree::set::BTreeSet<u64>) -> vortex_error::VortexResult<()>
@@ -1540,6 +1542,28 @@ pub fn vortex_layout::LazyReaderChildren::get(&self, idx: usize) -> vortex_error
15401542

15411543
pub fn vortex_layout::LazyReaderChildren::new(children: alloc::sync::Arc<dyn vortex_layout::LayoutChildren>, dtypes: alloc::vec::Vec<vortex_array::dtype::DType>, names: alloc::vec::Vec<alloc::sync::Arc<str>>, segment_source: alloc::sync::Arc<dyn vortex_layout::segments::SegmentSource>, session: vortex_session::VortexSession) -> Self
15421544

1545+
pub struct vortex_layout::ProjectionFetchHint
1546+
1547+
pub vortex_layout::ProjectionFetchHint::estimated_fetch_bytes: usize
1548+
1549+
pub vortex_layout::ProjectionFetchHint::row_range: core::ops::range::Range<u64>
1550+
1551+
impl core::clone::Clone for vortex_layout::ProjectionFetchHint
1552+
1553+
pub fn vortex_layout::ProjectionFetchHint::clone(&self) -> vortex_layout::ProjectionFetchHint
1554+
1555+
impl core::cmp::Eq for vortex_layout::ProjectionFetchHint
1556+
1557+
impl core::cmp::PartialEq for vortex_layout::ProjectionFetchHint
1558+
1559+
pub fn vortex_layout::ProjectionFetchHint::eq(&self, other: &vortex_layout::ProjectionFetchHint) -> bool
1560+
1561+
impl core::fmt::Debug for vortex_layout::ProjectionFetchHint
1562+
1563+
pub fn vortex_layout::ProjectionFetchHint::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
1564+
1565+
impl core::marker::StructuralPartialEq for vortex_layout::ProjectionFetchHint
1566+
15431567
pub trait vortex_layout::ArrayFutureExt
15441568

15451569
pub fn vortex_layout::ArrayFutureExt::masked(self, mask: vortex_array::mask_future::MaskFuture) -> Self
@@ -1670,6 +1694,8 @@ pub fn vortex_layout::LayoutReader::name(&self) -> &alloc::sync::Arc<str>
16701694

16711695
pub fn vortex_layout::LayoutReader::projection_evaluation(&self, row_range: &core::ops::range::Range<u64>, expr: &vortex_array::expr::expression::Expression, mask: vortex_array::mask_future::MaskFuture) -> vortex_error::VortexResult<vortex_layout::ArrayFuture>
16721696

1697+
pub fn vortex_layout::LayoutReader::projection_fetch_hints(&self, field_mask: alloc::vec::Vec<vortex_array::dtype::field_mask::FieldMask>, row_range: core::ops::range::Range<u64>, row_bytes_hint: usize) -> vortex_error::VortexResult<alloc::vec::Vec<vortex_layout::ProjectionFetchHint>>
1698+
16731699
pub fn vortex_layout::LayoutReader::pruning_evaluation(&self, row_range: &core::ops::range::Range<u64>, expr: &vortex_array::expr::expression::Expression, mask: vortex_mask::Mask) -> vortex_error::VortexResult<vortex_array::mask_future::MaskFuture>
16741700

16751701
pub fn vortex_layout::LayoutReader::register_splits(&self, field_mask: &[vortex_array::dtype::field_mask::FieldMask], row_range: &core::ops::range::Range<u64>, splits: &mut alloc::collections::btree::set::BTreeSet<u64>) -> vortex_error::VortexResult<()>
@@ -1688,6 +1714,8 @@ pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::name(&self) -> &allo
16881714

16891715
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::projection_evaluation(&self, row_range: &core::ops::range::Range<u64>, expr: &vortex_array::expr::expression::Expression, mask: vortex_array::mask_future::MaskFuture) -> vortex_error::VortexResult<futures_core::future::BoxFuture<'static, vortex_error::VortexResult<vortex_array::array::ArrayRef>>>
16901716

1717+
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::projection_fetch_hints(&self, field_mask: alloc::vec::Vec<vortex_array::dtype::field_mask::FieldMask>, row_range: core::ops::range::Range<u64>, row_bytes_hint: usize) -> vortex_error::VortexResult<alloc::vec::Vec<vortex_layout::ProjectionFetchHint>>
1718+
16911719
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::pruning_evaluation(&self, row_range: &core::ops::range::Range<u64>, expr: &vortex_array::expr::expression::Expression, mask: vortex_mask::Mask) -> vortex_error::VortexResult<vortex_array::mask_future::MaskFuture>
16921720

16931721
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::register_splits(&self, field_mask: &[vortex_array::dtype::field_mask::FieldMask], row_range: &core::ops::range::Range<u64>, splits: &mut alloc::collections::btree::set::BTreeSet<u64>) -> vortex_error::VortexResult<()>

0 commit comments

Comments
 (0)