Merged
1 change: 1 addition & 0 deletions .github/workflows/check.yml
@@ -64,6 +64,7 @@ jobs:
with:
toolchain: ${{ matrix.toolchain }}
components: clippy
- uses: Swatinem/rust-cache@v2
- name: Install Dependencies
run: |
sudo apt install -y llvm-18 libclang-18-dev
9 changes: 9 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,9 @@
repos:
- repo: https://github.com/doublify/pre-commit-rust
rev: v1.0
hooks:
- id: cargo-check
args: [ "--workspace" ]
- id: fmt
args: [ "--", "--check"]
- id: clippy
53 changes: 33 additions & 20 deletions README.md
@@ -3,8 +3,6 @@
**Run Snowflake SQL dialect on your data lake in 30 seconds. Zero dependencies.**

[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![SQL Logic Test Coverage](https://raw.githubusercontent.com/Embucket/embucket/assets/assets/badge.svg)](test/README.md)
[![dbt Gitlab run results](https://raw.githubusercontent.com/Embucket/embucket/assets_dbt/assets_dbt/dbt_success_badge.svg)](test/dbt_integration_tests/dbt-gitlab/README.md)

## Quick start

@@ -17,26 +15,50 @@ docker run --name embucket --rm -p 3000:3000 embucket/embucket
Run the Snowflake CLI against the local endpoint:

```bash
pip install snowflake-cli
snow sql -c local -a local -u embucket -p embucket -q "select 1;"
```

**Done.** You just ran Snowflake SQL dialect against the local Embucket instance with zero configuration.

### Bootstrap external volumes via config
### Create external volumes via config

You can pre-create volumes, databases, and schemas by pointing `embucketd` at a YAML config file. This
is handy when you want to mount an S3 Tables bucket at startup without sending API calls after the
process is online.
**Important**: External volumes must be created via YAML configuration at startup. REST API-based volume creation is not supported.

Pre-create volumes, databases, and schemas by pointing `embucketd` at a YAML config file:

```bash
cargo run -p embucketd -- \
--no-bootstrap \
--metastore-config config/metastore.s3tables.demo.yaml
--metastore-config config/metastore.yaml
```

**Sample configuration** (`config/metastore.yaml`):

```yaml
volumes:
# S3 Tables volume - connects to AWS S3 Table Bucket
- ident: demo
type: s3-tables
database: demo
credentials:
credential_type: access_key
aws-access-key-id: YOUR_ACCESS_KEY
aws-secret-access-key: YOUR_SECRET_KEY
arn: arn:aws:s3tables:us-east-2:123456789012:bucket/my-table-bucket

# S3 volume - connects to standard S3 bucket
# - ident: s3_volume
# type: s3
# bucket: my-data-bucket
# endpoint: https://s3.amazonaws.com
# credentials:
# credential_type: access_key
# aws-access-key-id: YOUR_ACCESS_KEY
# aws-secret-access-key: YOUR_SECRET_KEY
```

The sample config under `config/metastore.s3tables.demo.yaml` provisions a `demo` database backed by an
S3 Tables bucket using the credentials provided in the file. Update the file with your own secrets
for real deployments.
Update the credentials and ARN/bucket details with your own values for real deployments.

## What just happened?

@@ -58,7 +80,6 @@ Perfect for teams who want Snowflake's simplicity with bring-your-own-cloud cont
Built on proven open source:
- [Apache DataFusion](https://datafusion.apache.org/) for SQL execution
- [Apache Iceberg](https://iceberg.apache.org/) for ACID table metadata
- A lightweight in-memory metastore purpose-built for Embucket

## Why Embucket?

@@ -70,16 +91,8 @@ Built on proven open source:
- **Horizontal scaling** - Add nodes for more throughput
- **Zero operations** - No external dependencies to manage

## Next steps

**Ready for more?** Check out the comprehensive documentation:

[Quick start](https://docs.embucket.com/essentials/quick-start/) - Detailed setup and first queries
[Architecture](https://docs.embucket.com/essentials/architecture/) - How the zero-disk lakehouse works
[Configuration](https://docs.embucket.com/essentials/configuration/) - Production deployment options
[dbt Integration](https://docs.embucket.com/guides/dbt-snowplow/) - Run existing dbt projects
## Build from source

**From source:**
```bash
git clone https://github.com/Embucket/embucket.git
cd embucket && cargo build
8 changes: 4 additions & 4 deletions crates/api-snowflake-rest/src/server/error.rs
@@ -2,14 +2,14 @@ use crate::SqlState;
use crate::models::JsonResponse;
use crate::models::ResponseData;
use axum::{Json, http, response::IntoResponse};
use executor::QueryRecordId;
use executor::error::OperationOn;
use executor::error_code::ErrorCode;
use executor::snowflake_error::Entity;
use datafusion::arrow::error::ArrowError;
use error_stack::ErrorChainExt;
use error_stack::ErrorExt;
use error_stack_trace;
use executor::QueryRecordId;
use executor::error::OperationOn;
use executor::error_code::ErrorCode;
use executor::snowflake_error::Entity;
use snafu::Location;
use snafu::prelude::*;

4 changes: 2 additions & 2 deletions crates/api-snowflake-rest/src/server/helpers.rs
@@ -5,11 +5,11 @@ use axum::Json;
use base64;
use base64::engine::general_purpose::STANDARD as engine_base64;
use base64::prelude::*;
use executor::models::QueryResult;
use executor::utils::{DataSerializationFormat, convert_record_batches};
use datafusion::arrow::ipc::MetadataVersion;
use datafusion::arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
use datafusion::arrow::record_batch::RecordBatch;
use executor::models::QueryResult;
use executor::utils::{DataSerializationFormat, convert_record_batches};
use snafu::ResultExt;
use uuid::Uuid;

2 changes: 1 addition & 1 deletion crates/api-snowflake-rest/src/server/router.rs
@@ -7,9 +7,9 @@ use super::layer::require_auth;
use super::server_models::Config;
use super::state;
use axum::middleware;
use catalog_metastore::Metastore;
use executor::service::CoreExecutionService;
use executor::utils::Config as UtilsConfig;
use catalog_metastore::Metastore;
use std::sync::Arc;
use tower::ServiceBuilder;
use tower_http::compression::CompressionLayer;
2 changes: 1 addition & 1 deletion crates/api-snowflake-rest/src/server/test_server.rs
@@ -1,7 +1,7 @@
use super::server_models::Config;
use crate::server::router::make_app;
use executor::utils::Config as UtilsConfig;
use catalog_metastore::{InMemoryMetastore, Metastore};
use executor::utils::Config as UtilsConfig;
use std::net::SocketAddr;
use std::sync::Arc;
use tracing_subscriber::fmt::format::FmtSpan;
4 changes: 3 additions & 1 deletion crates/catalog/src/catalog_list.rs
@@ -11,7 +11,9 @@ use crate::table::CachingTable;
use aws_config::{BehaviorVersion, Region, SdkConfig};
use aws_credential_types::Credentials;
use aws_credential_types::provider::SharedCredentialsProvider;
use catalog_metastore::{AwsCredentials, Database, Metastore, RwObject, S3TablesVolume, VolumeType};
use catalog_metastore::{
AwsCredentials, Database, Metastore, RwObject, S3TablesVolume, VolumeType,
};
use catalog_metastore::{SchemaIdent, TableIdent};
use dashmap::DashMap;
use datafusion::{
11 changes: 5 additions & 6 deletions crates/catalog/src/catalogs/embucket/catalog.rs
@@ -51,16 +51,15 @@ impl CatalogProvider for EmbucketCatalog {
let database = self.database.clone();

block_in_new_runtime(async move {
metastore
.list_schemas(&database)
.await
.map(|schemas| {
metastore.list_schemas(&database).await.map_or_else(
|_| vec![],
|schemas| {
schemas
.into_iter()
.map(|s| s.ident.schema.clone())
.collect()
})
.unwrap_or_else(|_| vec![])
},
)
})
.unwrap_or_else(|_| vec![])
}
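
The hunk above swaps `.map(f).unwrap_or_else(g)` for `Result::map_or_else(g, f)`, which looks like the rewrite suggested by clippy's `map_unwrap_or` lint. A minimal sketch of why the two forms behave the same, assuming a hypothetical `list_schemas` stand-in (not the real `Metastore` API) and an uppercase transform chosen only for illustration:

```rust
// Hypothetical stand-in for a metastore call; not the real `Metastore` API.
fn list_schemas(fail: bool) -> Result<Vec<String>, String> {
    if fail {
        Err("metastore unavailable".to_string())
    } else {
        Ok(vec!["public".to_string(), "staging".to_string()])
    }
}

// Old form: transform the Ok value, then fall back to an empty list on Err.
fn names_with_map(fail: bool) -> Vec<String> {
    list_schemas(fail)
        .map(|schemas| schemas.into_iter().map(|s| s.to_uppercase()).collect())
        .unwrap_or_else(|_| vec![])
}

// New form: a single call. The Err handler is the first argument,
// the Ok handler is the second.
fn names_with_map_or_else(fail: bool) -> Vec<String> {
    list_schemas(fail).map_or_else(
        |_| vec![],
        |schemas| schemas.into_iter().map(|s| s.to_uppercase()).collect(),
    )
}
```

Both functions return the transformed list on success and an empty `Vec` on failure; note that the argument order of `map_or_else` (fallback first) is the reverse of the chained form.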
6 changes: 4 additions & 2 deletions crates/catalog/src/catalogs/embucket/schema.rs
@@ -50,8 +50,10 @@ impl SchemaProvider for EmbucketSchema {
metastore
.list_tables(&SchemaIdent::new(database, schema))
.await
.map(|tables| tables.into_iter().map(|s| s.ident.table.clone()).collect())
.unwrap_or_else(|_| vec![])
.map_or_else(
|_| vec![],
|tables| tables.into_iter().map(|s| s.ident.table.clone()).collect(),
)
})
.unwrap_or_else(|_| vec![]);

4 changes: 2 additions & 2 deletions crates/embucketd/src/main.rs
@@ -16,11 +16,11 @@ use axum::{
Json, Router,
routing::{get, post},
};
use catalog_metastore::InMemoryMetastore;
use clap::Parser;
use dotenv::dotenv;
use executor::service::CoreExecutionService;
use executor::utils::Config as ExecutionConfig;
use catalog_metastore::InMemoryMetastore;
use dotenv::dotenv;
use opentelemetry::trace::TracerProvider;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::runtime::TokioCurrentThread;
2 changes: 1 addition & 1 deletion crates/executor/src/datafusion/mod.rs
@@ -10,4 +10,4 @@ pub mod query_planner;
pub mod rewriters;
pub mod type_planner;

pub use functions as functions;
pub use functions;
7 changes: 6 additions & 1 deletion crates/executor/src/datafusion/rewriters/session_context.rs
@@ -51,7 +51,12 @@ impl SessionContextExprRewriter {
let queries = self
.recent_queries
.read()
.map(|guard| guard.iter().map(|q| q.to_string()).collect::<Vec<_>>())
.map(|guard| {
guard
.iter()
.map(std::string::ToString::to_string)
.collect::<Vec<_>>()
})
.unwrap_or_default();
let query_id = get_query_by_index(&queries, index).unwrap_or_default();
Ok(utf8_val(&query_id))
6 changes: 4 additions & 2 deletions crates/executor/src/error.rs
@@ -1,7 +1,7 @@
use super::snowflake_error::SnowflakeError;
use crate::query_types::{QueryRecordId, QueryStatus};
use datafusion_common::DataFusionError;
use catalog::error::Error as CatalogError;
use datafusion_common::DataFusionError;
use error_stack_trace;
use iceberg_rust::error::Error as IcebergError;
use iceberg_s3tables_catalog::error::Error as S3tablesError;
@@ -155,7 +155,9 @@ pub enum Error {
location: Location,
},

#[snafu(display("DataFusion error when building logical plan for join of merge target and source: {error}"))]
#[snafu(display(
"DataFusion error when building logical plan for join of merge target and source: {error}"
))]
DataFusionLogicalPlanMergeJoin {
#[snafu(source(from(DataFusionError, Box::new)))]
error: Box<DataFusionError>,
2 changes: 1 addition & 1 deletion crates/executor/src/lib.rs
@@ -1,4 +1,4 @@
pub use catalog as catalog;
pub use catalog;
pub mod datafusion;
pub mod dedicated_executor;
pub mod error;
15 changes: 9 additions & 6 deletions crates/executor/src/query.rs
@@ -20,6 +20,9 @@ use crate::datafusion::physical_plan::merge::{
use crate::datafusion::rewriters::session_context::SessionContextExprRewriter;
use crate::error::{OperationOn, OperationType};
use crate::models::{QueryContext, QueryResult};
use catalog::catalog::CachingCatalog;
use catalog::catalog_list::CachedEntity;
use catalog::table::CachingTable;
use catalog_metastore::{
AwsAccessKeyCredentials, AwsCredentials, FileVolume, Metastore, S3TablesVolume, S3Volume,
SchemaIdent as MetastoreSchemaIdent, TableCreateRequest as MetastoreTableCreateRequest,
@@ -74,9 +77,6 @@ use datafusion_iceberg::catalog::mirror::Mirror;
use datafusion_iceberg::catalog::schema::IcebergSchema;
use datafusion_iceberg::table::DataFusionTableConfigBuilder;
use datafusion_physical_plan::collect;
use catalog::catalog::CachingCatalog;
use catalog::catalog_list::CachedEntity;
use catalog::table::CachingTable;
use functions::semi_structured::variant::visitors::visit_all;
use functions::session_params::SessionProperty;
use functions::visitors::{
@@ -3543,13 +3543,16 @@ fn normalize_resolved_ref(table_ref: &ResolvedTableReference) -> ResolvedTableRe
}
}

fn is_missing_catalog_entity(err: &IcebergError) -> bool {
matches!(err, IcebergError::NotFound(_) | IcebergError::CatalogNotFound)
const fn is_missing_catalog_entity(err: &IcebergError) -> bool {
matches!(
err,
IcebergError::NotFound(_) | IcebergError::CatalogNotFound
)
}

#[cfg(test)]
mod tests {
use super::{is_missing_catalog_entity, IcebergError};
use super::{IcebergError, is_missing_catalog_entity};

#[test]
fn detects_catalog_not_found_as_missing_tabular() {
8 changes: 5 additions & 3 deletions crates/executor/src/query_types.rs
@@ -53,9 +53,11 @@ impl FromStr for QueryRecordId {

impl From<Uuid> for QueryRecordId {
fn from(value: Uuid) -> Self {
Self(i64::from_be_bytes(
value.as_bytes()[8..16].try_into().unwrap(),
))
let bytes = value.as_bytes();
// Safety: UUID is 16 bytes, we take the last 8 bytes [8..16]
Self(i64::from_be_bytes([
bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15],
]))
}
}
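
The `From<Uuid>` change above drops `try_into().unwrap()` in favor of building the 8-byte array by hand. A minimal sketch showing that both forms read the same big-endian `i64` from bytes 8..16, assuming a plain `[u8; 16]` as a stand-in for what `Uuid::as_bytes()` returns (the function names here are hypothetical):

```rust
use std::convert::TryInto;

// Stand-in for the 16 bytes of a UUID; the real code calls `Uuid::as_bytes()`.
type UuidBytes = [u8; 16];

// Old form: slice the last 8 bytes and convert with `try_into().unwrap()`.
// The unwrap cannot fail for an 8-byte slice, but lints that forbid
// `unwrap` would still flag it.
fn low_half_try_into(bytes: &UuidBytes) -> i64 {
    let tail: [u8; 8] = bytes[8..16].try_into().unwrap();
    i64::from_be_bytes(tail)
}

// New form: build the array explicitly, so no fallible conversion is involved.
fn low_half_explicit(bytes: &UuidBytes) -> i64 {
    i64::from_be_bytes([
        bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15],
    ])
}
```

Because the input is a fixed-size array, the constant indices 8 through 15 are always in bounds, which is the property the in-code comment relies on.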

1 change: 1 addition & 0 deletions crates/executor/src/result_set.rs
@@ -83,6 +83,7 @@ pub struct ResultSet {

impl ResultSet {
#[must_use]
#[allow(clippy::as_conversions)] // Safe: usize to i128 conversion is always safe as i128 can hold any usize value
pub const fn calc_hard_rows_limit(&self) -> Option<usize> {
if self.batch_size_bytes > QUERY_HISTORY_HARD_LIMIT_BYTES {
let batch_size_bytes: i128 = self.batch_size_bytes as i128;
2 changes: 1 addition & 1 deletion crates/executor/src/service.rs
@@ -29,11 +29,11 @@ use crate::running_queries::RunningQueryId;
use crate::session::{SESSION_INACTIVITY_EXPIRATION_SECONDS, to_unix};
use crate::tracing::SpanTracer;
use crate::utils::{Config, MemPoolType};
use catalog::catalog_list::{DEFAULT_CATALOG, EmbucketCatalogList};
use catalog_metastore::{
Database, InMemoryMetastore, Metastore, Schema, SchemaIdent, TableIdent as MetastoreTableIdent,
Volume, VolumeType,
};
use catalog::catalog_list::{DEFAULT_CATALOG, EmbucketCatalogList};
use tokio::sync::RwLock;
use tokio::sync::oneshot;
use tokio::time::{Duration, timeout};
4 changes: 2 additions & 2 deletions crates/executor/src/session.rs
@@ -14,14 +14,14 @@ use crate::query::UserQuery;
use crate::query_types::QueryRecordId;
use crate::running_queries::RunningQueries;
use crate::utils::Config;
use catalog::catalog_list::{DEFAULT_CATALOG, EmbucketCatalogList};
use catalog_metastore::Metastore;
use datafusion::config::ConfigOptions;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::{SessionStateBuilder, SessionStateDefaults};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::sql::planner::IdentNormalizer;
use datafusion_functions_json::register_all as register_json_udfs;
use catalog::catalog_list::{DEFAULT_CATALOG, EmbucketCatalogList};
use functions::expr_planner::CustomExprPlanner;
use functions::register_udafs;
use functions::session_params::{SessionParams, SessionProperty};
@@ -214,9 +214,9 @@ impl UserSession {
}

pub fn record_query_id(&self, query_id: QueryRecordId) {
const MAX_QUERIES: usize = 64;
if let Ok(mut guard) = self.recent_queries.write() {
guard.push_front(query_id);
const MAX_QUERIES: usize = 64;
while guard.len() > MAX_QUERIES {
guard.pop_back();
}
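
The `record_query_id` hunk only moves `const MAX_QUERIES` to the top of the function, presumably to satisfy clippy's `items_after_statements` lint; behavior is unchanged. A minimal sketch of the bounded-history pattern it implements, assuming a hypothetical `RecentQueries` wrapper with plain `i64` ids in place of `UserSession` and `QueryRecordId`:

```rust
use std::collections::VecDeque;
use std::sync::RwLock;

// Hypothetical stand-in for the session's recent-query history.
struct RecentQueries {
    inner: RwLock<VecDeque<i64>>,
}

impl RecentQueries {
    fn new() -> Self {
        Self {
            inner: RwLock::new(VecDeque::new()),
        }
    }

    // Push the newest id to the front and evict the oldest ids from the back.
    fn record(&self, query_id: i64) {
        // Items such as `const` are declared before any statements.
        const MAX_QUERIES: usize = 64;
        if let Ok(mut guard) = self.inner.write() {
            guard.push_front(query_id);
            while guard.len() > MAX_QUERIES {
                guard.pop_back();
            }
        }
    }

    // Snapshot of the history, newest first; empty if the lock is poisoned.
    fn snapshot(&self) -> Vec<i64> {
        self.inner
            .read()
            .map(|guard| guard.iter().copied().collect())
            .unwrap_or_default()
    }
}
```

Recording 100 ids in this sketch leaves only the 64 most recent, with the newest at index 0.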
4 changes: 2 additions & 2 deletions crates/executor/src/snowflake_error.rs
@@ -2,13 +2,13 @@
#![allow(clippy::match_same_arms)]
use crate::error::{Error, OperationOn, OperationType};
use crate::error_code::ErrorCode;
use catalog::df_error::DFExternalError as DFCatalogExternalDFError;
use catalog::error::Error as CatalogError;
use catalog_metastore::error::Error as MetastoreError;
use datafusion::arrow::error::ArrowError;
use datafusion_common::Diagnostic;
use datafusion_common::diagnostic::DiagnosticKind;
use datafusion_common::error::DataFusionError;
use catalog::df_error::DFExternalError as DFCatalogExternalDFError;
use catalog::error::Error as CatalogError;
use functions::df_error::DFExternalError as EmubucketFunctionsExternalDFError;
use iceberg_rust::error::Error as IcebergError;
use snafu::GenerateImplicitData;