Skip to content

Commit 2baf0bc

Browse files
authored
refactor(cubestore): Introduce QueryResult enum for SqlService execute (cube-js#10817)
1 parent 0a7948e commit 2baf0bc

15 files changed

Lines changed: 998 additions & 700 deletions

File tree

rust/cubestore/cubestore-sql-tests/src/benches.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ impl Bench for SimpleBench {
6767
let r = services
6868
.sql_service
6969
.exec_query(state.query.as_str())
70+
.await?
71+
.collect()
7072
.await?;
7173
let rows = to_rows(&r);
7274
assert_eq!(rows, vec![vec![TableValue::Int(23)]]);
@@ -105,11 +107,13 @@ impl Bench for ParquetMetadataCacheBench {
105107
let _ = services
106108
.sql_service
107109
.exec_query("CREATE SCHEMA IF NOT EXISTS test")
110+
.await?
111+
.collect()
108112
.await?;
109113

110114
let _ = services.sql_service
111115
.exec_query(format!("CREATE TABLE test.table (`repo` text, `email` text, `commit_count` int) WITH (input_format = 'csv') LOCATION '{}'", path_to_string(path)?).as_str())
112-
.await?;
116+
.await?.collect().await?;
113117

114118
// Wait for all pending (compaction) jobs to finish.
115119
wait_for_all_jobs(services).await?;
@@ -137,6 +141,8 @@ impl Bench for ParquetMetadataCacheBench {
137141
)
138142
.as_str(),
139143
)
144+
.await?
145+
.collect()
140146
.await?;
141147
let rows = to_rows(&r);
142148
assert_eq!(rows, vec![vec![TableValue::Int(6)]]);
@@ -157,6 +163,8 @@ impl Bench for CacheSetGetBench {
157163
services
158164
.sql_service
159165
.exec_query("CACHE SET TTL 600 'my_key' 'my_value'")
166+
.await?
167+
.collect()
160168
.await?;
161169

162170
let state = Arc::new(());
@@ -171,6 +179,8 @@ impl Bench for CacheSetGetBench {
171179
let r = services
172180
.sql_service
173181
.exec_query("CACHE GET 'my_key'")
182+
.await?
183+
.collect()
174184
.await?;
175185

176186
let rows = to_rows(&r);
@@ -208,6 +218,8 @@ impl Bench for crate::benches::QueueListBench {
208218
i,
209219
"a".repeat(self.payload_size)
210220
))
221+
.await?
222+
.collect()
211223
.await?;
212224
}
213225

@@ -223,6 +235,8 @@ impl Bench for crate::benches::QueueListBench {
223235
let r = services
224236
.sql_service
225237
.exec_query(r#"QUEUE PENDING "STANDALONE#queue""#)
238+
.await?
239+
.collect()
226240
.await?;
227241

228242
assert_eq!(

rust/cubestore/cubestore-sql-tests/src/lib.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,12 @@ pub struct BasicSqlClient {
116116
#[async_trait]
117117
impl SqlClient for BasicSqlClient {
118118
async fn exec_query(&self, query: &str) -> Result<Arc<DataFrame>, CubeError> {
119-
self.service.as_ref().exec_query(query).await
119+
self.service
120+
.as_ref()
121+
.exec_query(query)
122+
.await?
123+
.collect()
124+
.await
120125
}
121126

122127
async fn exec_query_with_context(
@@ -127,6 +132,8 @@ impl SqlClient for BasicSqlClient {
127132
self.service
128133
.as_ref()
129134
.exec_query_with_context(context, query)
135+
.await?
136+
.collect()
130137
.await
131138
}
132139

rust/cubestore/cubestore-sql-tests/tests/cluster.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ impl MultiProcTest for ClusterSqlTest {
8484
}))
8585
.await
8686
.unwrap();
87+
Ok(())
8788
})
8889
.await;
8990
}
@@ -123,6 +124,7 @@ impl WorkerProc<WorkerArgs> for WorkerFn {
123124
.start_test_worker(|_| async move {
124125
init.signal().await;
125126
done.wait_completion().await;
127+
Ok(())
126128
})
127129
.await
128130
}

rust/cubestore/cubestore-sql-tests/tests/in_process.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ fn main() {
2222
}))
2323
.await
2424
.unwrap();
25+
Ok(())
2526
}));
2627
});
2728
}

