Skip to content

Commit 56e9cf3

Browse files
authored
Fix missing tests (#14)
* Fix missing tests * Upd * Upd * Upd * Fix clonning * Fix clonning * Add table rwlock * Add table rwlock * Add table rwlock * Revert merge inot tests * Fix test * Fix test
1 parent d23b138 commit 56e9cf3

798 files changed

Lines changed: 17940 additions & 33 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ debug/
33
target/
44

55
.env
6+
metastore.yaml
67

78
styles/
89
!styles/config

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/catalog-metastore/src/metastore.rs

Lines changed: 80 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use async_trait::async_trait;
1212
use bytes::Bytes;
1313
use chrono::Utc;
1414
use dashmap::DashMap;
15-
use iceberg_rust::catalog::commit::apply_table_updates;
15+
use iceberg_rust::catalog::commit::{TableUpdate as IcebergTableUpdate, apply_table_updates};
1616
use iceberg_rust_spec::{
1717
schema::Schema as IcebergSchema,
1818
table_metadata::{FormatVersion, TableMetadataBuilder},
@@ -125,6 +125,10 @@ impl InMemoryMetastore {
125125
)
126126
}
127127

128+
fn database_key(ident: &TableIdent) -> DatabaseIdent {
129+
ident.database.to_ascii_lowercase()
130+
}
131+
128132
fn ensure_volume(state: &MetastoreState, name: &VolumeIdent) -> Result<RwObject<Volume>> {
129133
state.volumes.get(name).cloned().ok_or_else(|| {
130134
metastore_error::VolumeNotFoundSnafu {
@@ -359,12 +363,18 @@ impl Metastore for InMemoryMetastore {
359363

360364
async fn list_schemas(&self, database: &DatabaseIdent) -> Result<Vec<RwObject<Schema>>> {
361365
let state = self.state.read().await;
362-
Ok(state
366+
367+
let mut items: Vec<RwObject<Schema>> = state
363368
.schemas
364369
.iter()
365370
.filter(|((db, _), _)| db == database)
366371
.map(|(_, schema)| schema.clone())
367-
.collect())
372+
.collect();
373+
374+
// Sort by schema name
375+
items.sort_by(|a, b| b.ident.schema.cmp(&a.ident.schema));
376+
377+
Ok(items)
368378
}
369379

370380
async fn create_schema(&self, ident: &SchemaIdent, schema: Schema) -> Result<RwObject<Schema>> {
@@ -467,6 +477,16 @@ impl Metastore for InMemoryMetastore {
467477
.fail();
468478
}
469479

480+
if table.volume_ident.is_none() {
481+
let database = state.databases.get(&ident.database).ok_or_else(|| {
482+
metastore_error::DatabaseNotFoundSnafu {
483+
db: ident.database.clone(),
484+
}
485+
.build()
486+
})?;
487+
table.volume_ident = Some(database.volume.clone());
488+
}
489+
470490
let schema_id = *table.schema.schema_id();
471491
let mut schemas = HashMap::new();
472492
schemas.insert(schema_id, table.schema.clone());
@@ -546,7 +566,7 @@ impl Metastore for InMemoryMetastore {
546566
async fn update_table(
547567
&self,
548568
ident: &TableIdent,
549-
update: TableUpdate,
569+
mut update: TableUpdate,
550570
) -> Result<RwObject<Table>> {
551571
let object_store = self.table_object_store(ident).await?.ok_or_else(|| {
552572
metastore_error::TableNotFoundSnafu {
@@ -569,10 +589,13 @@ impl Metastore for InMemoryMetastore {
569589
}
570590
.build()
571591
})?;
592+
update
593+
.requirements
594+
.into_iter()
595+
.map(TableRequirementExt::new)
596+
.try_for_each(|req| req.assert(&table_entry.metadata))?;
572597

573-
for requirement in &update.requirements {
574-
TableRequirementExt::new(requirement.clone()).assert(&table_entry.metadata)?;
575-
}
598+
convert_add_schema_update_to_lowercase(&mut update.updates)?;
576599

577600
let mut metadata = table_entry.metadata.clone();
578601
apply_table_updates(&mut metadata, update.updates.clone())
@@ -592,14 +615,8 @@ impl Metastore for InMemoryMetastore {
592615
}
593616

594617
async fn table_object_store(&self, ident: &TableIdent) -> Result<Option<Arc<dyn ObjectStore>>> {
595-
let state = self.state.read().await;
596-
let volume_ident = state
597-
.tables
598-
.get(&Self::table_key(ident))
599-
.and_then(|table| table.volume_ident.clone());
600-
drop(state);
601-
if let Some(volume_ident) = volume_ident {
602-
self.volume_object_store(&volume_ident).await
618+
if let Some(volume) = self.volume_for_table(ident).await? {
619+
self.volume_object_store(&volume.ident).await
603620
} else {
604621
Ok(None)
605622
}
@@ -635,6 +652,12 @@ impl Metastore for InMemoryMetastore {
635652
.and_then(|table| table.volume_ident.as_ref())
636653
{
637654
Ok(state.volumes.get(volume_ident).cloned())
655+
} else if let Some(volume_ident) = state
656+
.databases
657+
.get(&Self::database_key(ident))
658+
.map(|database| &database.volume)
659+
{
660+
Ok(state.volumes.get(volume_ident).cloned())
638661
} else {
639662
Ok(None)
640663
}
@@ -686,3 +709,45 @@ fn max_field_id(schema: &IcebergSchema) -> i32 {
686709

687710
schema.fields().iter().map(recurse).max().unwrap_or(0)
688711
}
712+
713+
fn convert_add_schema_update_to_lowercase(updates: &mut Vec<IcebergTableUpdate>) -> Result<()> {
714+
for update in updates {
715+
if let IcebergTableUpdate::AddSchema {
716+
schema,
717+
last_column_id,
718+
} = update
719+
{
720+
let schema = convert_schema_fields_to_lowercase(schema)?;
721+
*update = IcebergTableUpdate::AddSchema {
722+
schema,
723+
last_column_id: *last_column_id,
724+
}
725+
}
726+
}
727+
Ok(())
728+
}
729+
730+
fn convert_schema_fields_to_lowercase(schema: &IcebergSchema) -> Result<IcebergSchema> {
731+
let converted_fields: Vec<StructField> = schema
732+
.fields()
733+
.iter()
734+
.map(|field| {
735+
StructField::new(
736+
field.id,
737+
&field.name.to_lowercase(),
738+
field.required,
739+
field.field_type.clone(),
740+
field.doc.clone(),
741+
)
742+
})
743+
.collect();
744+
745+
let mut builder = IcebergSchema::builder();
746+
builder.with_schema_id(*schema.schema_id());
747+
748+
for field in converted_fields {
749+
builder.with_struct_field(field);
750+
}
751+
752+
builder.build().context(metastore_error::IcebergSpecSnafu)
753+
}

crates/catalog-metastore/src/models/volumes.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,10 @@ impl Volume {
281281
.map(|s3| Arc::new(s3) as Arc<dyn ObjectStore>)
282282
.context(metastore_error::ObjectStoreSnafu)
283283
}
284-
VolumeType::File(_) => Ok(Arc::new(
285-
object_store::local::LocalFileSystem::new().with_automatic_cleanup(true),
284+
VolumeType::File(file) => Ok(Arc::new(
285+
LocalFileSystem::new_with_prefix(file.path.clone())
286+
.context(metastore_error::ObjectStoreSnafu)?
287+
.with_automatic_cleanup(true),
286288
) as Arc<dyn ObjectStore>),
287289
VolumeType::Memory => {
288290
Ok(Arc::new(object_store::memory::InMemory::new()) as Arc<dyn ObjectStore>)

crates/catalog/src/catalogs/embucket/iceberg_catalog.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{collections::HashMap, sync::Arc};
22

3+
use crate::error;
34
use async_trait::async_trait;
45
use catalog_metastore::error::{self as metastore_error, Result as MetastoreResult};
56
use catalog_metastore::{
@@ -28,6 +29,7 @@ use iceberg_rust_spec::{
2829
identifier::FullIdentifier as IcebergFullIdentifier, namespace::Namespace as IcebergNamespace,
2930
};
3031
use object_store::ObjectStore;
32+
use snafu::ResultExt;
3133

3234
#[derive(Debug)]
3335
pub struct EmbucketIcebergCatalog {
@@ -442,12 +444,11 @@ impl IcebergCatalog for EmbucketIcebergCatalog {
442444
properties: None,
443445
};
444446

445-
// TODO: restore .context
446447
let table = self
447448
.metastore
448449
.create_table(&ident, table_create_request)
449450
.await
450-
// .context(crate::execution::error::MetastoreSnafu)
451+
.context(error::MetastoreSnafu)
451452
.map_err(|e| IcebergError::External(Box::new(e)))?;
452453
Ok(IcebergTable::new(
453454
identifier.clone(),

crates/catalog/src/lib.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ pub mod information_schema;
1212
pub mod schema;
1313
pub mod table;
1414

15+
#[cfg(test)]
16+
pub mod tests;
17+
1518
// TBD: Should we move this into a separate crate? As this is duplicate implementation
1619
// of what we have in the functions crate.
1720
pub fn block_in_new_runtime<F, R>(future: F) -> Result<R>
@@ -29,3 +32,70 @@ where
2932
.join()
3033
.unwrap_or_else(|_| error::ThreadPanickedWhileExecutingFutureSnafu.fail()?)
3134
}
35+
36+
pub mod test_utils {
37+
use datafusion::arrow::array::{ArrayRef, RecordBatch};
38+
use datafusion::arrow::compute::{
39+
SortColumn, SortOptions, lexsort_to_indices, take_record_batch,
40+
};
41+
use datafusion::arrow::datatypes::{DataType, Field, Schema};
42+
use std::collections::HashSet;
43+
use std::sync::Arc;
44+
45+
#[allow(clippy::unwrap_used, clippy::must_use_candidate)]
46+
pub fn sort_record_batch_by_sortable_columns(batch: &RecordBatch) -> RecordBatch {
47+
let sort_columns: Vec<SortColumn> = (0..batch.num_columns())
48+
.filter_map(|i| {
49+
let col = batch.column(i).clone();
50+
let field = batch.schema().field(i).clone();
51+
if matches!(field.data_type(), DataType::Null) {
52+
None
53+
} else {
54+
Some(SortColumn {
55+
values: col,
56+
options: Some(SortOptions::default()),
57+
})
58+
}
59+
})
60+
.collect();
61+
62+
if sort_columns.is_empty() {
63+
return batch.clone();
64+
}
65+
66+
let indices = lexsort_to_indices(&sort_columns, Some(batch.num_rows())).unwrap();
67+
take_record_batch(batch, &indices).unwrap()
68+
}
69+
70+
#[allow(clippy::unwrap_used, clippy::must_use_candidate)]
71+
pub fn remove_columns_from_batches<S: std::hash::BuildHasher>(
72+
batches: Vec<RecordBatch>,
73+
excluded_columns: &HashSet<&str, S>,
74+
) -> Vec<RecordBatch> {
75+
batches
76+
.into_iter()
77+
.map(|batch| {
78+
let schema = batch.schema();
79+
let indices: Vec<usize> = schema
80+
.fields()
81+
.iter()
82+
.enumerate()
83+
.filter_map(|(i, f)| {
84+
if excluded_columns.contains(f.name().as_str()) {
85+
None
86+
} else {
87+
Some(i)
88+
}
89+
})
90+
.collect();
91+
92+
let columns: Vec<ArrayRef> =
93+
indices.iter().map(|&i| batch.column(i).clone()).collect();
94+
let fields: Vec<Field> = indices.iter().map(|&i| schema.field(i).clone()).collect();
95+
let new_schema = Arc::new(Schema::new(fields));
96+
97+
RecordBatch::try_new(new_schema, columns).unwrap()
98+
})
99+
.collect()
100+
}
101+
}

0 commit comments

Comments
 (0)