Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ debug/
target/

.env
metastore.yaml

styles/
!styles/config
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

95 changes: 80 additions & 15 deletions crates/catalog-metastore/src/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use dashmap::DashMap;
use iceberg_rust::catalog::commit::apply_table_updates;
use iceberg_rust::catalog::commit::{TableUpdate as IcebergTableUpdate, apply_table_updates};
use iceberg_rust_spec::{
schema::Schema as IcebergSchema,
table_metadata::{FormatVersion, TableMetadataBuilder},
Expand Down Expand Up @@ -125,6 +125,10 @@ impl InMemoryMetastore {
)
}

fn database_key(ident: &TableIdent) -> DatabaseIdent {
ident.database.to_ascii_lowercase()
}

fn ensure_volume(state: &MetastoreState, name: &VolumeIdent) -> Result<RwObject<Volume>> {
state.volumes.get(name).cloned().ok_or_else(|| {
metastore_error::VolumeNotFoundSnafu {
Expand Down Expand Up @@ -359,12 +363,18 @@ impl Metastore for InMemoryMetastore {

async fn list_schemas(&self, database: &DatabaseIdent) -> Result<Vec<RwObject<Schema>>> {
let state = self.state.read().await;
Ok(state

let mut items: Vec<RwObject<Schema>> = state
.schemas
.iter()
.filter(|((db, _), _)| db == database)
.map(|(_, schema)| schema.clone())
.collect())
.collect();

// Sort by schema name
items.sort_by(|a, b| b.ident.schema.cmp(&a.ident.schema));

Ok(items)
}

async fn create_schema(&self, ident: &SchemaIdent, schema: Schema) -> Result<RwObject<Schema>> {
Expand Down Expand Up @@ -467,6 +477,16 @@ impl Metastore for InMemoryMetastore {
.fail();
}

if table.volume_ident.is_none() {
let database = state.databases.get(&ident.database).ok_or_else(|| {
metastore_error::DatabaseNotFoundSnafu {
db: ident.database.clone(),
}
.build()
})?;
table.volume_ident = Some(database.volume.clone());
}

let schema_id = *table.schema.schema_id();
let mut schemas = HashMap::new();
schemas.insert(schema_id, table.schema.clone());
Expand Down Expand Up @@ -546,7 +566,7 @@ impl Metastore for InMemoryMetastore {
async fn update_table(
&self,
ident: &TableIdent,
update: TableUpdate,
mut update: TableUpdate,
) -> Result<RwObject<Table>> {
let object_store = self.table_object_store(ident).await?.ok_or_else(|| {
metastore_error::TableNotFoundSnafu {
Expand All @@ -569,10 +589,13 @@ impl Metastore for InMemoryMetastore {
}
.build()
})?;
update
.requirements
.into_iter()
.map(TableRequirementExt::new)
.try_for_each(|req| req.assert(&table_entry.metadata))?;

for requirement in &update.requirements {
TableRequirementExt::new(requirement.clone()).assert(&table_entry.metadata)?;
}
convert_add_schema_update_to_lowercase(&mut update.updates)?;

let mut metadata = table_entry.metadata.clone();
apply_table_updates(&mut metadata, update.updates.clone())
Expand All @@ -592,14 +615,8 @@ impl Metastore for InMemoryMetastore {
}

async fn table_object_store(&self, ident: &TableIdent) -> Result<Option<Arc<dyn ObjectStore>>> {
let state = self.state.read().await;
let volume_ident = state
.tables
.get(&Self::table_key(ident))
.and_then(|table| table.volume_ident.clone());
drop(state);
if let Some(volume_ident) = volume_ident {
self.volume_object_store(&volume_ident).await
if let Some(volume) = self.volume_for_table(ident).await? {
self.volume_object_store(&volume.ident).await
} else {
Ok(None)
}
Expand Down Expand Up @@ -635,6 +652,12 @@ impl Metastore for InMemoryMetastore {
.and_then(|table| table.volume_ident.as_ref())
{
Ok(state.volumes.get(volume_ident).cloned())
} else if let Some(volume_ident) = state
.databases
.get(&Self::database_key(ident))
.map(|database| &database.volume)
{
Ok(state.volumes.get(volume_ident).cloned())
} else {
Ok(None)
}
Expand Down Expand Up @@ -686,3 +709,45 @@ fn max_field_id(schema: &IcebergSchema) -> i32 {

schema.fields().iter().map(recurse).max().unwrap_or(0)
}

fn convert_add_schema_update_to_lowercase(updates: &mut Vec<IcebergTableUpdate>) -> Result<()> {
for update in updates {
if let IcebergTableUpdate::AddSchema {
schema,
last_column_id,
} = update
{
let schema = convert_schema_fields_to_lowercase(schema)?;
*update = IcebergTableUpdate::AddSchema {
schema,
last_column_id: *last_column_id,
}
}
}
Ok(())
}

fn convert_schema_fields_to_lowercase(schema: &IcebergSchema) -> Result<IcebergSchema> {
let converted_fields: Vec<StructField> = schema
.fields()
.iter()
.map(|field| {
StructField::new(
field.id,
&field.name.to_lowercase(),
field.required,
field.field_type.clone(),
field.doc.clone(),
)
})
.collect();

let mut builder = IcebergSchema::builder();
builder.with_schema_id(*schema.schema_id());

for field in converted_fields {
builder.with_struct_field(field);
}

builder.build().context(metastore_error::IcebergSpecSnafu)
}
6 changes: 4 additions & 2 deletions crates/catalog-metastore/src/models/volumes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,10 @@ impl Volume {
.map(|s3| Arc::new(s3) as Arc<dyn ObjectStore>)
.context(metastore_error::ObjectStoreSnafu)
}
VolumeType::File(_) => Ok(Arc::new(
object_store::local::LocalFileSystem::new().with_automatic_cleanup(true),
VolumeType::File(file) => Ok(Arc::new(
LocalFileSystem::new_with_prefix(file.path.clone())
.context(metastore_error::ObjectStoreSnafu)?
.with_automatic_cleanup(true),
) as Arc<dyn ObjectStore>),
VolumeType::Memory => {
Ok(Arc::new(object_store::memory::InMemory::new()) as Arc<dyn ObjectStore>)
Expand Down
5 changes: 3 additions & 2 deletions crates/catalog/src/catalogs/embucket/iceberg_catalog.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc};

use crate::error;
use async_trait::async_trait;
use catalog_metastore::error::{self as metastore_error, Result as MetastoreResult};
use catalog_metastore::{
Expand Down Expand Up @@ -28,6 +29,7 @@ use iceberg_rust_spec::{
identifier::FullIdentifier as IcebergFullIdentifier, namespace::Namespace as IcebergNamespace,
};
use object_store::ObjectStore;
use snafu::ResultExt;

#[derive(Debug)]
pub struct EmbucketIcebergCatalog {
Expand Down Expand Up @@ -442,12 +444,11 @@ impl IcebergCatalog for EmbucketIcebergCatalog {
properties: None,
};

// TODO: restore .context
let table = self
.metastore
.create_table(&ident, table_create_request)
.await
// .context(crate::execution::error::MetastoreSnafu)
.context(error::MetastoreSnafu)
.map_err(|e| IcebergError::External(Box::new(e)))?;
Ok(IcebergTable::new(
identifier.clone(),
Expand Down
70 changes: 70 additions & 0 deletions crates/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub mod information_schema;
pub mod schema;
pub mod table;

#[cfg(test)]
pub mod tests;

// TBD: Should we move this into a separate crate? As this is duplicate implementation
// of what we have in the functions crate.
pub fn block_in_new_runtime<F, R>(future: F) -> Result<R>
Expand All @@ -29,3 +32,70 @@ where
.join()
.unwrap_or_else(|_| error::ThreadPanickedWhileExecutingFutureSnafu.fail()?)
}

pub mod test_utils {
use datafusion::arrow::array::{ArrayRef, RecordBatch};
use datafusion::arrow::compute::{
SortColumn, SortOptions, lexsort_to_indices, take_record_batch,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use std::collections::HashSet;
use std::sync::Arc;

#[allow(clippy::unwrap_used, clippy::must_use_candidate)]
pub fn sort_record_batch_by_sortable_columns(batch: &RecordBatch) -> RecordBatch {
let sort_columns: Vec<SortColumn> = (0..batch.num_columns())
.filter_map(|i| {
let col = batch.column(i).clone();
let field = batch.schema().field(i).clone();
if matches!(field.data_type(), DataType::Null) {
None
} else {
Some(SortColumn {
values: col,
options: Some(SortOptions::default()),
})
}
})
.collect();

if sort_columns.is_empty() {
return batch.clone();
}

let indices = lexsort_to_indices(&sort_columns, Some(batch.num_rows())).unwrap();
take_record_batch(batch, &indices).unwrap()
}

#[allow(clippy::unwrap_used, clippy::must_use_candidate)]
pub fn remove_columns_from_batches<S: std::hash::BuildHasher>(
batches: Vec<RecordBatch>,
excluded_columns: &HashSet<&str, S>,
) -> Vec<RecordBatch> {
batches
.into_iter()
.map(|batch| {
let schema = batch.schema();
let indices: Vec<usize> = schema
.fields()
.iter()
.enumerate()
.filter_map(|(i, f)| {
if excluded_columns.contains(f.name().as_str()) {
None
} else {
Some(i)
}
})
.collect();

let columns: Vec<ArrayRef> =
indices.iter().map(|&i| batch.column(i).clone()).collect();
let fields: Vec<Field> = indices.iter().map(|&i| schema.field(i).clone()).collect();
let new_schema = Arc::new(Schema::new(fields));

RecordBatch::try_new(new_schema, columns).unwrap()
})
.collect()
}
}
Loading