Skip to content

Commit 28ce068

Browse files
authored
Register tables from config (#22)
* Remove iceberg call from create table * Remove invalidate_cache * Init * Read tables from config * Read tables from config * Fix avro
1 parent b5f9b56 commit 28ce068

5 files changed

Lines changed: 188 additions & 31 deletions

File tree

Cargo.lock

Lines changed: 13 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,16 @@ datafusion-expr = { version = "50.0.0" }
4444
datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "439cbd2282504c3ffaf262f1ffdb530a0fb1a151" }
4545
datafusion-macros = { version = "50.0.0" }
4646
datafusion-physical-plan = { version = "50.0.0" }
47-
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "1eeb4515446119dd3b4dbb7ebd2f70ae5b4f827d" }
47+
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "b5089dc285667858469a6c2abb297af3d81a03f5" }
4848
futures = { version = "0.3" }
4949
http = "1.2"
5050
http-body-util = "0.1.0"
5151
iceberg = { git = "https://github.com/apache/iceberg-rust.git", rev="7a5ad1fcaf00d4638857812bab788105f6c60573"}
52-
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "1eeb4515446119dd3b4dbb7ebd2f70ae5b4f827d" }
53-
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "1eeb4515446119dd3b4dbb7ebd2f70ae5b4f827d" }
54-
iceberg-rust-spec = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "1eeb4515446119dd3b4dbb7ebd2f70ae5b4f827d" }
55-
iceberg-s3tables-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "1eeb4515446119dd3b4dbb7ebd2f70ae5b4f827d" }
52+
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "b5089dc285667858469a6c2abb297af3d81a03f5" }
53+
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "b5089dc285667858469a6c2abb297af3d81a03f5" }
54+
iceberg-rust-spec = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "b5089dc285667858469a6c2abb297af3d81a03f5" }
55+
iceberg-s3tables-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "b5089dc285667858469a6c2abb297af3d81a03f5" }
56+
5657
indexmap = "2.7.1"
5758
jsonwebtoken = "9.3.1"
5859
lazy_static = { version = "1.5" }

