diff --git a/Cargo.lock b/Cargo.lock index d6a692df6c66d..f8ab743f75563 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -561,7 +561,7 @@ dependencies = [ [[package]] name = "arrow-udf-runtime" version = "0.8.0" -source = "git+https://github.com/datafuse-extras/arrow-udf.git?rev=92eeb3b#92eeb3b8ecf10a894b8bd861a8c118215426fa93" +source = "git+https://github.com/datafuse-extras/arrow-udf.git?rev=a442343#a44234332e9c182c247a510c3721b655572f323c" dependencies = [ "anyhow", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index ef0fd8939bf85..5ab2d02b9e640 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -642,7 +642,7 @@ overflow-checks = true rpath = false [patch.crates-io] -arrow-udf-runtime = { git = "https://github.com/datafuse-extras/arrow-udf.git", rev = "92eeb3b" } +arrow-udf-runtime = { git = "https://github.com/datafuse-extras/arrow-udf.git", rev = "a442343" } async-backtrace = { git = "https://github.com/datafuse-extras/async-backtrace.git", rev = "dea4553" } async-recursion = { git = "https://github.com/datafuse-extras/async-recursion.git", rev = "a353334" } backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "72265be" } diff --git a/src/meta/app/src/principal/user_defined_function.rs b/src/meta/app/src/principal/user_defined_function.rs index cba5326217820..208f98385a822 100644 --- a/src/meta/app/src/principal/user_defined_function.rs +++ b/src/meta/app/src/principal/user_defined_function.rs @@ -40,6 +40,8 @@ pub struct UDFServer { #[derive(Clone, Debug, Eq, PartialEq)] pub struct UDFScript { pub code: String, + pub imports: Vec, + pub packages: Vec, pub handler: String, pub language: String, pub arg_types: Vec, @@ -50,6 +52,8 @@ pub struct UDFScript { #[derive(Clone, Debug, Eq, PartialEq)] pub struct UDAFScript { pub code: String, + pub imports: Vec, + pub packages: Vec, pub language: String, // aggregate function input types pub arg_types: Vec, @@ -167,6 +171,8 @@ impl UserDefinedFunction { arg_types, return_type, runtime_version: 
runtime_version.to_string(), + imports: vec![], + packages: vec![], }), created_on: Utc::now(), } @@ -226,6 +232,8 @@ impl Display for UDFDefinition { handler, language, runtime_version, + imports, + packages, }) => { for (i, item) in arg_types.iter().enumerate() { if i > 0 { @@ -235,7 +243,7 @@ impl Display for UDFDefinition { } write!( f, - ") RETURNS {return_type} LANGUAGE {language} RUNTIME_VERSION = {runtime_version} HANDLER = {handler} AS $${code}$$" + ") RETURNS {return_type} LANGUAGE {language} IMPORTS = {imports:?} PACKAGES = {packages:?} RUNTIME_VERSION = {runtime_version} HANDLER = {handler} AS $${code}$$" )?; } UDFDefinition::UDAFScript(UDAFScript { @@ -245,6 +253,8 @@ impl Display for UDFDefinition { return_type, language, runtime_version, + imports, + packages, }) => { for (i, item) in arg_types.iter().enumerate() { if i > 0 { @@ -259,7 +269,7 @@ impl Display for UDFDefinition { } write!(f, "{} {}", item.name(), item.data_type())?; } - write!(f, " }} RETURNS {return_type} LANGUAGE {language} RUNTIME_VERSION = {runtime_version} AS $${code}$$")?; + write!(f, " }} RETURNS {return_type} LANGUAGE {language} IMPORTS = {imports:?} PACKAGES = {packages:?} RUNTIME_VERSION = {runtime_version} AS $${code}$$")?; } } Ok(()) diff --git a/src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs index b3b39297719aa..3a11a5218b25b 100644 --- a/src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs @@ -137,6 +137,8 @@ impl FromToProto for mt::UDFScript { handler: p.handler, language: p.language, runtime_version: p.runtime_version, + imports: p.imports, + packages: p.packages, }) } @@ -171,6 +173,8 @@ impl FromToProto for mt::UDFScript { arg_types, return_type: Some(return_type), runtime_version: self.runtime_version.clone(), + imports: self.imports.clone(), + packages: self.packages.clone(), }) } } @@ -206,6 +210,8 @@ impl FromToProto for mt::UDAFScript { 
return_type, language: p.language, runtime_version: p.runtime_version, + imports: p.imports, + packages: p.packages, state_fields, }) } @@ -259,6 +265,8 @@ impl FromToProto for mt::UDAFScript { arg_types, state_fields, return_type: Some(return_type), + imports: self.imports.clone(), + packages: self.packages.clone(), }) } } diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index 00be0d24f677f..6396122b8f5cd 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -159,6 +159,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (127, "2025-05-18: Add: UserOption::workload_group"), (128, "2025-05-22: Add: Storage Network config"), (129, "2025-05-30: Add: New DataType Vector"), + (130, "2025-06-19: Add: New UDF imports and packages in udf definition"), // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. // You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index d00f0494f499f..624bcc1f44bca 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -120,4 +120,5 @@ mod v125_table_index; mod v126_iceberg_storage_catalog_option; mod v127_user_option_workload_group; mod v128_storage_network_config; mod v129_vector_datatype; +mod v130_udf_imports_packages; diff --git a/src/meta/proto-conv/tests/it/v081_udf_script.rs b/src/meta/proto-conv/tests/it/v081_udf_script.rs index a275b15750763..e52a072f366eb 100644 --- a/src/meta/proto-conv/tests/it/v081_udf_script.rs +++ b/src/meta/proto-conv/tests/it/v081_udf_script.rs @@ -116,6 +116,8 @@ fn test_decode_udf_script() -> anyhow::Result<()> { language: "python".to_string(), arg_types: vec![DataType::Number(NumberDataType::Int32)], return_type: DataType::Number(NumberDataType::Float32), + imports: vec![], + packages: vec![], runtime_version: "3.12.2".to_string(), }), created_on:
DateTime::::default(), diff --git a/src/meta/proto-conv/tests/it/v115_add_udaf_script.rs b/src/meta/proto-conv/tests/it/v115_add_udaf_script.rs index c6b49ee0f70cd..32100f6b0f135 100644 --- a/src/meta/proto-conv/tests/it/v115_add_udaf_script.rs +++ b/src/meta/proto-conv/tests/it/v115_add_udaf_script.rs @@ -61,6 +61,8 @@ fn test_decode_v115_add_udaf_script() -> anyhow::Result<()> { )], return_type: DataType::Number(NumberDataType::Float32), runtime_version: "".to_string(), + imports: vec![], + packages: vec![], }), created_on: DateTime::::default(), }; diff --git a/src/meta/proto-conv/tests/it/v130_udf_imports_packages.rs b/src/meta/proto-conv/tests/it/v130_udf_imports_packages.rs new file mode 100644 index 0000000000000..f0b8b40497efa --- /dev/null +++ b/src/meta/proto-conv/tests/it/v130_udf_imports_packages.rs @@ -0,0 +1,68 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +use chrono::DateTime; +use chrono::Utc; +use databend_common_expression::types::DataType; +use databend_common_expression::types::NumberDataType; +use databend_common_meta_app::principal::UDFDefinition; +use databend_common_meta_app::principal::UDFScript; +use databend_common_meta_app::principal::UserDefinedFunction; +use fastrace::func_name; + +use crate::common; + +// These bytes are built when a new version in introduced, +// and are kept for backward compatibility test. +// +// ************************************************************* +// * These messages should never be updated, * +// * only be added when a new version is added, * +// * or be removed when an old version is no longer supported. * +// ************************************************************* +// +// The message bytes are built from the output of `test_pb_from_to()` +#[test] +fn test_decode_v130_udf_script() -> anyhow::Result<()> { + let bytes = vec![ + 10, 5, 109, 121, 95, 102, 110, 18, 21, 84, 104, 105, 115, 32, 105, 115, 32, 97, 32, 100, + 101, 115, 99, 114, 105, 112, 116, 105, 111, 110, 50, 119, 10, 9, 115, 111, 109, 101, 32, + 99, 111, 100, 101, 18, 5, 109, 121, 95, 102, 110, 26, 6, 112, 121, 116, 104, 111, 110, 34, + 19, 154, 2, 9, 58, 0, 160, 6, 130, 1, 168, 6, 24, 160, 6, 130, 1, 168, 6, 24, 42, 19, 154, + 2, 9, 74, 0, 160, 6, 130, 1, 168, 6, 24, 160, 6, 130, 1, 168, 6, 24, 50, 6, 51, 46, 49, 50, + 46, 50, 58, 9, 64, 115, 49, 47, 97, 46, 122, 105, 112, 58, 8, 64, 115, 50, 47, 98, 46, 112, + 121, 66, 5, 110, 117, 109, 112, 121, 66, 6, 112, 97, 110, 100, 97, 115, 160, 6, 130, 1, + 168, 6, 24, 42, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 48, + 48, 32, 85, 84, 67, 160, 6, 130, 1, 168, 6, 24, + ]; + + let want = || UserDefinedFunction { + name: "my_fn".to_string(), + description: "This is a description".to_string(), + definition: UDFDefinition::UDFScript(UDFScript { + code: "some code".to_string(), + handler: "my_fn".to_string(), + language: 
"python".to_string(), + arg_types: vec![DataType::Number(NumberDataType::Int32)], + return_type: DataType::Number(NumberDataType::Float32), + imports: vec!["@s1/a.zip".to_string(), "@s2/b.py".to_string()], + packages: vec!["numpy".to_string(), "pandas".to_string()], + runtime_version: "3.12.2".to_string(), + }), + created_on: DateTime::::default(), + }; + + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old(func_name!(), bytes.as_slice(), 130, want()) +} diff --git a/src/meta/protos/proto/udf.proto b/src/meta/protos/proto/udf.proto index 024ed8c21ecca..8365e16f98d9a 100644 --- a/src/meta/protos/proto/udf.proto +++ b/src/meta/protos/proto/udf.proto @@ -24,7 +24,7 @@ message LambdaUDF { uint64 min_reader_ver = 101; repeated string parameters = 1; - string definition = 2; + string definition = 2; } message UDFServer { @@ -49,6 +49,8 @@ message UDFScript { repeated DataType arg_types = 4; DataType return_type = 5; string runtime_version = 6; + repeated string imports = 7; + repeated string packages = 8; } message UDAFScript { @@ -61,6 +63,8 @@ message UDAFScript { DataType return_type = 4; repeated DataType arg_types = 5; repeated DataField state_fields = 6; + repeated string imports = 7; + repeated string packages = 8; } message UserDefinedFunction { diff --git a/src/query/ast/src/ast/statements/udf.rs b/src/query/ast/src/ast/statements/udf.rs index 3b324ebce4ade..3077b41a718db 100644 --- a/src/query/ast/src/ast/statements/udf.rs +++ b/src/query/ast/src/ast/statements/udf.rs @@ -18,7 +18,9 @@ use std::fmt::Formatter; use derive_visitor::Drive; use derive_visitor::DriveMut; +use itertools::Itertools; +use crate::ast::quote::QuotedString; use crate::ast::write_comma_separated_list; use crate::ast::CreateOption; use crate::ast::Expr; @@ -43,6 +45,8 @@ pub enum UDFDefinition { arg_types: Vec, return_type: TypeName, code: String, + imports: Vec, + packages: Vec, handler: String, language: String, runtime_version: String, @@ -59,6 +63,8 @@ pub enum 
UDFDefinition { arg_types: Vec, state_fields: Vec, return_type: TypeName, + imports: Vec, + packages: Vec, code: String, language: String, runtime_version: String, @@ -109,12 +115,23 @@ impl Display for UDFDefinition { handler, language, runtime_version: _, + imports, + packages, } => { write!(f, "( ")?; write_comma_separated_list(f, arg_types)?; + let imports = imports + .iter() + .map(|s| QuotedString(s, '\'').to_string()) + .join(","); + let packages = packages + .iter() + .map(|s| QuotedString(s, '\'').to_string()) + .join(","); write!( f, - " ) RETURNS {return_type} LANGUAGE {language} HANDLER = '{handler}' AS $$\n{code}\n$$" + " ) RETURNS {return_type} LANGUAGE {language} IMPORTS = ({}) PACKAGES = ({}) HANDLER = '{handler}' AS $$\n{code}\n$$", + imports, packages )?; } UDFDefinition::UDAFServer { @@ -149,14 +166,26 @@ impl Display for UDFDefinition { code, language, runtime_version: _, + imports, + packages, } => { + let imports = imports + .iter() + .map(|s| QuotedString(s, '\'').to_string()) + .join(","); + let packages = packages + .iter() + .map(|s| QuotedString(s, '\'').to_string()) + .join(","); + write!(f, "( ")?; write_comma_separated_list(f, arg_types)?; write!(f, " ) STATE {{ ")?; write_comma_separated_list(f, state_types)?; write!( f, - " }} RETURNS {return_type} LANGUAGE {language} AS $$\n{code}\n$$" + " }} RETURNS {return_type} LANGUAGE {language} IMPORTS = ({}) PACKAGES = ({}) AS $$\n{code}\n$$", + imports, packages )?; } } diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index ac1a7a89b34d8..c651f166ffd7f 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -4777,6 +4777,8 @@ pub fn udf_definition(i: Input) -> IResult { "(" ~ #comma_separated_list0(type_name) ~ ")" ~ RETURNS ~ #type_name ~ LANGUAGE ~ #ident + ~ ( IMPORTS ~ ^"=" ~ "(" ~ #comma_separated_list0(literal_string) ~ ")" )? + ~ ( PACKAGES ~ ^"=" ~ "(" ~ #comma_separated_list0(literal_string) ~ ")" )? 
~ HANDLER ~ ^"=" ~ ^#literal_string ~ ( HEADERS ~ ^"=" ~ "(" ~ #comma_separated_list0(udf_header) ~ ")" )? ~ #udf_script_or_address @@ -4789,6 +4791,8 @@ pub fn udf_definition(i: Input) -> IResult { return_type, _, language, + imports, + packages, _, _, handler, @@ -4800,6 +4804,12 @@ pub fn udf_definition(i: Input) -> IResult { arg_types, return_type, code: address_or_code.0, + imports: imports + .map(|(_, _, _, imports, _)| imports) + .unwrap_or_default(), + packages: packages + .map(|(_, _, _, packages, _)| packages) + .unwrap_or_default(), handler, language: language.to_string(), // TODO inject runtime_version by user @@ -4827,6 +4837,8 @@ pub fn udf_definition(i: Input) -> IResult { ~ STATE ~ "{" ~ #comma_separated_list0(udaf_state_field) ~ "}" ~ RETURNS ~ #type_name ~ LANGUAGE ~ #ident + ~ ( IMPORTS ~ ^"=" ~ "(" ~ #comma_separated_list0(literal_string) ~ ")" )? + ~ ( PACKAGES ~ ^"=" ~ "(" ~ #comma_separated_list0(literal_string) ~ ")" )? ~ ( HEADERS ~ ^"=" ~ "(" ~ #comma_separated_list0(udf_header) ~ ")" )? 
~ #udf_script_or_address }, @@ -4842,6 +4854,8 @@ pub fn udf_definition(i: Input) -> IResult { return_type, _, language, + imports, + packages, headers, address_or_code, )| { @@ -4852,6 +4866,12 @@ pub fn udf_definition(i: Input) -> IResult { return_type, code: address_or_code.0, language: language.to_string(), + imports: imports + .map(|(_, _, _, imports, _)| imports) + .unwrap_or_default(), + packages: packages + .map(|(_, _, _, packages, _)| packages) + .unwrap_or_default(), // TODO inject runtime_version by user // Now we use fixed runtime version runtime_version: "".to_string(), diff --git a/src/query/ast/src/parser/token.rs b/src/query/ast/src/parser/token.rs index d6b249ad24ab0..ded8e40d31a32 100644 --- a/src/query/ast/src/parser/token.rs +++ b/src/query/ast/src/parser/token.rs @@ -1342,6 +1342,10 @@ pub enum TokenKind { HEADERS, #[token("LANGUAGE", ignore(ascii_case))] LANGUAGE, + #[token("IMPORTS", ignore(ascii_case))] + IMPORTS, + #[token("PACKAGES", ignore(ascii_case))] + PACKAGES, #[token("STATE", ignore(ascii_case))] STATE, #[token("TASK", ignore(ascii_case))] diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index 7f09b694db2e8..32de2ca45ff42 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -828,6 +828,8 @@ SELECT * from s;"#, create or replace function addone(int) returns int language python + imports = ('@ss/abc') + packages = ('numpy', 'pandas') handler = 'addone_py' as '@data/abc/a.py'; "#, diff --git a/src/query/ast/tests/it/testdata/stmt-error.txt b/src/query/ast/tests/it/testdata/stmt-error.txt index 8960235081be5..da8c2a887c70b 100644 --- a/src/query/ast/tests/it/testdata/stmt-error.txt +++ b/src/query/ast/tests/it/testdata/stmt-error.txt @@ -983,7 +983,7 @@ error: --> SQL:1:85 | 1 | CREATE FUNCTION my_agg (INT) STATE { s STRING } RETURNS BOOLEAN LANGUAGE javascript HANDLER = 'my_agg' ADDRESS = 'http://0.0.0.0:8815'; - | ------ - ^^^^^^^ unexpected `HANDLER`, expecting 
`HEADERS`, `ADDRESS`, or `AS` + | ------ - ^^^^^^^ unexpected `HANDLER`, expecting `HEADERS`, `ADDRESS`, `PACKAGES`, `AS`, or `IMPORTS` | | | | | while parsing (, ...) STATE {, ...} RETURNS LANGUAGE { ADDRESS= | AS } | while parsing `CREATE [OR REPLACE] FUNCTION [IF NOT EXISTS] [DESC = ]` diff --git a/src/query/ast/tests/it/testdata/stmt.txt b/src/query/ast/tests/it/testdata/stmt.txt index 182b30fa46c39..386d00e806847 100644 --- a/src/query/ast/tests/it/testdata/stmt.txt +++ b/src/query/ast/tests/it/testdata/stmt.txt @@ -24965,7 +24965,7 @@ def addone_py(i): return i+1 $$; ---------- Output --------- -CREATE OR REPLACE FUNCTION addone ( Int32 ) RETURNS Int32 LANGUAGE python HANDLER = 'addone_py' AS $$ +CREATE OR REPLACE FUNCTION addone ( Int32 ) RETURNS Int32 LANGUAGE python IMPORTS = () PACKAGES = () HANDLER = 'addone_py' AS $$ def addone_py(i): return i+1 $$ @@ -24988,6 +24988,8 @@ CreateUDF( ], return_type: Int32, code: "def addone_py(i):\nreturn i+1", + imports: [], + packages: [], handler: "addone_py", language: "python", runtime_version: "", @@ -25000,10 +25002,12 @@ CreateUDF( create or replace function addone(int) returns int language python +imports = ('@ss/abc') +packages = ('numpy', 'pandas') handler = 'addone_py' as '@data/abc/a.py'; ---------- Output --------- -CREATE OR REPLACE FUNCTION addone ( Int32 ) RETURNS Int32 LANGUAGE python HANDLER = 'addone_py' AS $$ +CREATE OR REPLACE FUNCTION addone ( Int32 ) RETURNS Int32 LANGUAGE python IMPORTS = ('@ss/abc') PACKAGES = ('numpy','pandas') HANDLER = 'addone_py' AS $$ @data/abc/a.py $$ ---------- AST ------------ @@ -25025,6 +25029,13 @@ CreateUDF( ], return_type: Int32, code: "@data/abc/a.py", + imports: [ + "@ss/abc", + ], + packages: [ + "numpy", + "pandas", + ], handler: "addone_py", language: "python", runtime_version: "", @@ -25115,7 +25126,7 @@ CreateUDF( ---------- Input ---------- CREATE FUNCTION IF NOT EXISTS my_agg (INT) STATE { s STRING, i INT NOT NULL } RETURNS BOOLEAN LANGUAGE javascript AS 
'some code'; ---------- Output --------- -CREATE FUNCTION IF NOT EXISTS my_agg ( Int32 ) STATE { s STRING, i Int32 NOT NULL } RETURNS BOOLEAN LANGUAGE javascript AS $$ +CREATE FUNCTION IF NOT EXISTS my_agg ( Int32 ) STATE { s STRING, i Int32 NOT NULL } RETURNS BOOLEAN LANGUAGE javascript IMPORTS = () PACKAGES = () AS $$ some code $$ ---------- AST ------------ @@ -25162,6 +25173,8 @@ CreateUDF( }, ], return_type: Boolean, + imports: [], + packages: [], code: "some code", language: "javascript", runtime_version: "", @@ -25173,7 +25186,7 @@ CreateUDF( ---------- Input ---------- ALTER FUNCTION my_agg (INT) STATE { s STRING } RETURNS BOOLEAN LANGUAGE javascript AS 'some code'; ---------- Output --------- -ALTER FUNCTION my_agg ( Int32 ) STATE { s STRING } RETURNS BOOLEAN LANGUAGE javascript AS $$ +ALTER FUNCTION my_agg ( Int32 ) STATE { s STRING } RETURNS BOOLEAN LANGUAGE javascript IMPORTS = () PACKAGES = () AS $$ some code $$ ---------- AST ------------ @@ -25206,6 +25219,8 @@ AlterUDF( }, ], return_type: Boolean, + imports: [], + packages: [], code: "some code", language: "javascript", runtime_version: "", diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs index feb6e2225e35d..78b6430c3c7c6 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs @@ -613,6 +613,9 @@ def finish(state): language: UDFLanguage::Python, code: code.into(), runtime_version: "3.12".to_string(), + imports: vec![], + imports_stage_info: vec![], + packages: vec![], }; let name = "test".to_string(); let display_name = "test".to_string(); diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 6c2db1990d755..35dc956565b17 100644 --- 
a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -16,10 +16,12 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::sync::atomic::AtomicUsize; use std::sync::Arc; +use std::sync::LazyLock; use arrow_array::RecordBatch; use arrow_udf_runtime::javascript::FunctionOptions; use databend_common_base::runtime::GlobalIORuntime; +use databend_common_cache::Cache; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::converts::arrow::ARROW_EXT_TYPE_VARIANT; @@ -38,6 +40,8 @@ use databend_common_sql::executor::physical_plans::UdfFunctionDesc; use databend_common_sql::plans::UDFLanguage; use databend_common_sql::plans::UDFScriptCode; use databend_common_sql::plans::UDFType; +use databend_common_storage::init_stage_operator; +use tempfile::TempDir; use super::runtime_pool::Pool; use super::runtime_pool::RuntimeBuilder; @@ -49,9 +53,12 @@ pub enum ScriptRuntime { Python(python_pool::PyRuntimePool), } +static PY_VERSION: LazyLock = + LazyLock::new(|| venv::detect_python_version().unwrap_or("3.12".to_string())); + impl ScriptRuntime { - pub fn try_create(func: &UdfFunctionDesc) -> Result { - let UDFType::Script(UDFScriptCode { language, code, .. }) = &func.udf_type else { + pub fn try_create(func: &UdfFunctionDesc, _temp_dir: Option>) -> Result { + let UDFType::Script(box UDFScriptCode { language, code, .. 
}) = &func.udf_type else { unreachable!() }; match language { @@ -81,10 +88,25 @@ impl ScriptRuntime { } #[cfg(feature = "python-udf")] UDFLanguage::Python => { + let code = String::from_utf8(code.to_vec())?; + let code = if let Some(temp_dir) = _temp_dir { + format!( + r#"import sys +sys._xoptions['databend_import_directory'] = '{}' +sys.path.append('{}') +{}"#, + temp_dir.path().display(), + temp_dir.path().display(), + code + ) + } else { + code + }; + let builder = PyRuntimeBuilder { name: func.name.clone(), handler: func.func_name.clone(), - code: String::from_utf8(code.to_vec())?, + code, output_type: func.data_type.as_ref().clone(), counter: Default::default(), }; @@ -228,12 +250,55 @@ pub struct PyRuntimeBuilder { mod python_pool { use super::*; + const RESTRICTED_PYTHON_CODE: &str = r#" +import os +import sys +from pathlib import Path + +ALLOWED_BASE = Path("/tmp") + +_original_open = open +_original_os_open = os.open if hasattr(os, 'open') else None + +def safe_open(file, mode='r', **kwargs): + file_path = Path(file).resolve() + + try: + file_path.relative_to(ALLOWED_BASE) + except ValueError: + raise PermissionError(f"Access denied: {file} is outside allowed directory") + + return _original_open(file, mode, **kwargs) + +def safe_os_open(path, flags, mode=0o777): + file_path = Path(path).resolve() + try: + file_path.relative_to(ALLOWED_BASE) + except ValueError: + raise PermissionError(f"Access denied: {path} is outside allowed directory") + return _original_os_open(path, flags, mode) + +import builtins, sys +if "DATABEND_RESTRICTED_PYTHON" not in sys._xoptions: + builtins.open = safe_open + if _original_os_open: + os.open = safe_os_open + + dangerous_modules = ['subprocess', 'os.system', 'eval', 'exec', 'compile'] + for module in dangerous_modules: + if module in sys.modules: + del sys.modules[module] + sys._xoptions['DATABEND_RESTRICTED_PYTHON'] = '1' +"#; + impl RuntimeBuilder for PyRuntimeBuilder { type Error = ErrorCode; fn build(&self) -> Result { 
let start = std::time::Instant::now(); - let mut runtime = arrow_udf_runtime::python::Builder::default().build()?; + let mut runtime = arrow_udf_runtime::python::Builder::default() + .safe_codes(RESTRICTED_PYTHON_CODE.to_string()) + .build()?; runtime.add_function_with_handler( &self.name, arrow_field_from_data_type(&self.name, self.output_type.clone()), @@ -259,14 +324,14 @@ mod python_pool { pub struct TransformUdfScript { funcs: Vec, - script_runtimes: BTreeMap>, + script_runtimes: RuntimeTimeRes, } impl TransformUdfScript { pub fn new( _func_ctx: FunctionContext, funcs: Vec, - script_runtimes: BTreeMap>, + script_runtimes: RuntimeTimeRes, ) -> Self { Self { funcs, @@ -289,7 +354,7 @@ impl Transform for TransformUdfScript { let num_rows = data_block.num_rows(); let block_entries = self.prepare_block_entries(func, &data_block)?; let input_batch = self.create_input_batch(block_entries, num_rows)?; - let runtime = self.script_runtimes.get(&func.name).unwrap(); + let (runtime, _) = self.script_runtimes.get(&func.name).unwrap(); let result_batch = runtime.handle_execution(func, &input_batch)?; self.update_datablock(func, result_batch, &mut data_block)?; } @@ -297,30 +362,129 @@ impl Transform for TransformUdfScript { } } -impl TransformUdfScript { - pub fn init_runtime(funcs: &[UdfFunctionDesc]) -> Result>> { - let mut script_runtimes: BTreeMap> = BTreeMap::new(); +type RuntimeTimeRes = BTreeMap, Option>)>; +impl TransformUdfScript { + pub fn init_runtime(funcs: &[UdfFunctionDesc]) -> Result { + let mut script_runtimes = BTreeMap::new(); for func in funcs { - let code = match &func.udf_type { - UDFType::Script(code) => code, + let (code, code_str) = match &func.udf_type { + UDFType::Script(box script_code) => { + (script_code, String::from_utf8(script_code.code.to_vec())?) + } _ => continue, }; + let temp_dir = match &func.udf_type { + UDFType::Script(box UDFScriptCode { + language: UDFLanguage::Python, + packages, + imports_stage_info, + .. 
+ }) => { + let mut dependencies = Self::extract_deps(&code_str); + dependencies.extend_from_slice(packages.as_slice()); + + let temp_dir = if !dependencies.is_empty() || !imports_stage_info.is_empty() { + // try to find the temp dir from cache + let key = venv::PyVenvKeyEntry { + udf_desc: func.clone(), + }; + let mut w = venv::PY_VENV_CACHE.write(); + let entry = w.get(&key); + if let Some(entry) = entry { + Some(entry.temp_dir.clone()) + } else { + let temp_dir = Arc::new(venv::create_venv(PY_VERSION.as_str())?); + venv::install_deps(temp_dir.path(), &dependencies)?; + + if !imports_stage_info.is_empty() { + let imports_stage_info = imports_stage_info.clone(); + let temp_dir_path = temp_dir.path(); + databend_common_base::runtime::block_on(async move { + let mut fts = Vec::with_capacity(imports_stage_info.len()); + for (stage, path) in imports_stage_info.iter() { + let op = init_stage_operator(stage)?; + let name = path + .trim_end_matches('/') + .split('/') + .next_back() + .unwrap(); + let temp_file = temp_dir_path.join(name); + fts.push(async move { + let buffer = op.read(path).await?; + databend_common_base::base::tokio::fs::write( + &temp_file, + buffer.to_bytes().as_ref(), + ) + .await + }); + } + let _ = futures::future::join_all(fts).await; + Ok::<(), ErrorCode>(()) + })?; + } + + w.insert(key, venv::PyVenvCacheEntry { + temp_dir: temp_dir.clone(), + }); + + Some(temp_dir) + } + } else { + None + }; + + temp_dir + } + _ => None, + }; + if let Entry::Vacant(entry) = script_runtimes.entry(func.name.clone()) { - let runtime = ScriptRuntime::try_create(func).map_err(|err| { + let runtime = ScriptRuntime::try_create(func, temp_dir.clone()).map_err(|err| { ErrorCode::UDFDataError(format!( "Failed to create UDF runtime for language {:?} with error: {err}", code.language )) })?; - entry.insert(Arc::new(runtime)); + entry.insert((Arc::new(runtime), temp_dir)); }; } Ok(script_runtimes) } + fn extract_deps(script: &str) -> Vec { + let mut ss = String::new(); + let 
mut meta_start = false; + for line in script.lines() { + if meta_start { + if line.starts_with("# ///") { + break; + } + ss.push_str(line.trim_start_matches('#').trim()); + ss.push('\n'); + } + if !meta_start && line.starts_with("# /// script") { + meta_start = true; + } + } + + let parsed = ss.parse::().unwrap(); + + if parsed.get("dependencies").is_none() { + return Vec::new(); + } + + if let Some(deps) = parsed["dependencies"].as_array() { + deps.iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect() + } else { + Vec::new() + } + } + fn prepare_block_entries( &self, func: &UdfFunctionDesc, @@ -431,3 +595,104 @@ impl TransformUdfScript { Ok(()) } } + +mod venv { + use std::path::Path; + use std::process::Command; + use std::sync::Arc; + use std::sync::LazyLock; + + use databend_common_cache::LruCache; + use databend_common_cache::MemSized; + use databend_common_sql::executor::physical_plans::UdfFunctionDesc; + use parking_lot::RwLock; + use tempfile::TempDir; + + pub fn install_deps(temp_dir_path: &Path, deps: &[String]) -> Result<(), String> { + if deps.is_empty() { + return Ok(()); + } + let target_path = temp_dir_path.display().to_string(); + let status = Command::new("python") + .args(["-m", "pip", "install"]) + .args(deps) + .args(["--target", &target_path]) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .status() + .map_err(|e| format!("Failed to install dependencies: {}", e))?; + + log::info!("Dependency installation success {}", deps.join(", ")); + + if status.success() { + Ok(()) + } else { + Err("Dependency installation failed".into()) + } + } + + pub fn create_venv(_python_version: &str) -> Result { + let temp_dir = + tempfile::tempdir().map_err(|e| format!("Failed to create temp dir: {}", e))?; + + // let env_path = temp_dir.path().join(".venv"); + // Command::new("python") + // .args(["-m", "venv", env_path.to_str().unwrap()]) + // .stdout(std::process::Stdio::null()) + // 
.stderr(std::process::Stdio::null()) + // .status() + // .map_err(|e| format!("Failed to create venv: {}", e))?; + + Ok(temp_dir) + } + + pub fn detect_python_version() -> Result { + let output = Command::new("python") + .arg("--version") + .output() + .map_err(|e| format!("Failed to detect python version: {}", e))?; + + if output.status.success() { + let version = String::from_utf8_lossy(&output.stdout); + let version = version + .trim() + .to_string() + .replace("Python ", "") + .split('.') + .take(2) + .collect::>() + .join("."); + Ok(version) + } else { + Err("Failed to detect python version".into()) + } + } + + // cached temp dir for python udf + // Add this after the PY_VERSION LazyLock declaration + // A simple LRU cache for Python virtual environments + #[derive(Clone)] + pub(crate) struct PyVenvCacheEntry { + pub(crate) temp_dir: Arc, + } + + #[derive(Eq, Hash, PartialEq)] + pub(crate) struct PyVenvKeyEntry { + pub(crate) udf_desc: UdfFunctionDesc, + } + + impl MemSized for PyVenvKeyEntry { + fn mem_bytes(&self) -> usize { + std::mem::size_of::() + } + } + + impl MemSized for PyVenvCacheEntry { + fn mem_bytes(&self) -> usize { + std::mem::size_of::() + } + } + + pub static PY_VENV_CACHE: LazyLock>> = + LazyLock::new(|| RwLock::new(LruCache::with_items_capacity(64))); +} diff --git a/src/query/sql/src/executor/physical_plans/physical_udf.rs b/src/query/sql/src/executor/physical_plans/physical_udf.rs index bc22c5dc1ee92..acfd628b36af3 100644 --- a/src/query/sql/src/executor/physical_plans/physical_udf.rs +++ b/src/query/sql/src/executor/physical_plans/physical_udf.rs @@ -55,7 +55,7 @@ impl Udf { } } -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, Eq, Hash, PartialEq, serde::Serialize, serde::Deserialize)] pub struct UdfFunctionDesc { pub name: String, pub func_name: String, diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index 
7604b7aa78456..b8acbcf085766 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -631,24 +631,28 @@ fn check_transform_query( /// copy into mytable from @my_ext_stage /// file_format = (type = csv); /// ``` -/// +/// location can be: +/// - mystage +/// - mystage/ +/// - mystage/abc +/// - ~/abc /// Returns user's stage info and relative path towards the stage's root. /// /// If input location is empty we will convert it to `/` means the root of stage /// -/// - @mystage => (mystage, "/") +/// - mystage => (mystage, "/") /// /// If input location is endswith `/`, it's a folder. /// -/// - @mystage/ => (mystage, "/") +/// - mystage/ => (mystage, "/") /// /// Otherwise, it's a file /// -/// - @mystage/abc => (mystage, "abc") +/// - mystage/abc => (mystage, "abc") /// /// For internal stage, we will also add prefix `/stage//` /// -/// - @internal/abc => (internal, "/stage/internal/abc") +/// - ~/abc => (internal, "/stage/internal/abc") #[async_backtrace::framed] pub async fn resolve_stage_location( ctx: &dyn TableContext, @@ -672,6 +676,18 @@ pub async fn resolve_stage_location( Ok((stage, path.to_string())) } +#[async_backtrace::framed] +pub async fn resolve_stage_locations( + ctx: &dyn TableContext, + locations: &[String], +) -> Result> { + let mut results = Vec::with_capacity(locations.len()); + for location in locations { + results.push(resolve_stage_location(ctx, location).await?); + } + Ok(results) +} + #[async_backtrace::framed] pub async fn resolve_file_location( ctx: &dyn TableContext, diff --git a/src/query/sql/src/planner/binder/mod.rs b/src/query/sql/src/planner/binder/mod.rs index e4f71398bf523..36bf3dec60ed6 100644 --- a/src/query/sql/src/planner/binder/mod.rs +++ b/src/query/sql/src/planner/binder/mod.rs @@ -73,6 +73,7 @@ pub use column_binding::ColumnBindingBuilder; pub use column_binding::DummyColumnType; pub use copy_into_table::resolve_file_location; pub use 
copy_into_table::resolve_stage_location; +pub use copy_into_table::resolve_stage_locations; pub use default_expr::DefaultExprBinder; pub use explain::ExplainConfig; pub use internal_column_factory::INTERNAL_COLUMN_FACTORY; diff --git a/src/query/sql/src/planner/binder/udf.rs b/src/query/sql/src/planner/binder/udf.rs index e2e0e970908e0..713069f6f6640 100644 --- a/src/query/sql/src/planner/binder/udf.rs +++ b/src/query/sql/src/planner/binder/udf.rs @@ -146,6 +146,8 @@ impl Binder { handler, language, runtime_version, + imports, + packages, } => { UDFValidator::is_udf_script_allowed(&language.parse()?)?; let definition = create_udf_definition_script( @@ -153,6 +155,8 @@ impl Binder { None, return_type, runtime_version, + imports, + packages, handler, language, code, @@ -171,12 +175,16 @@ impl Binder { code, language, runtime_version, + imports, + packages, } => { let definition = create_udf_definition_script( arg_types, Some(state_fields), return_type, runtime_version, + imports, + packages, "", language, code, @@ -255,6 +263,8 @@ fn create_udf_definition_script( state_fields: Option<&[UDAFStateField]>, return_type: &TypeName, runtime_version: &str, + imports: &[String], + packages: &[String], handler: &str, language: &str, code: &str, @@ -302,6 +312,8 @@ fn create_udf_definition_script( Ok(PlanUDFDefinition::UDAFScript(UDAFScript { code: code.to_string(), arg_types, + imports: imports.to_vec(), + packages: packages.to_vec(), state_fields, return_type, language: language.to_string(), @@ -312,6 +324,8 @@ fn create_udf_definition_script( code: code.to_string(), arg_types, return_type, + imports: imports.to_vec(), + packages: packages.to_vec(), handler: handler.to_string(), language: language.to_string(), runtime_version, diff --git a/src/query/sql/src/planner/plans/aggregate.rs b/src/query/sql/src/planner/plans/aggregate.rs index e1c86ff0087c7..fe3611a29f18f 100644 --- a/src/query/sql/src/planner/plans/aggregate.rs +++ b/src/query/sql/src/planner/plans/aggregate.rs @@ 
-32,6 +32,7 @@ use crate::plans::RelOp; use crate::plans::ScalarItem; use crate::ColumnSet; use crate::IndexType; +use crate::ScalarExpr; #[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)] pub enum AggregateMode { @@ -83,6 +84,20 @@ impl Default for Aggregate { } impl Aggregate { + pub fn get_distribution_keys(&self, before_partial: bool) -> Result> { + if before_partial { + self.group_items + .iter() + .enumerate() + .map(|(index, item)| item.bound_column_expr(format!("_group_item_{}", index))) + .collect() + } else { + Ok(vec![ + self.group_items[0].bound_column_expr("_group_item_0".to_string())? + ]) + } + } + pub fn used_columns(&self) -> Result { let mut used_columns = ColumnSet::new(); for group_item in self.group_items.iter() { @@ -226,14 +241,11 @@ impl Operator for Aggregate { // Group aggregation, enforce `Hash` distribution required.distribution = match settings.get_group_by_shuffle_mode()?.as_str() { - "before_partial" => Ok(Distribution::Hash( - self.group_items - .iter() - .map(|item| item.scalar.clone()) - .collect(), - )), + "before_partial" => { + Ok(Distribution::Hash(self.get_distribution_keys(true)?)) + } "before_merge" => { - Ok(Distribution::Hash(vec![self.group_items[0].scalar.clone()])) + Ok(Distribution::Hash(self.get_distribution_keys(false)?)) } value => Err(ErrorCode::Internal(format!( "Bad settings value group_by_shuffle_mode = {:?}", @@ -324,19 +336,14 @@ impl Operator for Aggregate { match settings.get_group_by_shuffle_mode()?.as_str() { "before_partial" => { children_required.push(vec![RequiredProperty { - distribution: Distribution::Hash( - self.group_items - .iter() - .map(|item| item.scalar.clone()) - .collect(), - ), + distribution: Distribution::Hash(self.get_distribution_keys(true)?), }]); } "before_merge" => { children_required.push(vec![RequiredProperty { - distribution: Distribution::Hash(vec![self.group_items[0] - .scalar - .clone()]), + distribution: Distribution::Hash( + self.get_distribution_keys(false)?, + ), }]); } 
value => { diff --git a/src/query/sql/src/planner/plans/eval_scalar.rs b/src/query/sql/src/planner/plans/eval_scalar.rs index 06e394360179e..fc108fbbdaadc 100644 --- a/src/query/sql/src/planner/plans/eval_scalar.rs +++ b/src/query/sql/src/planner/plans/eval_scalar.rs @@ -19,11 +19,15 @@ use databend_common_exception::Result; use crate::optimizer::ir::RelExpr; use crate::optimizer::ir::RelationalProperty; use crate::optimizer::ir::StatInfo; +use crate::plans::BoundColumnRef; use crate::plans::Operator; use crate::plans::RelOp; use crate::plans::ScalarExpr; +use crate::ColumnBinding; +use crate::ColumnBindingBuilder; use crate::ColumnSet; use crate::IndexType; +use crate::Visibility; /// Evaluate scalar expression #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -38,6 +42,31 @@ pub struct ScalarItem { pub index: IndexType, } +impl ScalarItem { + pub fn column_binding(&self, name: String) -> Result { + Ok(ColumnBindingBuilder::new( + name, + self.index, + Box::new(self.scalar.data_type()?), + Visibility::Visible, + ) + .build()) + } + + pub fn bound_column_expr(&self, name: String) -> Result { + if let ScalarExpr::BoundColumnRef(_) = &self.scalar { + return Ok(self.scalar.clone()); + } + + let column_binding = self.column_binding(name)?; + Ok(BoundColumnRef { + span: None, + column: column_binding, + } + .into()) + } +} + impl EvalScalar { pub fn used_columns(&self) -> Result { let mut used_columns = ColumnSet::new(); diff --git a/src/query/sql/src/planner/plans/scalar_expr.rs b/src/query/sql/src/planner/plans/scalar_expr.rs index 0390be9715b04..5563815cc4347 100644 --- a/src/query/sql/src/planner/plans/scalar_expr.rs +++ b/src/query/sql/src/planner/plans/scalar_expr.rs @@ -35,6 +35,7 @@ use databend_common_expression::RemoteExpr; use databend_common_expression::Scalar; use databend_common_functions::aggregates::AggregateFunctionSortDesc; use databend_common_functions::BUILTIN_FUNCTIONS; +use databend_common_meta_app::principal::StageInfo; use 
databend_common_meta_app::schema::GetSequenceNextValueReq; use databend_common_meta_app::schema::SequenceIdent; use databend_common_meta_app::tenant::Tenant; @@ -1101,17 +1102,22 @@ impl Display for UDFLanguage { } } -#[derive(Clone, Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, Educe, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +#[educe(Hash(bound = false))] pub struct UDFScriptCode { pub language: UDFLanguage, pub runtime_version: String, + #[educe(Hash(ignore))] + pub imports_stage_info: Vec<(StageInfo, String)>, + pub imports: Vec, + pub packages: Vec, pub code: Arc>, } #[derive(Clone, Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, EnumAsInner)] pub enum UDFType { Server(String), // server_addr - Script(UDFScriptCode), + Script(Box), } impl UDFType { diff --git a/src/query/sql/src/planner/semantic/type_check.rs b/src/query/sql/src/planner/semantic/type_check.rs index 6ef6c5274a5e9..cc667c1914be6 100644 --- a/src/query/sql/src/planner/semantic/type_check.rs +++ b/src/query/sql/src/planner/semantic/type_check.rs @@ -125,6 +125,7 @@ use super::name_resolution::NameResolutionContext; use super::normalize_identifier; use crate::binder::bind_values; use crate::binder::resolve_file_location; +use crate::binder::resolve_stage_locations; use crate::binder::wrap_cast; use crate::binder::Binder; use crate::binder::ExprContext; @@ -4608,6 +4609,8 @@ impl<'a> TypeChecker<'a> { arg_types, return_type, runtime_version, + imports, + packages, } = udf_definition; let language = language.parse()?; @@ -4624,11 +4627,23 @@ impl<'a> TypeChecker<'a> { let code_blob = databend_common_base::runtime::block_on(self.resolve_udf_with_stage(code))? 
.into_boxed_slice(); - let udf_type = UDFType::Script(UDFScriptCode { + + let imports_stage_info = databend_common_base::runtime::block_on(resolve_stage_locations( + self.ctx.as_ref(), + &imports + .iter() + .map(|s| s.trim_start_matches('@').to_string()) + .collect::>(), + ))?; + + let udf_type = UDFType::Script(Box::new(UDFScriptCode { language, runtime_version, code: code_blob.into(), - }); + imports_stage_info, + imports, + packages, + })); let arg_names = args.iter().map(|arg| format!("{arg}")).join(", "); let display_name = format!("{}({})", &handler, arg_names); @@ -4666,15 +4681,28 @@ impl<'a> TypeChecker<'a> { state_fields, return_type, runtime_version, + imports, + packages, } = udf_definition; let language = language.parse()?; let code_blob = databend_common_base::runtime::block_on(self.resolve_udf_with_stage(code))? .into_boxed_slice(); - let udf_type = UDFType::Script(UDFScriptCode { + let imports_stage_info = databend_common_base::runtime::block_on(resolve_stage_locations( + self.ctx.as_ref(), + &imports + .iter() + .map(|s| s.trim_start_matches('@').to_string()) + .collect::>(), + ))?; + + let udf_type = UDFType::Script(Box::new(UDFScriptCode { language, runtime_version, code: code_blob.into(), - }); + imports, + imports_stage_info, + packages, + })); let arguments = args .iter() diff --git a/tests/sqllogictests/suites/base/03_common/03_0013_select_udf.test b/tests/sqllogictests/suites/base/03_common/03_0013_select_udf.test index a3c6a34146899..3f51e6856e566 100644 --- a/tests/sqllogictests/suites/base/03_common/03_0013_select_udf.test +++ b/tests/sqllogictests/suites/base/03_common/03_0013_select_udf.test @@ -66,6 +66,9 @@ select number, gcd(number * 3, number * 6), gcd(3, gcd(number * 3, number * 6)) 3 9 3 4 12 3 +statement ok +select gcd(number * 3, number) as c, uniq( gcd(number * 3, number * 2) ) from numbers(100) group by c; + statement ok DROP FUNCTION gcd diff --git a/tests/sqllogictests/suites/base/05_ddl/05_0036_sequence.test 
b/tests/sqllogictests/suites/base/05_ddl/05_0036_sequence.test index 306de0021988b..af6263b7c7b55 100644 --- a/tests/sqllogictests/suites/base/05_ddl/05_0036_sequence.test +++ b/tests/sqllogictests/suites/base/05_ddl/05_0036_sequence.test @@ -111,6 +111,9 @@ select count(*) from tmp; ---- 1000000 +statement ok +select nextval(seq) % 3 as c from numbers(1000000) group by c order by c; + statement ok DESC SEQUENCE SEQ diff --git a/tests/sqllogictests/suites/udf_native/03_0001_udf_py.test b/tests/sqllogictests/suites/udf_native/03_0001_udf_py.test index 059763c946573..36265872dafa6 100644 --- a/tests/sqllogictests/suites/udf_native/03_0001_udf_py.test +++ b/tests/sqllogictests/suites/udf_native/03_0001_udf_py.test @@ -1,7 +1,19 @@ ## enable it when compiled with ee feature ## statement ok ## CREATE OR REPLACE FUNCTION gcd_py (INT, INT) RETURNS BIGINT LANGUAGE python HANDLER = 'gcd' AS $$ +## # /// script +## # requires-python = ">=3.12" +## # dependencies = ["numpy", "pandas"] +## # /// +## import numpy as np +## import pandas as pd +## ## def gcd(a: int, b: int) -> int: +## x = int(pd.DataFrame(np.random.rand(3, 3)).sum().sum()) +## a += x +## b -= x +## a -= x +## b += x ## while b: ## a, b = b, a % b ## return a