Skip to content

Commit 15d19fe

Browse files
authored
Refactor VortexWrite to return Stream<Buffer> and Footer (#4557)
I attempted this in #4549 , but it was hanging, which has been resolved by switching over to using the new runtimes API for execution. I could have made this change separately, but both this PR and switching to runtimes adds an argument to write_stream function so would have been a conflict nightmare 😢 --- This revives an old branch I had worked on with @onursatici about changing the Vortex write API to return a Stream and be completely independent from the actual file sink. This PR exposes a new write_stream function, and we can remove the old write(VortexWrite, ArrayStream) APIs in a future PR. We should decide whether we want to make that change prior to releasing to avoid multiple user-facing breaks to this Rust API. In order to make this change easier, we now pass an explicit eof-of-file pointer into the writers that they can use to create new streams that occur "just before the end of the file", as is required for zone maps for example. We also remove the custom transpose_stream implementation that was quite dependent on the polling behaviour of `try_join_all`, which alarmingly changes when the number of futures reaches 40. Closes #4549 Fixes #4552 Fixes #4064 --------- Signed-off-by: Nicholas Gates <nick@nickgates.com>
1 parent 4011e4b commit 15d19fe

70 files changed

Lines changed: 1967 additions & 1647 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bench-vortex/src/clickbench/clickbench_data.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ pub async fn convert_parquet_to_vortex(
195195
compaction
196196
);
197197
let array_stream = parquet_to_vortex(parquet_file_path)?;
198-
let f = OpenOptions::new()
198+
let mut f = OpenOptions::new()
199199
.write(true)
200200
.truncate(true)
201201
.create(true)
@@ -204,7 +204,7 @@ pub async fn convert_parquet_to_vortex(
204204

205205
let write_options = compaction.apply_options(VortexWriteOptions::default());
206206

207-
write_options.write(f, array_stream).await?;
207+
write_options.write_tokio(&mut f, array_stream).await?;
208208

209209
anyhow::Ok(())
210210
})

bench-vortex/src/compress/vortex.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,16 @@ use std::io::Cursor;
55
use std::sync::Arc;
66

77
use bytes::Bytes;
8-
use tokio::runtime::Handle;
98
use vortex::Array;
10-
use vortex::file::{VortexOpenOptions, VortexWriteOptions, WriteStrategyBuilder};
9+
use vortex::file::{VortexOpenOptions, VortexWriteOptions};
1110

1211
#[inline(never)]
1312
pub async fn vortex_compress_write(array: &dyn Array, buf: &mut Vec<u8>) -> anyhow::Result<u64> {
14-
Ok(VortexWriteOptions::default()
15-
.with_strategy(
16-
WriteStrategyBuilder::new()
17-
.with_executor(Arc::new(Handle::current()))
18-
.build(),
19-
)
20-
.write(Cursor::new(buf), array.to_array_stream())
21-
.await?
22-
.position())
13+
let mut cursor = Cursor::new(buf);
14+
VortexWriteOptions::default()
15+
.write_tokio(&mut cursor, array.to_array_stream())
16+
.await?;
17+
Ok(cursor.position())
2318
}
2419

2520
#[inline(never)]

