Skip to content

Commit c558ace

Browse files
onursatici and wwmanning
authored and committed
buffered strategy to not use eof for the final chunk (#7219)
## Summary fixes the buffered layout writer so it doesn't write the final chunk on the eof pointer. Eof should only be used for data that the writer wants to place at the end of the file. The buffered writer was writing regular buffered data there, which messed up the ordering of some segments. Previously the struct writer used a transposed stream without spawning a task per column; in that world, buffering was prone to deadlock. That has since been changed to spawn a task per column, so we should be deadlock safe. I tried converting all clickbench files repeatedly, as well as the public BI datasets and randomly generated wide tables, but I couldn't deadlock this. fixes #7234 fixes #7236 ## Testing add a vortex file test that asserts the dict layout segments are in the right order, as well as zone maps across columns --------- Signed-off-by: Onur Satici <onur@spiraldb.com> Signed-off-by: Will Manning <will@willmanning.io>
1 parent c2dd0c8 commit c558ace

2 files changed

Lines changed: 194 additions & 15 deletions

File tree

vortex-file/src/tests.rs

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ use vortex_buffer::ByteBufferMut;
6262
use vortex_buffer::buffer;
6363
use vortex_error::VortexResult;
6464
use vortex_io::session::RuntimeSession;
65+
use vortex_layout::Layout;
6566
use vortex_layout::scan::scan_builder::ScanBuilder;
6667
use vortex_layout::session::LayoutSession;
6768
use vortex_session::VortexSession;
@@ -71,6 +72,7 @@ use crate::V1_FOOTER_FBS_SIZE;
7172
use crate::VERSION;
7273
use crate::VortexFile;
7374
use crate::WriteOptionsSessionExt;
75+
use crate::footer::SegmentSpec;
7476

7577
static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
7678
let mut session = VortexSession::empty()
@@ -1696,3 +1698,176 @@ async fn timestamp_unit_mismatch_errors_with_constant_children()
16961698

16971699
Ok(())
16981700
}
1701+
1702+
/// Collect all segment byte offsets reachable from a layout node.
1703+
fn collect_segment_offsets(layout: &dyn Layout, segment_specs: &[SegmentSpec]) -> Vec<u64> {
1704+
let mut result = Vec::new();
1705+
collect_segment_offsets_inner(layout, segment_specs, &mut result);
1706+
result
1707+
}
1708+
1709+
/// Recursive worker for `collect_segment_offsets`: pushes the byte offset of
/// every segment owned directly by `layout`, then descends into each child.
fn collect_segment_offsets_inner(
    layout: &dyn Layout,
    segment_specs: &[SegmentSpec],
    result: &mut Vec<u64>,
) {
    // Segments referenced directly by this node.
    for id in layout.segment_ids() {
        result.push(segment_specs[*id as usize].offset);
    }
    // Then recurse into every child layout.
    for child in layout.children().unwrap() {
        collect_segment_offsets_inner(child.as_ref(), segment_specs, result);
    }
}
1721+
1722+
/// Assert that all offsets in `before` are less than all offsets in `after`.
///
/// Vacuously passes when either slice is empty. `context` is included in the
/// panic message on failure.
fn assert_offsets_ordered(before: &[u64], after: &[u64], context: &str) {
    // Nothing to compare unless both sides are non-empty.
    let (Some(&max_before), Some(&min_after)) = (before.iter().max(), after.iter().min()) else {
        return;
    };
    assert!(
        max_before < min_after,
        "{context}: expected all 'before' offsets < all 'after' offsets, \
        but max before = {max_before} >= min after = {min_after}"
    );
}
1732+
1733+
#[tokio::test]
1734+
#[cfg_attr(miri, ignore)]
1735+
async fn test_segment_ordering_dict_codes_before_values() -> VortexResult<()> {
1736+
// Create low-cardinality strings to trigger dict encoding, plus an integer column.
1737+
let n = 100_000;
1738+
let values: Vec<&str> = (0..n).map(|i| ["alpha", "beta", "gamma"][i % 3]).collect();
1739+
let strings = VarBinArray::from(values).into_array();
1740+
let numbers = PrimitiveArray::from_iter(0..n as i32).into_array();
1741+
1742+
let st = StructArray::from_fields(&[("strings", strings), ("numbers", numbers)]).unwrap();
1743+
1744+
let mut buf = ByteBufferMut::empty();
1745+
let summary = SESSION
1746+
.write_options()
1747+
.write(&mut buf, st.to_array_stream())
1748+
.await?;
1749+
1750+
let footer = summary.footer();
1751+
let segment_specs = footer.segment_map();
1752+
let root = footer.layout();
1753+
1754+
// Walk the layout tree and find all dict layouts.
1755+
// Verify codes segments come before values segments in byte order within each run.
1756+
fn check_dict_ordering(layout: &dyn Layout, segment_specs: &[SegmentSpec]) {
1757+
if layout.encoding_id().as_ref() == "vortex.dict" {
1758+
// child 0 = values, child 1 = codes
1759+
let values_offsets =
1760+
collect_segment_offsets(layout.child(0).unwrap().as_ref(), segment_specs);
1761+
let codes_offsets =
1762+
collect_segment_offsets(layout.child(1).unwrap().as_ref(), segment_specs);
1763+
1764+
assert_offsets_ordered(
1765+
&codes_offsets,
1766+
&values_offsets,
1767+
"dict: codes should come before values",
1768+
);
1769+
}
1770+
1771+
for child in layout.children().unwrap() {
1772+
check_dict_ordering(child.as_ref(), segment_specs);
1773+
}
1774+
}
1775+
1776+
check_dict_ordering(root.as_ref(), segment_specs);
1777+
1778+
Ok(())
1779+
}
1780+
1781+
#[tokio::test]
1782+
#[cfg_attr(miri, ignore)]
1783+
async fn test_segment_ordering_zonemaps_after_data() -> VortexResult<()> {
1784+
// Create a multi-column struct with enough rows to produce zone maps.
1785+
let n = 100_000;
1786+
let values: Vec<&str> = (0..n).map(|i| ["alpha", "beta", "gamma"][i % 3]).collect();
1787+
let strings = VarBinArray::from(values).into_array();
1788+
let numbers = PrimitiveArray::from_iter(0..n as i32).into_array();
1789+
let floats = PrimitiveArray::from_iter((0..n).map(|i| i as f64 * 0.1)).into_array();
1790+
1791+
let st = StructArray::from_fields(&[
1792+
("strings", strings),
1793+
("numbers", numbers),
1794+
("floats", floats),
1795+
])
1796+
.unwrap();
1797+
1798+
let mut buf = ByteBufferMut::empty();
1799+
let summary = SESSION
1800+
.write_options()
1801+
.write(&mut buf, st.to_array_stream())
1802+
.await?;
1803+
1804+
let footer = summary.footer();
1805+
let segment_specs = footer.segment_map();
1806+
let root = footer.layout();
1807+
1808+
// Find all zoned layouts and verify data segments come before zone map segments.
1809+
fn check_zoned_ordering(layout: &dyn Layout, segment_specs: &[SegmentSpec]) {
1810+
if layout.encoding_id().as_ref() == "vortex.stats" {
1811+
// child 0 = data, child 1 = zones
1812+
let data_offsets =
1813+
collect_segment_offsets(layout.child(0).unwrap().as_ref(), segment_specs);
1814+
let zones_offsets =
1815+
collect_segment_offsets(layout.child(1).unwrap().as_ref(), segment_specs);
1816+
1817+
assert_offsets_ordered(
1818+
&data_offsets,
1819+
&zones_offsets,
1820+
"zoned: data should come before zones",
1821+
);
1822+
}
1823+
1824+
for child in layout.children().unwrap() {
1825+
check_zoned_ordering(child.as_ref(), segment_specs);
1826+
}
1827+
}
1828+
1829+
check_zoned_ordering(root.as_ref(), segment_specs);
1830+
1831+
// Additionally: all zone map segments across all columns should appear after
1832+
// all data segments across all columns.
1833+
let mut all_data_offsets = Vec::new();
1834+
let mut all_zones_offsets = Vec::new();
1835+
1836+
fn collect_all_zoned(
1837+
layout: &dyn Layout,
1838+
segment_specs: &[SegmentSpec],
1839+
all_data: &mut Vec<u64>,
1840+
all_zones: &mut Vec<u64>,
1841+
) {
1842+
if layout.encoding_id().as_ref() == "vortex.stats" {
1843+
// child 0 = data, child 1 = zones
1844+
all_data.extend(collect_segment_offsets(
1845+
layout.child(0).unwrap().as_ref(),
1846+
segment_specs,
1847+
));
1848+
all_zones.extend(collect_segment_offsets(
1849+
layout.child(1).unwrap().as_ref(),
1850+
segment_specs,
1851+
));
1852+
return;
1853+
}
1854+
for child in layout.children().unwrap() {
1855+
collect_all_zoned(child.as_ref(), segment_specs, all_data, all_zones);
1856+
}
1857+
}
1858+
1859+
collect_all_zoned(
1860+
root.as_ref(),
1861+
segment_specs,
1862+
&mut all_data_offsets,
1863+
&mut all_zones_offsets,
1864+
);
1865+
1866+
assert_offsets_ordered(
1867+
&all_data_offsets,
1868+
&all_zones_offsets,
1869+
"global: all data segments should come before all zone map segments",
1870+
);
1871+
1872+
Ok(())
1873+
}

