Skip to content

Commit babaf45

Browse files
committed
Merge remote-tracking branch 'origin/develop' into ji/better-execute
Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
# Conflicts:
#   vortex-array/src/arrays/filter/vtable.rs
#   vortex-array/src/arrays/masked/vtable/mod.rs
#   vortex-array/src/arrays/slice/vtable.rs
2 parents: 37acce6 + cdebcdc; commit: babaf45

159 files changed

Lines changed: 2474 additions & 1216 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -349,6 +349,7 @@ needless_range_loop = "allow"
349349
or_fun_call = "deny"
350350
panic = "deny"
351351
# panic_in_result_fn = "deny" -- we cannot disable this for tests to use assertions
352+
clone_on_ref_ptr = "deny"
352353
redundant_clone = "deny"
353354
same_name_method = "deny"
354355
tests_outside_test_module = "deny"

benchmarks/compress-bench/src/parquet.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -53,7 +53,7 @@ impl Compressor for ParquetCompressor {
5353
// Read the input parquet file
5454
let file = File::open(parquet_path)?;
5555
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
56-
let schema = builder.schema().clone();
56+
let schema = Arc::clone(builder.schema());
5757
let reader = builder.build()?;
5858
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;
5959

@@ -69,7 +69,7 @@ impl Compressor for ParquetCompressor {
6969
// First compress to get the bytes we'll decompress
7070
let file = File::open(parquet_path)?;
7171
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
72-
let schema = builder.schema().clone();
72+
let schema = Arc::clone(builder.schema());
7373
let reader = builder.build()?;
7474
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;
7575

benchmarks/datafusion-bench/src/lib.rs

Lines changed: 10 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -81,8 +81,10 @@ pub fn make_object_store(
8181
.with_bucket_name(bucket_name)
8282
.build()?,
8383
);
84-
session
85-
.register_object_store(&Url::parse(&format!("s3://{bucket_name}/"))?, s3.clone());
84+
session.register_object_store(
85+
&Url::parse(&format!("s3://{bucket_name}/"))?,
86+
Arc::<object_store::aws::AmazonS3>::clone(&s3),
87+
);
8688
Ok(s3)
8789
}
8890
"gs" => {
@@ -92,13 +94,16 @@ pub fn make_object_store(
9294
.with_bucket_name(bucket_name)
9395
.build()?,
9496
);
95-
session
96-
.register_object_store(&Url::parse(&format!("gs://{bucket_name}/"))?, gcs.clone());
97+
session.register_object_store(
98+
&Url::parse(&format!("gs://{bucket_name}/"))?,
99+
Arc::<object_store::gcp::GoogleCloudStorage>::clone(&gcs),
100+
);
97101
Ok(gcs)
98102
}
99103
_ => {
100104
let fs = Arc::new(LocalFileSystem::default());
101-
session.register_object_store(&Url::parse("file:/")?, fs.clone());
105+
session
106+
.register_object_store(&Url::parse("file:/")?, Arc::<LocalFileSystem>::clone(&fs));
102107
Ok(fs)
103108
}
104109
}

benchmarks/datafusion-bench/src/main.rs

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -203,7 +203,7 @@ async fn main() -> anyhow::Result<()> {
203203
.iter()
204204
.any(|(idx, f, _)| *idx == query_idx && *f == *format)
205205
{
206-
plans_mut.push((query_idx, *format, plan.clone()));
206+
plans_mut.push((query_idx, *format, Arc::clone(&plan)));
207207
}
208208
}
209209