bench-vortex/src/datasets/taxi_data.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,14 @@ pub async fn fetch_taxi_data() -> Result<ArrayRef> {
4949
pub async fn taxi_data_vortex() -> Result<PathBuf> {
5050
idempotent_async("taxi.vortex", |output_fname| async move {
5151
let buf = output_fname.to_path_buf();
52-
let output_file = File::create(output_fname).await?;
52+
let mut output_file = File::create(output_fname).await?;
5353
VortexWriteOptions::default()
54-
.write(output_file, parquet_to_vortex(taxi_data_parquet().await?)?)
55-
.await?
56-
.flush()
54+
.write_tokio(
55+
&mut output_file,
56+
parquet_to_vortex(taxi_data_parquet().await?)?,
57+
)
5758
.await?;
59+
output_file.flush().await?;
5860
Ok(buf)
5961
})
6062
.await

bench-vortex/src/public_bi.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,11 @@ use log::trace;
2727
use regex::Regex;
2828
use tokio::fs::File;
2929
use tokio::process::Command as TokioCommand;
30-
use tokio::runtime::Handle;
3130
use tracing::info;
3231
use url::Url;
3332
use vortex::ArrayRef;
3433
use vortex::error::{VortexResult, vortex_err};
35-
use vortex::file::{VortexOpenOptions, VortexWriteOptions, WriteStrategyBuilder};
34+
use vortex::file::{VortexOpenOptions, VortexWriteOptions};
3635
use vortex::iter::ArrayIteratorExt;
3736
use vortex::utils::aliases::hash_map::HashMap;
3837
use vortex_datafusion::VortexFormat;
@@ -346,13 +345,8 @@ impl PBIData {
346345
let vortex_file =
347346
idempotent_async(&vortex, async |output_path| -> anyhow::Result<()> {
348347
VortexWriteOptions::default()
349-
.with_strategy(
350-
WriteStrategyBuilder::new()
351-
.with_executor(Arc::new(Handle::current()))
352-
.build(),
353-
)
354-
.write(
355-
File::create(output_path)
348+
.write_tokio(
349+
&mut File::create(output_path)
356350
.await
357351
.map_err(|e| anyhow::anyhow!("Failed to create file: {}", e))?,
358352
parquet_to_vortex(parquet)?,

bench-vortex/src/statpopgen/download_vcf.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use std::io;
5-
use std::sync::Arc;
65

76
use anyhow::{Context, Result, bail};
87
use futures::StreamExt;
@@ -12,7 +11,6 @@ use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder};
1211
use reqwest::Client;
1312
use tokio::fs::{File, create_dir_all};
1413
use tokio::io::BufReader;
15-
use tokio::runtime::Handle;
1614
use tokio_util::io::StreamReader;
1715
use tracing::info;
1816
use vortex::ArrayRef;
@@ -132,7 +130,7 @@ impl StatPopGenBenchmark {
132130

133131
pub async fn parquet_to_vortex(&self, format: Format) -> Result<()> {
134132
let parquet_path = self.parquet_path()?;
135-
let strategy = WriteStrategyBuilder::new().with_executor(Arc::new(Handle::current()));
133+
let strategy = WriteStrategyBuilder::new();
136134
let (output_path, strategy) = match format {
137135
Format::OnDiskVortex => (self.vortex_path()?, strategy),
138136
Format::VortexCompact => (
@@ -179,8 +177,8 @@ impl StatPopGenBenchmark {
179177

180178
VortexWriteOptions::default()
181179
.with_strategy(strategy.build())
182-
.write(
183-
File::create(output_path).await?,
180+
.write_tokio(
181+
&mut File::create(output_path).await?,
184182
ArrayStreamAdapter::new(dtype, vortex_stream),
185183
)
186184
.await?;

bench-vortex/src/tpch/tpchgen.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,10 +381,10 @@ impl VortexWriter {
381381
let write_task = Some(tokio::spawn(async move {
382382
let stream = ArrayStreamAdapter::new(dtype, ReceiverStream::new(receiver));
383383

384-
let file = TokioFile::create(&file_path).await?;
384+
let mut file = TokioFile::create(&file_path).await?;
385385
compaction_strategy
386386
.apply_options(VortexWriteOptions::default())
387-
.write(file, stream)
387+
.write_tokio(&mut file, stream)
388388
.await
389389
.map_err(|e| anyhow!("Vortex write failed: {}", e))?;
390390

encodings/sequence/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ vortex-array = { workspace = true }
2020
vortex-buffer = { workspace = true }
2121
vortex-dtype = { workspace = true }
2222
vortex-error = { workspace = true }
23+
vortex-io = { workspace = true }
2324
vortex-mask = { workspace = true }
2425
vortex-proto = { workspace = true }
2526
vortex-scalar = { workspace = true }

encodings/sequence/src/serde.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ mod tests {
9797
use vortex_dtype::Nullability;
9898
use vortex_expr::{get_item, root};
9999
use vortex_file::{VortexOpenOptions, VortexWriteOptions};
100+
use vortex_io::runtime::tokio::TokioRuntime;
100101
use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
101102

102103
use crate::SequenceArray;
@@ -106,10 +107,10 @@ mod tests {
106107
let seq = SequenceArray::typed_new(2i8, 3, Nullability::NonNullable, 4).unwrap();
107108
let st = StructArray::from_fields(&[("a", seq.to_array())]).unwrap();
108109

109-
let file = tokio::fs::File::create("/tmp/abc.vx").await.unwrap();
110+
let mut file = tokio::fs::File::create("/tmp/abc.vx").await.unwrap();
110111
VortexWriteOptions::default()
111112
.with_strategy(Arc::new(FlatLayoutStrategy::default()))
112-
.write(file, st.to_array_stream())
113+
.write(&mut file, st.to_array_stream(), TokioRuntime::handle())
113114
.await
114115
.unwrap();
115116

fuzz/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ vortex-dtype = { workspace = true, features = ["arbitrary"] }
3131
vortex-error = { workspace = true }
3232
vortex-expr = { workspace = true, features = ["arbitrary"] }
3333
vortex-file = { workspace = true, features = ["tokio"] }
34+
vortex-io = { workspace = true }
3435
vortex-mask = { workspace = true }
3536
vortex-scalar = { workspace = true, features = ["arbitrary"] }
3637
vortex-utils = { workspace = true }

0 commit comments

Comments
 (0)