vortex-layout/src/layouts/buffered.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::sync::atomic::Ordering;
99
use async_stream::try_stream;
1010
use async_trait::async_trait;
1111
use futures::StreamExt as _;
12+
use futures::pin_mut;
1213
use vortex_array::ArrayContext;
1314
use vortex_error::VortexResult;
1415
use vortex_io::runtime::Handle;
@@ -44,20 +45,18 @@ impl LayoutStrategy for BufferedStrategy {
4445
&self,
4546
ctx: ArrayContext,
4647
segment_sink: SegmentSinkRef,
47-
mut stream: SendableSequentialStream,
48-
mut eof: SequencePointer,
48+
stream: SendableSequentialStream,
49+
eof: SequencePointer,
4950
handle: Handle,
5051
) -> VortexResult<LayoutRef> {
5152
let dtype = stream.dtype().clone();
5253
let buffer_size = self.buffer_size;
5354

54-
// We have no choice but to put our final buffers here!
55-
// We cannot hold on to sequence ids across iterations of the stream, otherwise we can
56-
// cause deadlocks with other columns that are waiting for us to flush.
57-
let mut final_flush = eof.split_off();
58-
5955
let buffered_bytes_counter = self.buffered_bytes.clone();
6056
let buffered_stream = try_stream! {
57+
let stream = stream.peekable();
58+
pin_mut!(stream);
59+
6160
let mut nbytes = 0u64;
6261
let mut chunks = VecDeque::new();
6362

@@ -68,11 +67,23 @@ impl LayoutStrategy for BufferedStrategy {
6867
buffered_bytes_counter.fetch_add(chunk_size, Ordering::Relaxed);
6968
chunks.push_back(chunk);
7069

70+
// If this is the last element, flush everything.
71+
if stream.as_mut().peek().await.is_none() {
72+
let mut sequence_ptr = sequence_id.descend();
73+
while let Some(chunk) = chunks.pop_front() {
74+
let chunk_size = chunk.nbytes();
75+
nbytes -= chunk_size;
76+
buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed);
77+
yield (sequence_ptr.advance(), chunk)
78+
}
79+
break;
80+
}
81+
7182
if nbytes < 2 * buffer_size {
7283
continue;
7384
};
7485

75-
// Wait until we're at 2x the buffer size before flushing 1x the buffer size
86+
// Wait until we're at 2x the buffer size before flushing 1x the buffer size.
7687
// This avoids small tail stragglers being flushed at the end of the file.
7788
let mut sequence_ptr = sequence_id.descend();
7889
while nbytes > buffer_size {
@@ -85,13 +96,6 @@ impl LayoutStrategy for BufferedStrategy {
8596
yield (sequence_ptr.advance(), chunk)
8697
}
8798
}
88-
89-
// Now the input stream has ended, flush everything
90-
while let Some(chunk) = chunks.pop_front() {
91-
let chunk_size = chunk.nbytes();
92-
buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed);
93-
yield (final_flush.advance(), chunk)
94-
}
9599
};
96100

97101
self.child

0 commit comments

Comments
 (0)