Skip to content

Commit 491d9ac

Browse files
authored
service: Simple read only mode (#38)
* read only mode for service * read only mode for service, clippy + fmt * read only mode for service test fixes * Requested changes * Service test * Service test fix
1 parent 9020043 commit 491d9ac

9 files changed

Lines changed: 261 additions & 26 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/embucket-lambda/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub struct EnvConfig {
1818
pub embucket_version: String,
1919
pub metastore_config: Option<PathBuf>,
2020
pub jwt_secret: Option<String>,
21+
pub read_only: bool,
2122
}
2223

2324
impl EnvConfig {
@@ -40,6 +41,7 @@ impl EnvConfig {
4041
embucket_version: env_or_default("EMBUCKET_VERSION", "0.1.0"),
4142
metastore_config: env::var("METASTORE_CONFIG").ok().map(PathBuf::from),
4243
jwt_secret: env::var("JWT_SECRET").ok(),
44+
read_only: parse_env("READ_ONLY").unwrap_or(true),
4345
}
4446
}
4547

@@ -56,6 +58,7 @@ impl EnvConfig {
5658
mem_enable_track_consumers_pool: self.mem_enable_track_consumers_pool,
5759
disk_pool_size_mb: self.disk_pool_size_mb,
5860
query_history_rows_limit: self.query_history_rows_limit,
61+
read_only: self.read_only,
5962
}
6063
}
6164
}

crates/embucket-lambda/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ async fn main() -> Result<(), LambdaError> {
4848
mem_pool_size_mb = ?env_config.mem_pool_size_mb,
4949
disk_pool_size_mb = ?env_config.disk_pool_size_mb,
5050
bootstrap_default_entities = env_config.bootstrap_default_entities,
51+
read_only = env_config.read_only,
5152
metastore_config = env_config.metastore_config.as_ref().map(|p| p.display().to_string()),
5253
"Loaded Lambda configuration"
5354
);

crates/embucketd/src/cli.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,14 @@ pub struct CliOpts {
169169
help = "JWT secret for auth"
170170
)]
171171
jwt_secret: Option<String>,
172+
173+
#[arg(
174+
long,
175+
env = "READ_ONLY",
176+
default_value = "true",
177+
help = "If the service should only accept read only commands (selects)"
178+
)]
179+
pub read_only: bool,
172180
}
173181

174182
impl CliOpts {

crates/embucketd/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ async fn async_main(
145145
mem_enable_track_consumers_pool: opts.mem_enable_track_consumers_pool,
146146
disk_pool_size_mb: opts.disk_pool_size_mb,
147147
query_history_rows_limit: opts.query_history_rows_limit,
148+
read_only: opts.read_only,
148149
};
149150
let host = opts.host.clone().unwrap();
150151
let port = opts.port.unwrap();

crates/executor/src/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,13 @@ pub enum Error {
632632
#[snafu(implicit)]
633633
location: Location,
634634
},
635+
636+
#[snafu(display("Statement not supported in read_only mode: {statement}"))]
637+
NotSupportedStatementInReadOnlyMode {
638+
statement: String,
639+
#[snafu(implicit)]
640+
location: Location,
641+
},
635642
}
636643