rust/cubestore/cubestore-sql-tests/tests/migration.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ fn main() {
6363
test_fn(Box::new(FilterWritesSqlClient::new(services.sql_service)))
6464
.await
6565
.unwrap();
66+
Ok(())
6667
},
6768
));
6869
});
@@ -147,7 +148,9 @@ impl FilterWritesSqlClient {
147148
impl SqlClient for FilterWritesSqlClient {
148149
async fn exec_query(&self, query: &str) -> Result<Arc<DataFrame>, CubeError> {
149150
match self.compute_filter_flag(query) {
150-
FilterQueryResult::RunQuery => self.sql_service.exec_query(query).await,
151+
FilterQueryResult::RunQuery => {
152+
self.sql_service.exec_query(query).await?.collect().await
153+
}
151154
FilterQueryResult::Hardcoded(result) => result,
152155
FilterQueryResult::UnrecognizedQueryType => unimplemented!(
153156
"FilterWritesSqlClient does not support query prefix for '{}'",
@@ -164,6 +167,8 @@ impl SqlClient for FilterWritesSqlClient {
164167
FilterQueryResult::RunQuery => {
165168
self.sql_service
166169
.exec_query_with_context(context, query)
170+
.await?
171+
.collect()
167172
.await
168173
}
169174
FilterQueryResult::Hardcoded(result) => result,

rust/cubestore/cubestore-sql-tests/tests/multi_process.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ fn main() {
3030
}))
3131
.await
3232
.unwrap();
33+
Ok(())
3334
}),
3435
);
3536
});

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1804,7 +1804,7 @@ impl Config {
18041804

18051805
pub async fn start_test<T>(&self, test_fn: impl FnOnce(CubeServices) -> T)
18061806
where
1807-
T: Future<Output = ()> + Send,
1807+
T: Future<Output = Result<(), CubeError>> + Send,
18081808
{
18091809
self.start_test_with_options::<_, T, _, _>(
18101810
true,
@@ -1822,7 +1822,7 @@ impl Config {
18221822

18231823
pub async fn start_migration_test<T>(&self, test_fn: impl FnOnce(CubeServices) -> T)
18241824
where
1825-
T: Future<Output = ()> + Send,
1825+
T: Future<Output = Result<(), CubeError>> + Send,
18261826
{
18271827
self.start_migration_test_with_options::<_, T, _, _>(
18281828
Option::<
@@ -1839,7 +1839,7 @@ impl Config {
18391839

18401840
pub async fn start_test_worker<T>(&self, test_fn: impl FnOnce(CubeServices) -> T)
18411841
where
1842-
T: Future<Output = ()> + Send,
1842+
T: Future<Output = Result<(), CubeError>> + Send,
18431843
{
18441844
self.start_test_with_options::<_, T, _, _>(
18451845
false,
@@ -1861,7 +1861,7 @@ impl Config {
18611861
test_fn: impl FnOnce(CubeServices) -> T2,
18621862
) where
18631863
T1: Future<Output = ()> + Send,
1864-
T2: Future<Output = ()> + Send,
1864+
T2: Future<Output = Result<(), CubeError>> + Send,
18651865
{
18661866
self.start_test_with_options(true, Some(configure_injector), test_fn)
18671867
.await
@@ -1874,7 +1874,7 @@ impl Config {
18741874
test_fn: F,
18751875
) where
18761876
T1: Future<Output = ()> + Send,
1877-
T2: Future<Output = ()> + Send,
1877+
T2: Future<Output = Result<(), CubeError>> + Send,
18781878
I: FnOnce(Arc<Injector>) -> T1,
18791879
F: FnOnce(CubeServices) -> T2,
18801880
{
@@ -1900,8 +1900,10 @@ impl Config {
19001900

19011901
// Should be long enough even for CI.
19021902
let timeout = Duration::from_secs(600);
1903-
if let Err(_) = timeout_at(Instant::now() + timeout, test_fn(services.clone())).await {
1904-
panic!("Test timed out after {} seconds", timeout.as_secs());
1903+
match timeout_at(Instant::now() + timeout, test_fn(services.clone())).await {
1904+
Err(_) => panic!("Test timed out after {} seconds", timeout.as_secs()),
1905+
Ok(Err(e)) => panic!("Test failed: {}", e.display_with_backtrace()),
1906+
Ok(Ok(())) => {}
19051907
}
19061908

19071909
services.stop_processing_loops().await.unwrap();
@@ -1924,7 +1926,7 @@ impl Config {
19241926
test_fn: F,
19251927
) where
19261928
T1: Future<Output = ()> + Send,
1927-
T2: Future<Output = ()> + Send,
1929+
T2: Future<Output = Result<(), CubeError>> + Send,
19281930
I: FnOnce(Arc<Injector>) -> T1,
19291931
F: FnOnce(CubeServices) -> T2,
19301932
{
@@ -1943,8 +1945,10 @@ impl Config {
19431945

19441946
// Should be long enough even for CI.
19451947
let timeout = Duration::from_secs(600);
1946-
if let Err(_) = timeout_at(Instant::now() + timeout, test_fn(services.clone())).await {
1947-
panic!("Test timed out after {} seconds", timeout.as_secs());
1948+
match timeout_at(Instant::now() + timeout, test_fn(services.clone())).await {
1949+
Err(_) => panic!("Test timed out after {} seconds", timeout.as_secs()),
1950+
Ok(Err(e)) => panic!("Test failed: {}", e.display_with_backtrace()),
1951+
Ok(Ok(())) => {}
19481952
}
19491953

19501954
services.stop_processing_loops().await.unwrap();
@@ -1962,14 +1966,14 @@ impl Config {
19621966

19631967
pub async fn run_test<T>(name: &str, test_fn: impl FnOnce(CubeServices) -> T)
19641968
where
1965-
T: Future<Output = ()> + Send,
1969+
T: Future<Output = Result<(), CubeError>> + Send,
19661970
{
19671971
Self::test(name).start_test(test_fn).await;
19681972
}
19691973

19701974
pub async fn run_migration_test<T>(name: &str, test_fn: impl FnOnce(CubeServices) -> T)
19711975
where
1972-
T: Future<Output = ()> + Send,
1976+
T: Future<Output = Result<(), CubeError>> + Send,
19731977
{
19741978
Self::migration_test(name)
19751979
.start_migration_test(test_fn)

rust/cubestore/cubestore/src/http/mod.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,8 @@ impl HttpServer {
589589
.with_parameters(&parameters),
590590
&query,
591591
)
592+
.await?
593+
.collect()
592594
.await?,
593595
}),
594596
x => Err(CubeError::user(format!("Unexpected command: {:?}", x))),
@@ -982,7 +984,9 @@ mod tests {
982984
use crate::http::{HttpCommand, HttpMessage, HttpServer};
983985
use crate::metastore::{Column, ColumnType};
984986
use crate::mysql::MockSqlAuthService;
985-
use crate::sql::{timestamp_from_string, InlineTable, QueryPlans, SqlQueryContext, SqlService};
987+
use crate::sql::{
988+
timestamp_from_string, InlineTable, QueryPlans, QueryResult, SqlQueryContext, SqlService,
989+
};
986990
use crate::store::DataFrame;
987991
use crate::table::{Row, TableValue};
988992
use crate::CubeError;
@@ -1137,26 +1141,26 @@ mod tests {
11371141

11381142
#[async_trait]
11391143
impl SqlService for SqlServiceMock {
1140-
async fn exec_query(&self, _query: &str) -> Result<Arc<DataFrame>, CubeError> {
1144+
async fn exec_query(&self, _query: &str) -> Result<QueryResult, CubeError> {
11411145
todo!()
11421146
}
11431147

11441148
async fn exec_query_with_context(
11451149
&self,
11461150
_context: SqlQueryContext,
11471151
query: &str,
1148-
) -> Result<Arc<DataFrame>, CubeError> {
1152+
) -> Result<QueryResult, CubeError> {
11491153
tokio::time::sleep(Duration::from_secs(2)).await;
11501154
let counter = self.message_counter.fetch_add(1, Ordering::Relaxed);
11511155
if query == "close_connection" {
11521156
Err(CubeError::wrong_connection("wrong connection".to_string()))
11531157
} else if query == "error" {
11541158
Err(CubeError::internal("error".to_string()))
11551159
} else {
1156-
Ok(Arc::new(DataFrame::new(
1160+
Ok(QueryResult::Frame(Arc::new(DataFrame::new(
11571161
vec![Column::new("foo".to_string(), ColumnType::String, 0)],
11581162
vec![Row::new(vec![TableValue::String(format!("{}", counter))])],
1159-
)))
1163+
))))
11601164
}
11611165
}
11621166

rust/cubestore/cubestore/src/mysql/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ impl<W: io::Write + Send> AsyncMysqlShim<W> for Backend {
6969
query,
7070
)
7171
.await;
72+
let res = match res {
73+
Ok(qr) => qr.collect().await,
74+
Err(e) => Err(e),
75+
};
7276
if let Err(e) = res {
7377
error!(
7478
"Error during processing {}: {}",

rust/cubestore/cubestore/src/remotefs/cleanup.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,18 +315,33 @@ mod test {
315315
let service = services.sql_service;
316316
let meta_store = services.meta_store;
317317
let remote_fs = services.injector.get_service_typed::<dyn RemoteFs>().await;
318-
let _ = service.exec_query("CREATE SCHEMA test").await.unwrap();
318+
let _ = service
319+
.exec_query("CREATE SCHEMA test")
320+
.await
321+
.unwrap()
322+
.collect()
323+
.await
324+
.unwrap();
319325
let _ = service
320326
.exec_query("CREATE TABLE test.tst (a int, b int)")
321327
.await
328+
.unwrap()
329+
.collect()
330+
.await
322331
.unwrap();
323332
let _ = service
324333
.exec_query("INSERT INTO test.tst (a, b) VALUES (10, 10), (20 , 20)")
325334
.await
335+
.unwrap()
336+
.collect()
337+
.await
326338
.unwrap();
327339
let _ = service
328340
.exec_query("INSERT INTO test.tst (a, b) VALUES (20, 20), (40 , 40)")
329341
.await
342+
.unwrap()
343+
.collect()
344+
.await
330345
.unwrap();
331346
let files = remove_root_paritition(meta_store.get_all_filenames().await.unwrap());
332347
assert_eq!(files.len(), 2);
@@ -361,7 +376,14 @@ mod test {
361376
let path = remote_fs.local_file("metastore".to_string()).await.unwrap();
362377
assert!(Path::new(&path).exists());
363378

364-
let _ = service.exec_query("SELECT * FROM test.tst").await.unwrap();
379+
let _ = service
380+
.exec_query("SELECT * FROM test.tst")
381+
.await
382+
.unwrap()
383+
.collect()
384+
.await
385+
.unwrap();
386+
Ok::<(), CubeError>(())
365387
})
366388
.await;
367389
}

0 commit comments

Comments
 (0)