crates/catalog-metastore/src/metastore.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use std::{collections::HashMap, sync::Arc};
2-
31
use crate::error::{self as metastore_error, Result};
42
use crate::models::{
53
RwObject,
@@ -20,6 +18,7 @@ use iceberg_rust_spec::{
2018
};
2119
use object_store::{ObjectStore, PutPayload, path::Path};
2220
use snafu::ResultExt;
21+
use std::{collections::HashMap, sync::Arc};
2322
use tokio::sync::RwLock;
2423
use tracing::instrument;
2524
use uuid::Uuid;
@@ -60,6 +59,7 @@ pub trait Metastore: std::fmt::Debug + Send + Sync {
6059
ident: &TableIdent,
6160
table: TableCreateRequest,
6261
) -> Result<RwObject<Table>>;
62+
/// Registers an already-constructed `table` under `ident` and returns the stored object.
///
/// Unlike `create_table`, this takes a complete `Table` value rather than a
/// `TableCreateRequest`.
async fn register_table(&self, ident: &TableIdent, table: Table) -> Result<RwObject<Table>>;
6363
async fn get_table(&self, ident: &TableIdent) -> Result<Option<RwObject<Table>>>;
6464
async fn update_table(
6565
&self,
@@ -110,7 +110,8 @@ impl InMemoryMetastore {
110110
)
111111
}
112112

113-
fn table_key(ident: &TableIdent) -> (DatabaseIdent, String, String) {
113+
#[must_use]
114+
pub fn table_key(ident: &TableIdent) -> (DatabaseIdent, String, String) {
114115
(
115116
ident.database.to_ascii_lowercase(),
116117
ident.schema.to_ascii_lowercase(),
@@ -558,6 +559,13 @@ impl Metastore for InMemoryMetastore {
558559
Ok(row)
559560
}
560561

562+
/// Stores a fully-built `table` in the in-memory state under the key derived from `ident`.
///
/// The table is wrapped in an `RwObject` and inserted as-is; this method performs no
/// existence or parent (database/schema) checks itself.
/// NOTE(review): `state.tables` appears to be a map, so an existing entry with the same
/// key would presumably be replaced — confirm against the state definition.
async fn register_table(&self, ident: &TableIdent, table: Table) -> Result<RwObject<Table>> {
    let registered = RwObject::new(table);
    let key = Self::table_key(ident);
    // The write guard is a temporary, so the lock is released right after the insert.
    self.state
        .write()
        .await
        .tables
        .insert(key, registered.clone());
    Ok(registered)
}
568+
561569
async fn get_table(&self, ident: &TableIdent) -> Result<Option<RwObject<Table>>> {
562570
let state = self.state.read().await;
563571
Ok(state.tables.get(&Self::table_key(ident)).cloned())

crates/embucketd/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ tracing-subscriber = { version = "0.3.20", features = ["env-filter", "registry",
1919
tracing-opentelemetry = { version = "0.31.0" }
2020
tracing-allocations = { version = "0.1.0", optional = true }
2121
tracing-core = { version = "0.1.34"}
22+
object_store = { workspace = true }
23+
iceberg-rust = { workspace = true }
2224
opentelemetry-otlp = { version = "0.30.0", features = ["grpc-tonic"] }
2325
opentelemetry_sdk = { version = "0.30.0", features = [
2426
"experimental_trace_batch_span_processor_with_async_runtime",
@@ -36,6 +38,7 @@ utoipa-swagger-ui = { workspace = true }
3638
strum.workspace = true
3739
serde_yaml = { workspace = true }
3840
serde = { workspace = true }
41+
serde_json = { workspace = true }
3942
snafu = { workspace = true }
4043

4144
[lints]

crates/embucketd/src/metastore_config.rs

Lines changed: 155 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1+
use catalog_metastore::{
2+
Database, Metastore, Schema, SchemaIdent, TableFormat, TableIdent, Volume, VolumeIdent,
3+
};
4+
use iceberg_rust::spec::table_metadata::TableMetadata;
5+
use iceberg_rust::spec::util::strip_prefix;
6+
use serde::Deserialize;
7+
use serde_json::Value;
8+
use snafu::prelude::*;
9+
use std::collections::HashMap;
110
use std::{
211
path::{Path, PathBuf},
312
sync::Arc,
413
};
5-
6-
use catalog_metastore::{Database, Metastore, Schema, SchemaIdent, Volume, VolumeIdent};
7-
use serde::Deserialize;
8-
use snafu::prelude::*;
914
use tokio::fs;
1015

1116
#[derive(Debug, Deserialize, Default)]
@@ -16,6 +21,8 @@ pub struct MetastoreBootstrapConfig {
1621
databases: Vec<DatabaseEntry>,
1722
#[serde(default)]
1823
schemas: Vec<SchemaEntry>,
24+
#[serde(default)]
25+
tables: Vec<TableEntry>,
1926
}
2027

2128
#[derive(Debug, Deserialize, Clone)]
@@ -38,6 +45,20 @@ struct SchemaEntry {
3845
schema: String,
3946
}
4047

48+
#[derive(Debug, Deserialize, Clone)]
49+
struct TableEntry {
50+
database: String,
51+
schema: String,
52+
table: String,
53+
metadata_location: String,
54+
}
55+
56+
impl TableEntry {
57+
fn table_ident(&self) -> TableIdent {
58+
TableIdent::new(&self.database, &self.schema, &self.table)
59+
}
60+
}
61+
4162
#[derive(Debug, Snafu)]
4263
pub enum ConfigError {
4364
#[snafu(display("Failed to read metastore config {path:?}: {source}"))]
@@ -54,6 +75,26 @@ pub enum ConfigError {
5475
Metastore {
5576
source: catalog_metastore::error::Error,
5677
},
78+
#[snafu(display("Database {database} not found for table {table}"))]
79+
TableDatabaseMissing { table: String, database: String },
80+
#[snafu(display("Volume {volume} not found for table {table}"))]
81+
TableVolumeMissing { table: String, volume: VolumeIdent },
82+
#[snafu(display("Invalid metadata location for table {table}: {reason}"))]
83+
InvalidMetadataLocation { table: String, reason: String },
84+
#[snafu(display("Invalid metadata"))]
85+
InvalidMetadata,
86+
#[snafu(display("Failed to fetch metadata for table {table}: {source}"))]
87+
MetadataFetch {
88+
table: String,
89+
#[snafu(source)]
90+
source: object_store::Error,
91+
},
92+
#[snafu(display("Failed to parse metadata for table {table}: {source}"))]
93+
MetadataParse {
94+
table: String,
95+
#[snafu(source)]
96+
source: serde_json::Error,
97+
},
5798
}
5899

59100
const DEFAULT_SCHEMA_NAME: &str = "public";
@@ -84,6 +125,10 @@ impl MetastoreBootstrapConfig {
84125
.await?;
85126
}
86127

128+
for table in &self.tables {
129+
self.apply_table(table, metastore.clone()).await?;
130+
}
131+
87132
Ok(())
88133
}
89134

@@ -182,4 +227,110 @@ impl MetastoreBootstrapConfig {
182227
}
183228
Ok(())
184229
}
230+
231+
async fn apply_table(
232+
&self,
233+
entry: &TableEntry,
234+
metastore: Arc<dyn Metastore>,
235+
) -> Result<(), ConfigError> {
236+
let table_ident = entry.table_ident();
237+
let table_name = entry.table.clone();
238+
if metastore
239+
.table_exists(&table_ident)
240+
.await
241+
.context(MetastoreSnafu)?
242+
{
243+
tracing::debug!(table = %table_name, "Table already exists, skipping config create");
244+
return Ok(());
245+
}
246+
247+
let database = metastore
248+
.get_database(&entry.database)
249+
.await
250+
.context(MetastoreSnafu)?
251+
.ok_or_else(|| ConfigError::TableDatabaseMissing {
252+
table: table_name.clone(),
253+
database: entry.database.clone(),
254+
})?;
255+
256+
self.ensure_schema(metastore.clone(), &entry.database, &entry.schema)
257+
.await?;
258+
259+
let volume_ident = database.volume.clone();
260+
let volume = metastore
261+
.get_volume(&volume_ident)
262+
.await
263+
.context(MetastoreSnafu)?
264+
.ok_or_else(|| ConfigError::TableVolumeMissing {
265+
table: table_name.clone(),
266+
volume: volume_ident.clone(),
267+
})?;
268+
let table_object_store = volume.get_object_store().context(MetastoreSnafu)?;
269+
270+
let bytes = table_object_store
271+
.get(
272+
&strip_prefix(&entry.metadata_location.clone())
273+
.as_str()
274+
.into(),
275+
)
276+
.await
277+
.map_err(|e| ConfigError::InvalidMetadataLocation {
278+
table: table_name.clone(),
279+
reason: e.to_string(),
280+
})?
281+
.bytes()
282+
.await
283+
.context(MetadataFetchSnafu {
284+
table: table_name.clone(),
285+
})?;
286+
287+
let json_val: Value = serde_json::from_slice(&bytes).context(MetadataParseSnafu {
288+
table: table_name.clone(),
289+
})?;
290+
291+
// Patch missing iceberg spec fields
292+
let json_val = patch_missing_operation(json_val)?;
293+
294+
// Convert back to bytes
295+
let patched_bytes = serde_json::to_vec(&json_val).context(MetadataParseSnafu {
296+
table: table_name.clone(),
297+
})?;
298+
// Deserialize normally
299+
let metadata: TableMetadata =
300+
serde_json::from_slice(&patched_bytes).context(MetadataParseSnafu {
301+
table: table_name.clone(),
302+
})?;
303+
304+
let stored_table = catalog_metastore::Table {
305+
ident: table_ident.clone(),
306+
metadata,
307+
metadata_location: entry.metadata_location.clone(),
308+
properties: HashMap::default(),
309+
volume_ident: Some(volume.ident.clone()),
310+
volume_location: None,
311+
is_temporary: false,
312+
format: TableFormat::Iceberg,
313+
};
314+
metastore
315+
.register_table(&table_ident, stored_table)
316+
.await
317+
.context(MetastoreSnafu)?;
318+
Ok(())
319+
}
320+
}
321+
322+
fn patch_missing_operation(mut value: Value) -> Result<Value, ConfigError> {
323+
if let Some(snapshots) = value.get_mut("snapshots").and_then(|v| v.as_array_mut()) {
324+
for snapshot in snapshots {
325+
if let Some(summary) = snapshot.get_mut("summary")
326+
&& summary.get("operation").is_none()
327+
{
328+
summary
329+
.as_object_mut()
330+
.context(InvalidMetadataSnafu)?
331+
.insert("operation".to_string(), Value::String("append".into()));
332+
}
333+
}
334+
}
335+
Ok(value)
185336
}

0 commit comments

Comments
 (0)