File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -13,6 +13,7 @@ use vortex::dtype::DType;
1313use vortex:: dtype:: arrow:: FromArrowType ;
1414use vortex:: error:: { VortexError , VortexExpect } ;
1515use vortex:: file:: VortexWriteOptions as WriteOptions ;
16+ use vortex:: io:: VortexWrite ;
1617use vortex:: io:: runtime:: tokio:: TokioRuntime ;
1718use vortex:: iter:: { ArrayIteratorAdapter , ArrayIteratorExt } ;
1819use vortex:: stream:: ArrayStream ;
@@ -67,6 +68,7 @@ pub(crate) unsafe fn write_array_stream(
6768 RUNTIME . block_on ( async {
6869 let mut file = tokio:: fs:: File :: create ( path) . await ?;
6970 options. inner . write ( & mut file, vortex_stream) . await ?;
71+ file. shutdown ( ) . await ?;
7072 Ok ( ( ) )
7173 } )
7274}
Original file line number Diff line number Diff line change @@ -8,6 +8,7 @@ use futures::StreamExt;
88use indicatif:: ProgressBar ;
99use parquet:: arrow:: ParquetRecordBatchStreamBuilder ;
1010use tokio:: fs:: File ;
11+ use tokio:: io:: AsyncWriteExt ;
1112use vortex:: ArrayRef ;
1213use vortex:: arrow:: FromArrowArray ;
1314use vortex:: compressor:: CompactCompressor ;
@@ -80,13 +81,12 @@ pub async fn exec_convert(flags: Flags) -> anyhow::Result<()> {
8081 Strategy :: Compact => strategy. with_compressor ( CompactCompressor :: default ( ) ) ,
8182 } ;
8283
84+ let mut file = File :: create ( output_path) . await ?;
8385 VortexWriteOptions :: default ( )
8486 . with_strategy ( strategy. build ( ) )
85- . write (
86- & mut File :: create ( output_path) . await ?,
87- ArrayStreamAdapter :: new ( dtype, vortex_stream) ,
88- )
87+ . write ( & mut file, ArrayStreamAdapter :: new ( dtype, vortex_stream) )
8988 . await ?;
89+ file. shutdown ( ) . await ?;
9090
9191 Ok ( ( ) )
9292}
You can’t perform that action at this time.
0 commit comments