Skip to content

Commit e09c630

Browse files
committed
fix fmt
1 parent 2b002b7 commit e09c630

8 files changed

Lines changed: 52 additions & 27 deletions

File tree

crates/catalog/rest/src/catalog.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,11 @@ impl CatalogBuilder for RestCatalogBuilder {
130130
"Catalog uri is required",
131131
))
132132
} else {
133-
Ok(RestCatalog::new(self.config, self.storage_factory, self.runtime))
133+
Ok(RestCatalog::new(
134+
self.config,
135+
self.storage_factory,
136+
self.runtime,
137+
))
134138
}
135139
};
136140

crates/catalog/s3tables/src/catalog.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -738,7 +738,9 @@ mod tests {
738738
props: HashMap::new(),
739739
};
740740

741-
Ok(Some(S3TablesCatalog::new(config, None, Runtime::default()).await?))
741+
Ok(Some(
742+
S3TablesCatalog::new(config, None, Runtime::default()).await?,
743+
))
742744
}
743745

744746
#[tokio::test]

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::delete_vector::DeleteVector;
3030
use crate::expr::Predicate::AlwaysTrue;
3131
use crate::expr::{Predicate, Reference};
3232
use crate::io::FileIO;
33+
use crate::runtime::Runtime;
3334
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
3435
use crate::spec::{
3536
DataContentType, Datum, ListType, MapType, NestedField, NestedFieldRef, PartnerAccessor,
@@ -38,8 +39,6 @@ use crate::spec::{
3839
};
3940
use crate::{Error, ErrorKind, Result};
4041

41-
use crate::runtime::Runtime;
42-
4342
#[derive(Clone, Debug)]
4443
pub(crate) struct CachingDeleteFileLoader {
4544
basic_delete_file_loader: BasicDeleteFileLoader,
@@ -79,7 +78,11 @@ enum ParsedDeleteFileContext {
7978

8079
#[allow(unused_variables)]
8180
impl CachingDeleteFileLoader {
82-
pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize, runtime: Runtime) -> Self {
81+
pub(crate) fn new(
82+
file_io: FileIO,
83+
concurrency_limit_data_files: usize,
84+
runtime: Runtime,
85+
) -> Self {
8386
CachingDeleteFileLoader {
8487
basic_delete_file_loader: BasicDeleteFileLoader::new(file_io),
8588
concurrency_limit_data_files,
@@ -730,7 +733,8 @@ mod tests {
730733
let table_location = tmp_dir.path();
731734
let file_io = FileIO::new_with_fs();
732735

733-
let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::default());
736+
let delete_file_loader =
737+
CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::default());
734738

735739
let file_scan_tasks = setup(table_location);
736740

@@ -950,7 +954,8 @@ mod tests {
950954
};
951955

952956
// Load the deletes - should handle both types without error
953-
let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::default());
957+
let delete_file_loader =
958+
CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::default());
954959
let delete_filter = delete_file_loader
955960
.load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref())
956961
.await
@@ -1021,7 +1026,8 @@ mod tests {
10211026
let table_location = tmp_dir.path();
10221027
let file_io = FileIO::new_with_fs();
10231028

1024-
let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::default());
1029+
let delete_file_loader =
1030+
CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::default());
10251031

10261032
let file_scan_tasks = setup(table_location);
10271033

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,8 @@ pub(crate) mod tests {
302302
let table_location = tmp_dir.path();
303303
let file_io = FileIO::new_with_fs();
304304

305-
let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::default());
305+
let delete_file_loader =
306+
CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::default());
306307

307308
let file_scan_tasks = setup(table_location);
308309

crates/iceberg/src/arrow/reader.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,12 @@ use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator
5656
use crate::expr::{BoundPredicate, BoundReference};
5757
use crate::io::{FileIO, FileMetadata, FileRead};
5858
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
59+
use crate::runtime::Runtime;
5960
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
6061
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
6162
use crate::utils::available_parallelism;
6263
use crate::{Error, ErrorKind};
6364