@@ -252,7 +252,7 @@ async fn register_benchmark_tables<B: Benchmark + ?Sized>(
252252
let pattern = benchmark.pattern(table.name, format);
253253
let table_url = ListingTableUrl::try_new(benchmark_base.clone(), pattern)?;
254254

255-
let mut listing_options = ListingOptions::new(file_format.clone())
255+
let mut listing_options = ListingOptions::new(Arc::clone(&file_format))
256256
.with_session_config_options(session.state().config());
257257
if benchmark.dataset_name() == "polarsignals" && format == Format::Parquet {
258258
// Work around a DataFusion bug (fixed in 53.0.0) where the
@@ -304,8 +304,10 @@ async fn register_v2_tables<B: Benchmark + ?Sized>(
304304
.runtime_env()
305305
.object_store(table_url.object_store())?;
306306

307-
let fs: vortex::io::filesystem::FileSystemRef =
308-
Arc::new(ObjectStoreFileSystem::new(store.clone(), SESSION.handle()));
307+
let fs: vortex::io::filesystem::FileSystemRef = Arc::new(ObjectStoreFileSystem::new(
308+
Arc::clone(&store),
309+
SESSION.handle(),
310+
));
309311
let base_prefix = benchmark_base.path().trim_start_matches('/').to_string();
310312
let fs = fs.with_prefix(base_prefix);
311313

@@ -416,7 +418,7 @@ pub async fn execute_query(
416418
.create_physical_plan()
417419
.with_labelset(get_labelset_from_global())
418420
.await?;
419-
let result = collect(plan.clone(), task_ctx)
421+
let result = collect(Arc::clone(&plan), task_ctx)
420422
.with_labelset(get_labelset_from_global())
421423
.await?;
422424

benchmarks/lance-bench/src/compress.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44
use std::fs;
55
use std::fs::File;
66
use std::path::Path;
7+
use std::sync::Arc;
78
use std::time::Duration;
89
use std::time::Instant;
910

@@ -92,7 +93,7 @@ impl Compressor for LanceCompressor {
9293
// Read the input parquet file
9394
let file = File::open(parquet_path)?;
9495
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
95-
let schema = builder.schema().clone();
96+
let schema = Arc::clone(builder.schema());
9697
let reader = builder.build()?;
9798
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;
9899

@@ -131,7 +132,7 @@ impl Compressor for LanceCompressor {
131132
// First compress to get the Lance dataset
132133
let file = File::open(parquet_path)?;
133134
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
134-
let schema = builder.schema().clone();
135+
let schema = Arc::clone(builder.schema());
135136
let reader = builder.build()?;
136137
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;
137138

benchmarks/lance-bench/src/convert.rs

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -84,7 +84,7 @@ impl Iterator for ParquetFilesIterator {
8484

8585
impl RecordBatchReader for ParquetFilesIterator {
8686
fn schema(&self) -> SchemaRef {
87-
self.schema.clone()
87+
Arc::clone(&self.schema)
8888
}
8989
}
9090

@@ -161,7 +161,7 @@ pub async fn convert_parquet_to_lance<'p>(
161161
// Get schema from the first Parquet file
162162
let first_file = File::open(&parquet_files[0])?;
163163
let first_builder = ParquetRecordBatchReaderBuilder::try_new(first_file)?;
164-
let schema = first_builder.schema().clone();
164+
let schema = Arc::clone(first_builder.schema());
165165

166166
// Create a streaming iterator that reads from all Parquet files
167167
let batch_iter = ParquetFilesIterator::new(parquet_files, schema)?;
@@ -237,7 +237,7 @@ pub fn convert_utf8view_batch(batch: RecordBatch) -> anyhow::Result<RecordBatch>
237237
// Cast Utf8View to Utf8.
238238
cast(column, &DataType::Utf8)?
239239
} else {
240-
column.clone()
240+
Arc::clone(column)
241241
};
242242
new_columns.push(new_column);
243243
}
@@ -277,6 +277,6 @@ impl Iterator for ConvertingParquetFilesIterator {
277277

278278
impl RecordBatchReader for ConvertingParquetFilesIterator {
279279
fn schema(&self) -> SchemaRef {
280-
self.converted_schema.clone()
280+
Arc::clone(&self.converted_schema)
281281
}
282282
}

encodings/alp/src/alp/array.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -172,7 +172,6 @@ impl VTable for ALP {
172172
let array = require_child!(array, array.encoded(), ENCODED_SLOT => Primitive);
173173
require_patches!(
174174
array,
175-
array.patches(),
176175
PATCH_INDICES_SLOT,
177176
PATCH_VALUES_SLOT,
178177
PATCH_CHUNK_OFFSETS_SLOT

encodings/alp/src/alp_rd/array.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -228,7 +228,6 @@ impl VTable for ALPRD {
228228
let array = require_child!(array, array.right_parts(), 1 => Primitive);
229229
require_patches!(
230230
array,
231-
array.left_parts_patches(),
232231
LP_PATCH_INDICES_SLOT,
233232
LP_PATCH_VALUES_SLOT,
234233
LP_PATCH_CHUNK_OFFSETS_SLOT

encodings/fastlanes/src/bitpacking/compute/filter.rs

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -82,7 +82,7 @@ impl FilterKernel for BitPacked {
8282

8383
let patches = array
8484
.patches()
85-
.map(|patches| patches.filter(&Mask::Values(values.clone()), ctx))
85+
.map(|patches| patches.filter(&Mask::Values(Arc::clone(values)), ctx))
8686
.transpose()?
8787
.flatten();
8888

@@ -112,7 +112,9 @@ fn filter_primitive_without_patches<U: UnsignedPType + BitPacking>(
112112
selection: &Arc<MaskValues>,
113113
) -> VortexResult<(Buffer<U>, Validity)> {
114114
let values = filter_with_indices(array.data(), selection.indices());
115-
let validity = array.validity()?.filter(&Mask::Values(selection.clone()))?;
115+
let validity = array
116+
.validity()?
117+
.filter(&Mask::Values(Arc::clone(selection)))?;
116118

117119
Ok((values.freeze(), validity))
118120
}

encodings/fastlanes/src/bitpacking/vtable/mod.rs

Lines changed: 1 addition & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,6 @@ use std::hash::Hash;
55
use std::hash::Hasher;
66

77
use prost::Message;
8-
use vortex_array::AnyCanonical;
98
use vortex_array::Array;
109
use vortex_array::ArrayEq;
1110
use vortex_array::ArrayHash;
@@ -283,17 +282,11 @@ impl VTable for BitPacked {
283282
fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
284283
require_patches!(
285284
array,
286-
array.patches(),
287285
PATCH_INDICES_SLOT,
288286
PATCH_VALUES_SLOT,
289287
PATCH_CHUNK_OFFSETS_SLOT
290288
);
291-
let validity = array.validity()?;
292-
require_validity!(
293-
array,
294-
&validity,
295-
VALIDITY_SLOT => AnyCanonical
296-
);
289+
require_validity!(array, VALIDITY_SLOT);
297290

298291
Ok(ExecutionResult::done(
299292
unpack_array(array.as_view(), ctx)?.into_array(),

0 commit comments

Comments
 (0)