Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::sync::LazyLock;

use arrow_array::RecordBatch;
use arrow_udf_runtime::javascript::FunctionOptions;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_cache::Cache;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::converts::arrow::ARROW_EXT_TYPE_VARIANT;
Expand All @@ -38,6 +40,7 @@ use databend_common_sql::executor::physical_plans::UdfFunctionDesc;
use databend_common_sql::plans::UDFLanguage;
use databend_common_sql::plans::UDFScriptCode;
use databend_common_sql::plans::UDFType;
use tempfile::TempDir;

use super::runtime_pool::Pool;
use super::runtime_pool::RuntimeBuilder;
Expand All @@ -49,8 +52,11 @@ pub enum ScriptRuntime {
Python(python_pool::PyRuntimePool),
}

/// Python `major.minor` version string, computed once on first access.
///
/// Uses the value reported by `venv::detect_python_version`, falling back to
/// `"3.12"` when detection returns an error.
static PY_VERSION: LazyLock<String> = LazyLock::new(|| match venv::detect_python_version() {
    Ok(version) => version,
    Err(_) => "3.12".to_string(),
});

impl ScriptRuntime {
pub fn try_create(func: &UdfFunctionDesc) -> Result<Self> {
pub fn try_create(func: &UdfFunctionDesc, _temp_dir: Option<Arc<TempDir>>) -> Result<Self> {
let UDFType::Script(UDFScriptCode { language, code, .. }) = &func.udf_type else {
unreachable!()
};
Expand Down Expand Up @@ -81,10 +87,21 @@ impl ScriptRuntime {
}
#[cfg(feature = "python-udf")]
UDFLanguage::Python => {
let code = String::from_utf8(code.to_vec())?;
let code = if let Some(temp_dir) = _temp_dir {
format!(
"import sys\nsys.path.append('{}')\n{}",
temp_dir.path().display(),
code
)
} else {
code
};

let builder = PyRuntimeBuilder {
name: func.name.clone(),
handler: func.func_name.clone(),
code: String::from_utf8(code.to_vec())?,
code,
output_type: func.data_type.as_ref().clone(),
counter: Default::default(),
};
Expand Down Expand Up @@ -259,14 +276,14 @@ mod python_pool {

/// Transform state for script UDFs: the function descriptors to evaluate and
/// the runtimes prepared for them, looked up by function name when a data
/// block is processed.
pub struct TransformUdfScript {
funcs: Vec<UdfFunctionDesc>,
script_runtimes: BTreeMap<String, Arc<ScriptRuntime>>,
script_runtimes: RuntimeTimeRes,
}

impl TransformUdfScript {
pub fn new(
_func_ctx: FunctionContext,
funcs: Vec<UdfFunctionDesc>,
script_runtimes: BTreeMap<String, Arc<ScriptRuntime>>,
script_runtimes: RuntimeTimeRes,
) -> Self {
Self {
funcs,
Expand All @@ -289,38 +306,97 @@ impl Transform for TransformUdfScript {
let num_rows = data_block.num_rows();
let block_entries = self.prepare_block_entries(func, &data_block)?;
let input_batch = self.create_input_batch(block_entries, num_rows)?;
let runtime = self.script_runtimes.get(&func.name).unwrap();
let (runtime, _) = self.script_runtimes.get(&func.name).unwrap();
let result_batch = runtime.handle_execution(func, &input_batch)?;
self.update_datablock(func, result_batch, &mut data_block)?;
}
Ok(data_block)
}
}

impl TransformUdfScript {
pub fn init_runtime(funcs: &[UdfFunctionDesc]) -> Result<BTreeMap<String, Arc<ScriptRuntime>>> {
let mut script_runtimes: BTreeMap<String, Arc<ScriptRuntime>> = BTreeMap::new();
/// Runtimes keyed by UDF name. Each value pairs the shared runtime with the
/// optional temp dir holding that UDF's installed Python dependencies, so the
/// directory stays alive for as long as the map entry does.
type RuntimeTimeRes = BTreeMap<String, (Arc<ScriptRuntime>, Option<Arc<TempDir>>)>;

impl TransformUdfScript {
/// Builds the script runtimes for `funcs`, one per distinct function name.
///
/// UDFs that are not `UDFType::Script` are skipped. For Python scripts whose
/// inline metadata lists dependencies, a temp dir with the installed packages
/// is taken from `venv::PY_VENV_CACHE`, or created and populated on a cache
/// miss, and handed to `ScriptRuntime::try_create`.
///
/// # Errors
///
/// Propagates failures from UTF-8 decoding of the script, temp dir creation
/// and dependency installation. Runtime creation failures are wrapped in
/// `ErrorCode::UDFDataError`.
pub fn init_runtime(funcs: &[UdfFunctionDesc]) -> Result<RuntimeTimeRes> {
let mut script_runtimes = BTreeMap::new();
for func in funcs {
let code = match &func.udf_type {
UDFType::Script(code) => code,
let (code, code_str) = match &func.udf_type {
UDFType::Script(script_code) => {
(script_code, String::from_utf8(script_code.code.to_vec())?)
}
_ => continue,
};

// Only Python scripts that declare dependencies get a temp dir; every
// other case yields `None`. This runs before the vacancy check below, so
// it is evaluated even for a function name that already has a runtime.
let temp_dir = match &func.udf_type {
UDFType::Script(UDFScriptCode {
language: UDFLanguage::Python,
..
}) => {
let dependencies = Self::extract_deps(&code_str);
if !dependencies.is_empty() {
// try to find the temp dir from cache
let key = venv::PyVenvKeyEntry {
udf_desc: func.clone(),
};
// NOTE(review): this write guard is still alive when `create_venv` and
// `install_deps` run below, so other users of the cache wait while the
// dependencies are being installed.
let mut w = venv::PY_VENV_CACHE.write();
let entry = w.get(&key);
if let Some(entry) = entry {
Some(entry.temp_dir.clone())
} else {
let temp_dir = Arc::new(venv::create_venv(PY_VERSION.as_str())?);
venv::install_deps(temp_dir.path(), &dependencies)?;
w.insert(key, venv::PyVenvCacheEntry {
temp_dir: temp_dir.clone(),
});
Some(temp_dir)
}
} else {
None
}
}
_ => None,
};

// Only the first function seen under a given name creates a runtime;
// later functions with the same name leave the existing entry untouched.
if let Entry::Vacant(entry) = script_runtimes.entry(func.name.clone()) {
let runtime = ScriptRuntime::try_create(func).map_err(|err| {
let runtime = ScriptRuntime::try_create(func, temp_dir.clone()).map_err(|err| {
ErrorCode::UDFDataError(format!(
"Failed to create UDF runtime for language {:?} with error: {err}",
code.language
))
})?;
entry.insert(Arc::new(runtime));
entry.insert((Arc::new(runtime), temp_dir));
};
}

Ok(script_runtimes)
}

fn extract_deps(script: &str) -> Vec<String> {
let mut ss = String::new();
let mut meta_start = false;
for line in script.lines() {
if meta_start {
if line.starts_with("# ///") {
break;
}
ss.push_str(line.trim_start_matches('#').trim());
ss.push('\n');
}
if !meta_start && line.starts_with("# /// script") {
meta_start = true;
}
}

let parsed = ss.parse::<toml::Value>().unwrap();
if let Some(deps) = parsed["dependencies"].as_array() {
deps.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect()
} else {
Vec::new()
}
}

fn prepare_block_entries(
&self,
func: &UdfFunctionDesc,
Expand Down Expand Up @@ -431,3 +507,101 @@ impl TransformUdfScript {
Ok(())
}
}

mod venv {
use std::path::Path;
use std::process::Command;
use std::sync::Arc;
use std::sync::LazyLock;

use databend_common_cache::LruCache;
use databend_common_cache::MemSized;
use databend_common_sql::executor::physical_plans::UdfFunctionDesc;
use parking_lot::RwLock;
use tempfile::TempDir;

/// Installs `deps` with `python -m pip install --target <temp_dir_path>`.
///
/// Each dependency is passed to pip as its own argument; no shell is involved.
/// NOTE(review): the entries are forwarded verbatim, so a string that begins
/// with `-` would be read by pip as an option — validate upstream if the
/// metadata is untrusted.
///
/// # Errors
///
/// Returns an error string when the process cannot be started, or when pip
/// exits with a non-zero status; in the latter case pip's stderr output is
/// appended to the message.
pub fn install_deps(temp_dir_path: &Path, deps: &[String]) -> Result<(), String> {
    let output = Command::new("python")
        .args(["-m", "pip", "install"])
        .args(deps)
        .arg("--target")
        // Pass the path itself rather than its lossy `display()` rendering.
        .arg(temp_dir_path)
        .stdout(std::process::Stdio::null())
        // Keep stderr so a failed installation can explain itself.
        .stderr(std::process::Stdio::piped())
        .output()
        .map_err(|e| format!("Failed to install dependencies: {}", e))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(format!("Dependency installation failed: {}", stderr.trim()));
    }

    // Only report success once the exit status has confirmed it.
    log::info!("Dependency installation success {}", deps.join(", "));
    Ok(())
}

/// Creates the temporary directory that will hold a UDF's Python packages.
///
/// No `python -m venv` environment is created here: the directory is returned
/// empty and `_python_version` is currently unused. Packages are placed into
/// it later by `install_deps` via pip's `--target` option.
///
/// # Errors
///
/// Returns an error string when the temporary directory cannot be created.
pub fn create_venv(_python_version: &str) -> Result<TempDir, String> {
    match tempfile::tempdir() {
        Ok(dir) => Ok(dir),
        Err(e) => Err(format!("Failed to create temp dir: {}", e)),
    }
}

/// Runs `python --version` and returns the first two dot-separated components
/// of what it prints on stdout (with any `"Python "` text removed).
///
/// # Errors
///
/// Returns an error string when the command cannot be started or exits with a
/// non-zero status.
pub fn detect_python_version() -> Result<String, String> {
    let output = match Command::new("python").arg("--version").output() {
        Ok(out) => out,
        Err(e) => return Err(format!("Failed to detect python version: {}", e)),
    };

    if !output.status.success() {
        return Err("Failed to detect python version".into());
    }

    let stdout_text = String::from_utf8_lossy(&output.stdout);
    let cleaned = stdout_text.trim().to_string().replace("Python ", "");
    let major_minor: Vec<&str> = cleaned.split('.').take(2).collect();
    Ok(major_minor.join("."))
}

/// Value stored in the Python venv cache: a shared handle to the temp dir
/// that holds a UDF's installed dependencies.
#[derive(Clone)]
pub(crate) struct PyVenvCacheEntry {
// Shared so the directory outlives the cache entry while any clone is held.
pub(crate) temp_dir: Arc<TempDir>,
}

/// Key of the Python venv cache: the full UDF descriptor, compared and hashed
/// through the derived `Eq`/`Hash` implementations.
#[derive(Eq, Hash, PartialEq)]
pub(crate) struct PyVenvKeyEntry {
pub(crate) udf_desc: UdfFunctionDesc,
}

impl MemSized for PyVenvKeyEntry {
    /// Reports the in-place size of the key type only; heap data owned by the
    /// descriptor is not counted.
    fn mem_bytes(&self) -> usize {
        std::mem::size_of::<Self>()
    }
}

impl MemSized for PyVenvCacheEntry {
    /// Reports the in-place size of the entry type only; the directory's
    /// contents on disk are not counted.
    fn mem_bytes(&self) -> usize {
        std::mem::size_of::<Self>()
    }
}

/// Process-wide LRU cache from UDF descriptor to its dependency temp dir,
/// created lazily with `LruCache::with_items_capacity(64)` and guarded by a
/// `parking_lot::RwLock`.
pub static PY_VENV_CACHE: LazyLock<RwLock<LruCache<PyVenvKeyEntry, PyVenvCacheEntry>>> =
LazyLock::new(|| RwLock::new(LruCache::with_items_capacity(64)));
}
2 changes: 1 addition & 1 deletion src/query/sql/src/executor/physical_plans/physical_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl Udf {
}
}

#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
#[derive(Clone, Debug, Eq, Hash, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct UdfFunctionDesc {
pub name: String,
pub func_name: String,
Expand Down
12 changes: 12 additions & 0 deletions tests/sqllogictests/suites/udf_native/03_0001_udf_py.test
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
## enable it when compiled with ee feature
## statement ok
## CREATE OR REPLACE FUNCTION gcd_py (INT, INT) RETURNS BIGINT LANGUAGE python HANDLER = 'gcd' AS $$
## # /// script
## # requires-python = ">=3.12"
## # dependencies = ["numpy", "pandas"]
## # ///
## import numpy as np
## import pandas as pd
##
## def gcd(a: int, b: int) -> int:
## x = int(pd.DataFrame(np.random.rand(3, 3)).sum().sum())
## a += x
## b -= x
## a -= x
## b += x
## while b:
## a, b = b, a % b
## return a
Expand Down
Loading