637644
impl Error {

crates/executor/src/query.rs

Lines changed: 101 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,50 @@ impl UserQuery {
279279
// 3. Single place to construct Logical plan from this AST
280280
// 4. Single place to rewrite-optimize-adjust logical plan
281281
// etc
282+
if self.session.config.read_only {
283+
match statement {
284+
DFStatement::Statement(s) => match *s {
285+
Statement::Query(subquery) => {
286+
return self.execute_query_statement(subquery).await;
287+
}
288+
Statement::Use(entity) => {
289+
return self.execute_use_statement(entity).await;
290+
}
291+
other => {
292+
return ex_error::NotSupportedStatementInReadOnlyModeSnafu {
293+
statement: other.to_string(),
294+
}
295+
.fail();
296+
}
297+
},
298+
DFStatement::Explain(explain) => match *explain.statement {
299+
DFStatement::Statement(s) => match *s {
300+
Statement::Query(..) | Statement::Use(..) => {
301+
return self.execute_sql(&self.query).await;
302+
}
303+
other => {
304+
return ex_error::NotSupportedStatementInReadOnlyModeSnafu {
305+
statement: other.to_string(),
306+
}
307+
.fail();
308+
}
309+
},
310+
other => {
311+
return ex_error::NotSupportedStatementInReadOnlyModeSnafu {
312+
statement: other.to_string(),
313+
}
314+
.fail();
315+
}
316+
},
317+
other => {
318+
return ex_error::NotSupportedStatementInReadOnlyModeSnafu {
319+
statement: other.to_string(),
320+
}
321+
.fail();
322+
}
323+
}
324+
}
325+
282326
if let DFStatement::Statement(s) = statement {
283327
match *s {
284328
Statement::AlterSession {
@@ -300,29 +344,7 @@ impl UserQuery {
300344
return self.status_response();
301345
}
302346
Statement::Use(entity) => {
303-
let (variable, value) = match entity {
304-
Use::Catalog(n) => ("catalog", n.to_string()),
305-
Use::Schema(n) => ("schema", n.to_string()),
306-
Use::Database(n) => ("database", n.to_string()),
307-
Use::Warehouse(n) => ("warehouse", n.to_string()),
308-
Use::Role(n) => ("role", n.to_string()),
309-
Use::Object(n) => ("object", n.to_string()),
310-
Use::SecondaryRoles(sr) => ("secondary_roles", sr.to_string()),
311-
Use::Default => ("", String::new()),
312-
};
313-
if variable.is_empty() | value.is_empty() {
314-
return ex_error::OnyUseWithVariablesSnafu.fail();
315-
}
316-
let params = HashMap::from([(
317-
variable.to_string(),
318-
SessionProperty::from_str_value(
319-
variable.to_string(),
320-
value,
321-
Some(self.session.ctx.session_id()),
322-
),
323-
)]);
324-
self.session.set_session_variable(true, params)?;
325-
return self.status_response();
347+
return self.execute_use_statement(entity).await;
326348
}
327349
Statement::Set(statement) => {
328350
use datafusion::sql::sqlparser::ast::Set;
@@ -397,9 +419,8 @@ impl UserQuery {
397419
Statement::Truncate { table_names, .. } => {
398420
return Box::pin(self.truncate_table(table_names)).await;
399421
}
400-
Statement::Query(mut subquery) => {
401-
self.traverse_and_update_query(subquery.as_mut()).await;
402-
return Box::pin(self.execute_with_custom_plan(&subquery.to_string())).await;
422+
Statement::Query(subquery) => {
423+
return self.execute_query_statement(subquery).await;
403424
}
404425
Statement::Drop { .. } => return Box::pin(self.drop_query(*s)).await,
405426
Statement::Merge { .. } => return Box::pin(self.merge_query(*s)).await,
@@ -418,6 +439,60 @@ impl UserQuery {
418439
self.execute_sql(&self.query).await
419440
}
420441

442+
#[instrument(
443+
name = "UserQuery::execute_query_statement",
444+
level = "debug",
445+
skip(self),
446+
fields(
447+
statement,
448+
query_id = self.query_context.query_id.to_string(),
449+
),
450+
err
451+
)]
452+
pub async fn execute_query_statement(
453+
&mut self,
454+
mut subquery: Box<Query>,
455+
) -> Result<QueryResult> {
456+
self.traverse_and_update_query(subquery.as_mut()).await;
457+
Box::pin(self.execute_with_custom_plan(&subquery.to_string())).await
458+
}
459+
460+
#[instrument(
461+
name = "UserQuery::execute_use_statement",
462+
level = "debug",
463+
skip(self),
464+
fields(
465+
statement,
466+
query_id = self.query_context.query_id.to_string(),
467+
),
468+
err
469+
)]
470+
pub async fn execute_use_statement(&mut self, entity: Use) -> Result<QueryResult> {
471+
let (variable, value) = match entity {
472+
Use::Catalog(n) => ("catalog", n.to_string()),
473+
Use::Schema(n) => ("schema", n.to_string()),
474+
Use::Database(n) => ("database", n.to_string()),
475+
Use::Warehouse(n) => ("warehouse", n.to_string()),
476+
Use::Role(n) => ("role", n.to_string()),
477+
Use::Object(n) => ("object", n.to_string()),
478+
Use::SecondaryRoles(sr) => ("secondary_roles", sr.to_string()),
479+
Use::Default => ("", String::new()),
480+
};
481+
if variable.is_empty() | value.is_empty() {
482+
return ex_error::OnyUseWithVariablesSnafu.fail();
483+
}
484+
let params = HashMap::from([(
485+
variable.to_string(),
486+
SessionProperty::from_str_value(
487+
variable.to_string(),
488+
value,
489+
Some(self.session.ctx.session_id()),
490+
),
491+
)]);
492+
self.session.set_session_variable(true, params)?;
493+
self.status_response()
494+
}
495+
421496
#[instrument(name = "UserQuery::get_catalog", level = "trace", skip(self), err)]
422497
pub fn get_catalog(&self, name: &str) -> Result<Arc<dyn CatalogProvider>> {
423498
self.session

crates/executor/src/tests/service.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,3 +432,134 @@ async fn test_query_timeout() {
432432
"Expected query execution exceeded timeout error but got {res:?}"
433433
);
434434
}
435+
436+
#[tokio::test]
437+
#[allow(clippy::expect_used)]
438+
async fn test_execute_read_only_mode() {
439+
//setup
440+
let metastore = Arc::new(InMemoryMetastore::new());
441+
let execution_svc = CoreExecutionService::new(metastore.clone(), Arc::new(Config::default()))
442+
.await
443+
.expect("Failed to create execution service");
444+
445+
execution_svc
446+
.create_session("test_session_id")
447+
.await
448+
.expect("Failed to create session");
449+
450+
execution_svc
451+
.query(
452+
"test_session_id",
453+
"CREATE OR REPLACE TABLE fetch_test(c1 INT)",
454+
QueryContext::default(),
455+
)
456+
.await
457+
.expect("Failed to execute query");
458+
459+
execution_svc
460+
.query(
461+
"test_session_id",
462+
"INSERT INTO fetch_test VALUES (1),(2),(3),(4)",
463+
QueryContext::default(),
464+
)
465+
.await
466+
.expect("Failed to execute query");
467+
468+
drop(execution_svc);
469+
470+
//read only mode test
471+
let execution_svc =
472+
CoreExecutionService::new(metastore, Arc::new(Config::default().with_read_only(true)))
473+
.await
474+
.expect("Failed to create execution service");
475+
476+
execution_svc
477+
.create_session("test_session_id")
478+
.await
479+
.expect("Failed to create session");
480+
481+
//should fail
482+
execution_svc
483+
.query(
484+
"test_session_id",
485+
"CREATE OR REPLACE TABLE fetch_test(c1 INT)",
486+
QueryContext::default(),
487+
)
488+
.await
489+
.expect_err("Read only mode failed");
490+
491+
//should fail
492+
execution_svc
493+
.query(
494+
"test_session_id",
495+
"INSERT INTO fetch_test VALUES (1),(2),(3),(4)",
496+
QueryContext::default(),
497+
)
498+
.await
499+
.expect_err("Read only mode failed");
500+
501+
execution_svc
502+
.query("test_session_id", "SELECT 1", QueryContext::default())
503+
.await
504+
.expect("Failed to execute query in read only mode");
505+
506+
execution_svc
507+
.query(
508+
"test_session_id",
509+
"EXPLAIN SELECT 1",
510+
QueryContext::default(),
511+
)
512+
.await
513+
.expect("Failed to execute query in read only mode");
514+
515+
execution_svc
516+
.query(
517+
"test_session_id",
518+
"WITH limited_data AS (
519+
SELECT c1 FROM fetch_test ORDER BY c1 FETCH FIRST 3 ROWS
520+
)
521+
SELECT * FROM limited_data ORDER BY c1;",
522+
QueryContext::default(),
523+
)
524+
.await
525+
.expect("Failed to execute query in read only mode");
526+
527+
execution_svc
528+
.query(
529+
"test_session_id",
530+
"EXPLAIN WITH limited_data AS (
531+
SELECT c1 FROM fetch_test ORDER BY c1 FETCH FIRST 3 ROWS
532+
)
533+
SELECT * FROM limited_data ORDER BY c1;",
534+
QueryContext::default(),
535+
)
536+
.await
537+
.expect("Failed to execute query in read only mode");
538+
539+
execution_svc
540+
.query(
541+
"test_session_id",
542+
"SELECT 1 UNION ALL SELECT c1 FROM fetch_test;",
543+
QueryContext::default(),
544+
)
545+
.await
546+
.expect("Failed to execute query in read only mode");
547+
548+
execution_svc
549+
.query(
550+
"test_session_id",
551+
"EXPLAIN SELECT 1 UNION ALL SELECT c1 FROM fetch_test;",
552+
QueryContext::default(),
553+
)
554+
.await
555+
.expect("Failed to execute query in read only mode");
556+
557+
execution_svc
558+
.query(
559+
"test_session_id",
560+
"USE SCHEMA public;",
561+
QueryContext::default(),
562+
)
563+
.await
564+
.expect("Failed to execute query in read only mode");
565+
}

crates/executor/src/utils.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub struct Config {
4343
pub mem_enable_track_consumers_pool: Option<bool>,
4444
pub disk_pool_size_mb: Option<usize>,
4545
pub query_history_rows_limit: usize,
46+
pub read_only: bool,
4647
}
4748

4849
impl Default for Config {
@@ -58,6 +59,7 @@ impl Default for Config {
5859
mem_enable_track_consumers_pool: None,
5960
disk_pool_size_mb: None,
6061
query_history_rows_limit: DEFAULT_QUERY_HISTORY_ROWS_LIMIT,
62+
read_only: false,
6163
}
6264
}
6365
}
@@ -89,6 +91,12 @@ impl Config {
8991
self.query_history_rows_limit = limit;
9092
self
9193
}
94+
95+
#[must_use]
96+
pub const fn with_read_only(mut self, read_only: bool) -> Self {
97+
self.read_only = read_only;
98+
self
99+
}
92100
}
93101

94102
#[derive(Copy, Clone, PartialEq, Eq, EnumString, Debug, Display, Default)]

0 commit comments

Comments
 (0)