Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions docs-mintlify/docs/integrations/snowflake-semantic-views.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ Alternatively, you can push Cube views into Snowflake as native semantic views.

This enables you to use Cube-authored views directly in Snowflake, maintaining consistency across both platforms.

### Cubes defined with `sql`

When a cube uses the `sql` property with a plain SQL string, Cube creates a helper
Snowflake view named `CUBE_SV_SRC_<CUBENAME>` in a configurable schema (defaults to
`PUBLIC`) and uses that view as the source for the semantic view. For example:
`sql: "SELECT id, status FROM raw.orders"`.

Note that if you're simply referencing a table, use `sql_table` instead — it's the
recommended approach for straightforward table access (e.g., `sql_table: MY_SCHEMA.MY_TABLE`).

### Prerequisites

The push integration uses the SQL Runner to execute DDL statements in Snowflake. To
Expand All @@ -60,6 +70,8 @@ successfully create semantic views, ensure the following:
- The Snowflake role configured for your Cube data source (via [`CUBEJS_DB_SNOWFLAKE_ROLE`](/reference/configuration/environment-variables#cubejs_db_snowflake_role))
has privileges to create semantic views in the target database and schema
(`CREATE SEMANTIC VIEW` on the schema, plus `USAGE` on the parent database and schema).
If any cube uses a plain SQL string in its `sql` property, the role also needs
the `CREATE VIEW` privilege on the schema where helper views are created (defaults to `PUBLIC`).
- The role has `USAGE` on the warehouse specified by [`CUBEJS_DB_SNOWFLAKE_WAREHOUSE`](/reference/configuration/environment-variables#cubejs_db_snowflake_warehouse)
and `SELECT` on the underlying tables referenced by the view.
- [`CUBEJS_DB_SNOWFLAKE_QUOTED_IDENTIFIERS_IGNORE_CASE`](/reference/configuration/environment-variables#cubejs_db_snowflake_quoted_identifiers_ignore_case)
Expand All @@ -81,9 +93,10 @@ Cube only.

When pushing a Cube view to Snowflake, the following are currently not supported:

- **Cubes without a `sql_table`.** Each cube referenced by the view must be
backed by a physical table. Cubes defined with an arbitrary `sql` block
(subqueries, CTEs, or other inline SQL) can't be pushed.
- **Cubes with templated `sql`.** If a cube's `sql` property uses template
expressions (e.g., Jinja or dbt `{{ source(...) }}` syntax), it can't be
pushed. See [Cubes defined with `sql`](#cubes-defined-with-sql) for details on
what SQL patterns are supported.
- **Cubes without a single-column primary key.** Every cube needs a
`primary_key` dimension that resolves to a single physical column. Composite
primary keys and primary keys defined as SQL expressions aren't supported.
Expand Down
16 changes: 15 additions & 1 deletion rust/cubestore/cubestore-sql-tests/src/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ impl Bench for SimpleBench {
let r = services
.sql_service
.exec_query(state.query.as_str())
.await?
.collect()
.await?;
let rows = to_rows(&r);
assert_eq!(rows, vec![vec![TableValue::Int(23)]]);
Expand Down Expand Up @@ -105,11 +107,13 @@ impl Bench for ParquetMetadataCacheBench {
let _ = services
.sql_service
.exec_query("CREATE SCHEMA IF NOT EXISTS test")
.await?
.collect()
.await?;

let _ = services.sql_service
.exec_query(format!("CREATE TABLE test.table (`repo` text, `email` text, `commit_count` int) WITH (input_format = 'csv') LOCATION '{}'", path_to_string(path)?).as_str())
.await?;
.await?.collect().await?;

// Wait for all pending (compaction) jobs to finish.
wait_for_all_jobs(services).await?;
Expand Down Expand Up @@ -137,6 +141,8 @@ impl Bench for ParquetMetadataCacheBench {
)
.as_str(),
)
.await?
.collect()
.await?;
let rows = to_rows(&r);
assert_eq!(rows, vec![vec![TableValue::Int(6)]]);
Expand All @@ -157,6 +163,8 @@ impl Bench for CacheSetGetBench {
services
.sql_service
.exec_query("CACHE SET TTL 600 'my_key' 'my_value'")
.await?
.collect()
.await?;

let state = Arc::new(());
Expand All @@ -171,6 +179,8 @@ impl Bench for CacheSetGetBench {
let r = services
.sql_service
.exec_query("CACHE GET 'my_key'")
.await?
.collect()
.await?;

let rows = to_rows(&r);
Expand Down Expand Up @@ -208,6 +218,8 @@ impl Bench for crate::benches::QueueListBench {
i,
"a".repeat(self.payload_size)
))
.await?
.collect()
.await?;
}

Expand All @@ -223,6 +235,8 @@ impl Bench for crate::benches::QueueListBench {
let r = services
.sql_service
.exec_query(r#"QUEUE PENDING "STANDALONE#queue""#)
.await?
.collect()
.await?;

assert_eq!(
Expand Down
9 changes: 8 additions & 1 deletion rust/cubestore/cubestore-sql-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,12 @@ pub struct BasicSqlClient {
#[async_trait]
impl SqlClient for BasicSqlClient {
async fn exec_query(&self, query: &str) -> Result<Arc<DataFrame>, CubeError> {
self.service.as_ref().exec_query(query).await
self.service
.as_ref()
.exec_query(query)
.await?
.collect()
.await
}

async fn exec_query_with_context(
Expand All @@ -127,6 +132,8 @@ impl SqlClient for BasicSqlClient {
self.service
.as_ref()
.exec_query_with_context(context, query)
.await?
.collect()
.await
}

Expand Down
2 changes: 2 additions & 0 deletions rust/cubestore/cubestore-sql-tests/tests/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl MultiProcTest for ClusterSqlTest {
}))
.await
.unwrap();
Ok(())
})
.await;
}
Expand Down Expand Up @@ -123,6 +124,7 @@ impl WorkerProc<WorkerArgs> for WorkerFn {
.start_test_worker(|_| async move {
init.signal().await;
done.wait_completion().await;
Ok(())
})
.await
}
Expand Down
1 change: 1 addition & 0 deletions rust/cubestore/cubestore-sql-tests/tests/in_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ fn main() {
}))
.await
.unwrap();
Ok(())
}));
});
}
7 changes: 6 additions & 1 deletion rust/cubestore/cubestore-sql-tests/tests/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ fn main() {
test_fn(Box::new(FilterWritesSqlClient::new(services.sql_service)))
.await
.unwrap();
Ok(())
},
));
});
Expand Down Expand Up @@ -147,7 +148,9 @@ impl FilterWritesSqlClient {
impl SqlClient for FilterWritesSqlClient {
async fn exec_query(&self, query: &str) -> Result<Arc<DataFrame>, CubeError> {
match self.compute_filter_flag(query) {
FilterQueryResult::RunQuery => self.sql_service.exec_query(query).await,
FilterQueryResult::RunQuery => {
self.sql_service.exec_query(query).await?.collect().await
}
FilterQueryResult::Hardcoded(result) => result,
FilterQueryResult::UnrecognizedQueryType => unimplemented!(
"FilterWritesSqlClient does not support query prefix for '{}'",
Expand All @@ -164,6 +167,8 @@ impl SqlClient for FilterWritesSqlClient {
FilterQueryResult::RunQuery => {
self.sql_service
.exec_query_with_context(context, query)
.await?
.collect()
.await
}
FilterQueryResult::Hardcoded(result) => result,
Expand Down
1 change: 1 addition & 0 deletions rust/cubestore/cubestore-sql-tests/tests/multi_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ fn main() {
}))
.await
.unwrap();
Ok(())
}),
);
});
Expand Down
28 changes: 16 additions & 12 deletions rust/cubestore/cubestore/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1804,7 +1804,7 @@ impl Config {

pub async fn start_test<T>(&self, test_fn: impl FnOnce(CubeServices) -> T)
where
T: Future<Output = ()> + Send,
T: Future<Output = Result<(), CubeError>> + Send,
{
self.start_test_with_options::<_, T, _, _>(
true,
Expand All @@ -1822,7 +1822,7 @@ impl Config {

pub async fn start_migration_test<T>(&self, test_fn: impl FnOnce(CubeServices) -> T)
where
T: Future<Output = ()> + Send,
T: Future<Output = Result<(), CubeError>> + Send,
{
self.start_migration_test_with_options::<_, T, _, _>(
Option::<
Expand All @@ -1839,7 +1839,7 @@ impl Config {

pub async fn start_test_worker<T>(&self, test_fn: impl FnOnce(CubeServices) -> T)
where
T: Future<Output = ()> + Send,
T: Future<Output = Result<(), CubeError>> + Send,
{
self.start_test_with_options::<_, T, _, _>(
false,
Expand All @@ -1861,7 +1861,7 @@ impl Config {
test_fn: impl FnOnce(CubeServices) -> T2,
) where
T1: Future<Output = ()> + Send,
T2: Future<Output = ()> + Send,
T2: Future<Output = Result<(), CubeError>> + Send,
{
self.start_test_with_options(true, Some(configure_injector), test_fn)
.await
Expand All @@ -1874,7 +1874,7 @@ impl Config {
test_fn: F,
) where
T1: Future<Output = ()> + Send,
T2: Future<Output = ()> + Send,
T2: Future<Output = Result<(), CubeError>> + Send,
I: FnOnce(Arc<Injector>) -> T1,
F: FnOnce(CubeServices) -> T2,
{
Expand All @@ -1900,8 +1900,10 @@ impl Config {

// Should be long enough even for CI.
let timeout = Duration::from_secs(600);
if let Err(_) = timeout_at(Instant::now() + timeout, test_fn(services.clone())).await {
panic!("Test timed out after {} seconds", timeout.as_secs());
match timeout_at(Instant::now() + timeout, test_fn(services.clone())).await {
Err(_) => panic!("Test timed out after {} seconds", timeout.as_secs()),
Ok(Err(e)) => panic!("Test failed: {}", e.display_with_backtrace()),
Ok(Ok(())) => {}
}

services.stop_processing_loops().await.unwrap();
Expand All @@ -1924,7 +1926,7 @@ impl Config {
test_fn: F,
) where
T1: Future<Output = ()> + Send,
T2: Future<Output = ()> + Send,
T2: Future<Output = Result<(), CubeError>> + Send,
I: FnOnce(Arc<Injector>) -> T1,
F: FnOnce(CubeServices) -> T2,
{
Expand All @@ -1943,8 +1945,10 @@ impl Config {

// Should be long enough even for CI.
let timeout = Duration::from_secs(600);
if let Err(_) = timeout_at(Instant::now() + timeout, test_fn(services.clone())).await {
panic!("Test timed out after {} seconds", timeout.as_secs());
match timeout_at(Instant::now() + timeout, test_fn(services.clone())).await {
Err(_) => panic!("Test timed out after {} seconds", timeout.as_secs()),
Ok(Err(e)) => panic!("Test failed: {}", e.display_with_backtrace()),
Ok(Ok(())) => {}
}

services.stop_processing_loops().await.unwrap();
Expand All @@ -1962,14 +1966,14 @@ impl Config {

pub async fn run_test<T>(name: &str, test_fn: impl FnOnce(CubeServices) -> T)
where
T: Future<Output = ()> + Send,
T: Future<Output = Result<(), CubeError>> + Send,
{
Self::test(name).start_test(test_fn).await;
}

pub async fn run_migration_test<T>(name: &str, test_fn: impl FnOnce(CubeServices) -> T)
where
T: Future<Output = ()> + Send,
T: Future<Output = Result<(), CubeError>> + Send,
{
Self::migration_test(name)
.start_migration_test(test_fn)
Expand Down
14 changes: 9 additions & 5 deletions rust/cubestore/cubestore/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,8 @@ impl HttpServer {
.with_parameters(&parameters),
&query,
)
.await?
.collect()
.await?,
}),
x => Err(CubeError::user(format!("Unexpected command: {:?}", x))),
Expand Down Expand Up @@ -982,7 +984,9 @@ mod tests {
use crate::http::{HttpCommand, HttpMessage, HttpServer};
use crate::metastore::{Column, ColumnType};
use crate::mysql::MockSqlAuthService;
use crate::sql::{timestamp_from_string, InlineTable, QueryPlans, SqlQueryContext, SqlService};
use crate::sql::{
timestamp_from_string, InlineTable, QueryPlans, QueryResult, SqlQueryContext, SqlService,
};
use crate::store::DataFrame;
use crate::table::{Row, TableValue};
use crate::CubeError;
Expand Down Expand Up @@ -1137,26 +1141,26 @@ mod tests {

#[async_trait]
impl SqlService for SqlServiceMock {
async fn exec_query(&self, _query: &str) -> Result<Arc<DataFrame>, CubeError> {
async fn exec_query(&self, _query: &str) -> Result<QueryResult, CubeError> {
todo!()
}

async fn exec_query_with_context(
&self,
_context: SqlQueryContext,
query: &str,
) -> Result<Arc<DataFrame>, CubeError> {
) -> Result<QueryResult, CubeError> {
tokio::time::sleep(Duration::from_secs(2)).await;
let counter = self.message_counter.fetch_add(1, Ordering::Relaxed);
if query == "close_connection" {
Err(CubeError::wrong_connection("wrong connection".to_string()))
} else if query == "error" {
Err(CubeError::internal("error".to_string()))
} else {
Ok(Arc::new(DataFrame::new(
Ok(QueryResult::Frame(Arc::new(DataFrame::new(
vec![Column::new("foo".to_string(), ColumnType::String, 0)],
vec![Row::new(vec![TableValue::String(format!("{}", counter))])],
)))
))))
}
}

Expand Down
4 changes: 4 additions & 0 deletions rust/cubestore/cubestore/src/mysql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ impl<W: io::Write + Send> AsyncMysqlShim<W> for Backend {
query,
)
.await;
let res = match res {
Ok(qr) => qr.collect().await,
Err(e) => Err(e),
};
if let Err(e) = res {
error!(
"Error during processing {}: {}",
Expand Down
Loading
Loading