64-
use crate::runtime::Runtime;
65-
6665
/// Default gap between byte ranges below which they are coalesced into a
6766
/// single request. Matches object_store's `OBJECT_STORE_COALESCE_DEFAULT`.
6867
const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024;

crates/iceberg/src/catalog/memory/catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ use itertools::Itertools;
2727

2828
use super::namespace_state::NamespaceState;
2929
use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory};
30+
use crate::runtime::Runtime;
3031
use crate::spec::{TableMetadata, TableMetadataBuilder};
3132
use crate::table::Table;
32-
use crate::runtime::Runtime;
3333
use crate::{
3434
Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
3535
TableCommit, TableCreation, TableIdent,

crates/iceberg/src/scan/mod.rs

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -402,10 +402,12 @@ impl TableScan {
402402
|(manifest_entry_context, tx)| {
403403
let rt_inner = rt_inner.clone();
404404
async move {
405-
rt_inner.spawn(async move {
406-
Self::process_delete_manifest_entry(manifest_entry_context, tx).await
407-
})
408-
.await
405+
rt_inner
406+
.spawn(async move {
407+
Self::process_delete_manifest_entry(manifest_entry_context, tx)
408+
.await
409+
})
410+
.await
409411
}
410412
},
411413
)
@@ -431,10 +433,12 @@ impl TableScan {
431433
|(manifest_entry_context, tx)| {
432434
let rt_inner = rt_inner.clone();
433435
async move {
434-
rt_inner.spawn(async move {
435-
Self::process_data_manifest_entry(manifest_entry_context, tx).await
436-
})
437-
.await
436+
rt_inner
437+
.spawn(async move {
438+
Self::process_data_manifest_entry(manifest_entry_context, tx)
439+
.await
440+
})
441+
.await
438442
}
439443
},
440444
)
@@ -450,10 +454,11 @@ impl TableScan {
450454

451455
/// Returns an [`ArrowRecordBatchStream`].
452456
pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
453-
let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone(), self.runtime.clone())
454-
.with_data_file_concurrency_limit(self.concurrency_limit_data_files)
455-
.with_row_group_filtering_enabled(self.row_group_filtering_enabled)
456-
.with_row_selection_enabled(self.row_selection_enabled);
457+
let mut arrow_reader_builder =
458+
ArrowReaderBuilder::new(self.file_io.clone(), self.runtime.clone())
459+
.with_data_file_concurrency_limit(self.concurrency_limit_data_files)
460+
.with_row_group_filtering_enabled(self.row_group_filtering_enabled)
461+
.with_row_selection_enabled(self.row_selection_enabled);
457462

458463
if let Some(batch_size) = self.batch_size {
459464
arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
@@ -1345,14 +1350,22 @@ pub mod tests {
13451350
.unwrap();
13461351
assert_eq!(plan_task.len(), 2);
13471352

1348-
let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone(), fixture.table.runtime().clone()).build();
1353+
let reader = ArrowReaderBuilder::new(
1354+
fixture.table.file_io().clone(),
1355+
fixture.table.runtime().clone(),
1356+
)
1357+
.build();
13491358
let batch_stream = reader
13501359
.clone()
13511360
.read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
13521361
.unwrap();
13531362
let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
13541363

1355-
let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone(), fixture.table.runtime().clone()).build();
1364+
let reader = ArrowReaderBuilder::new(
1365+
fixture.table.file_io().clone(),
1366+
fixture.table.runtime().clone(),
1367+
)
1368+
.build();
13561369
let batch_stream = reader
13571370
.read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
13581371
.unwrap();

crates/iceberg/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ use crate::arrow::ArrowReaderBuilder;
2323
use crate::inspect::MetadataTable;
2424
use crate::io::FileIO;
2525
use crate::io::object_cache::ObjectCache;
26+
use crate::runtime::Runtime;
2627
use crate::scan::TableScanBuilder;
2728
use crate::spec::{SchemaRef, TableMetadata, TableMetadataRef};
28-
use crate::runtime::Runtime;
2929
use crate::{Error, ErrorKind, Result, TableIdent};
3030

3131
/// Builder to create table scan.

0 commit comments

Comments
 (0)