From 47e5e9ca3147dbaca81f122883d88928d23d31aa Mon Sep 17 00:00:00 2001 From: Clifton Cunningham Date: Fri, 27 Mar 2026 05:31:13 +0100 Subject: [PATCH 1/3] feat: support server-side parameter binding via /v1/query params field ref https://github.com/databendlabs/bendsql/issues/759 When the server supports it (version > 1.2.900), send raw SQL with a JSON `params` field instead of interpolating parameters client-side. Falls back to client-side interpolation for older servers or when SQL contains `$N` column position placeholders (which the server uses for stage column refs). Changes: - Add `params` field to `QueryRequest` (core) - Add `server_side_params` capability flag with version threshold - Thread params through `start_query` / `query_all` in core client - Add `Params::to_json_value()` with `sql_string_to_json()` reverse parser - Add `PlaceholderVisitor::has_dollar_positions()` for `$N` detection - Add `*_with_params()` methods to `IConnection` trait with defaults - Override in `RestAPIConnection` to pass params to server - Route in `QueryBuilder`/`ExecBuilder`: server-side when supported and no `$N`, client-side otherwise - Add `to_json_params()` helper in Python bindings for future use --- bindings/python/Cargo.toml | 1 + bindings/python/src/utils.rs | 50 +++++++++++++++++ core/src/capability.rs | 3 ++ core/src/client.rs | 32 ++++++++--- core/src/request.rs | 22 ++++++++ core/tests/core/retry.rs | 10 ++-- core/tests/core/simple.rs | 5 +- core/tests/core/stage.rs | 6 +-- driver/src/client.rs | 101 +++++++++++++++++++++++++++++++++++ driver/src/conn.rs | 24 +++++++++ driver/src/params.rs | 78 +++++++++++++++++++++++++++ driver/src/placeholder.rs | 6 +++ driver/src/rest_api.rs | 40 ++++++++++++-- 13 files changed, 357 insertions(+), 21 deletions(-) diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 80c423cdd..ee03ca899 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -19,6 +19,7 @@ databend-driver = { workspace = true, features = ["rustls", "flight-sql"] } databend-driver-core = { workspace = true } tokio-stream = { workspace = true } +serde_json = "1.0" ctor = "0.2" env_logger = "0.11.8" http = "1.0" diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs index aa2225d39..33ea21f41 100644 --- a/bindings/python/src/utils.rs +++ b/bindings/python/src/utils.rs @@ -96,6 +96,56 @@ fn to_sql_string(v: Bound) -> PyResult { } } +/// Convert Python params directly to JSON values, preserving native types. +pub(crate) fn to_json_params(v: Option>) -> Option { + match v { + Some(v) => { + if let Ok(v) = v.downcast::() { + let mut map = serde_json::Map::new(); + for (k, v) in v.iter() { + let k = k.extract::().unwrap(); + let v = py_to_json(v).unwrap(); + map.insert(k, v); + } + Some(serde_json::Value::Object(map)) + } else if let Ok(v) = v.downcast::() { + let arr: Vec = + v.iter().map(|v| py_to_json(v).unwrap()).collect(); + Some(serde_json::Value::Array(arr)) + } else if let Ok(v) = v.downcast::() { + let arr: Vec = + v.iter().map(|v| py_to_json(v).unwrap()).collect(); + Some(serde_json::Value::Array(arr)) + } else { + Some(serde_json::Value::Array(vec![py_to_json(v).unwrap()])) + } + } + None => None, + } +} + +fn py_to_json(v: Bound) -> PyResult { + if v.is_none() { + return Ok(serde_json::Value::Null); + } + // Check bool before int (bool is a subclass of int in Python) + if let Ok(v) = v.extract::() { + return Ok(serde_json::Value::Bool(v)); + } + if let Ok(v) = v.extract::() { + return Ok(serde_json::json!(v)); + } + if let Ok(v) = v.extract::() { + return Ok(serde_json::json!(v)); + } + if let Ok(v) = v.extract::() { + return Ok(serde_json::Value::String(v)); + } + Err(PyAttributeError::new_err(format!( + "Invalid parameter type for: {v:?}, expected str, bool, int or float" + ))) +} + pub(super) fn options_as_ref( format_options: &Option>, ) -> Option> { diff --git a/core/src/capability.rs b/core/src/capability.rs index 140da08f8..d015dd8d8 100644 --- a/core/src/capability.rs +++ b/core/src/capability.rs @@ -18,6 +18,7 @@ use semver::Version; pub struct Capability { pub streaming_load: bool, pub arrow_data: bool, + pub server_side_params: bool, } impl Capability { @@ -25,6 +26,8 @@ impl Capability { Capability { streaming_load: ver > &Version::new(1, 2, 781), arrow_data: ver > &Version::new(1, 2, 835), + // TODO: update version threshold when server PR gets a release version + server_side_params: ver > &Version::new(1, 2, 900), } } } diff --git a/core/src/client.rs b/core/src/client.rs index dcad741ce..988424282 100644 --- a/core/src/client.rs +++ b/core/src/client.rs @@ -553,9 +553,14 @@ impl APIClient { } } - pub async fn start_query(self: &Arc, sql: &str, need_progress: bool) -> Result { + pub async fn start_query( + self: &Arc, + sql: &str, + need_progress: bool, + params: Option, + ) -> Result { info!("start query: {sql}"); - let (resp, batches) = self.start_query_inner(sql, None, false).await?; + let (resp, batches) = self.start_query_inner(sql, None, false, params).await?; Pages::new(self.clone(), resp, batches, need_progress) } @@ -589,6 +594,7 @@ impl APIClient { sql: &str, stage_attachment_config: Option>, force_json_body: bool, + params: Option, ) -> Result<(QueryResponse, Vec)> { if !self.in_active_transaction() { self.route_hint.next(); @@ -601,7 +607,8 @@ impl APIClient { let req = QueryRequest::new(sql) .with_pagination(self.make_pagination()) .with_session(Some(session_state)) - .with_stage_attachment(stage_attachment_config); + .with_stage_attachment(stage_attachment_config) + .with_params(params); // headers let query_id = self.gen_query_id(); @@ -766,16 +773,23 @@ impl APIClient { Ok(()) } - pub async fn query_all(self: &Arc, sql: &str) -> Result { - self.query_all_inner(sql, false).await + pub async fn query_all( + self: &Arc, + sql: &str, + params: Option, + ) -> Result { + self.query_all_inner(sql, false, params).await } pub async fn query_all_inner( self: &Arc, sql: &str, force_json_body: bool, + params: Option, ) -> Result { - let (resp, batches) = self.start_query_inner(sql, None, force_json_body).await?; + let (resp, batches) = self + .start_query_inner(sql, None, force_json_body, params) + .await?; let mut pages = Pages::new(self.clone(), resp, batches, false)?; let mut all = Page::default(); while let Some(page) = pages.next().await { @@ -842,7 +856,9 @@ impl APIClient { file_format_options: Some(file_format_options), copy_options: Some(copy_options), }); - let (resp, batches) = self.start_query_inner(sql, stage_attachment, true).await?; + let (resp, batches) = self + .start_query_inner(sql, stage_attachment, true, None) + .await?; let mut pages = Pages::new(self.clone(), resp, batches, false)?; let mut all = Page::default(); while let Some(page) = pages.next().await { @@ -854,7 +870,7 @@ impl APIClient { async fn get_presigned_upload_url(self: &Arc, stage: &str) -> Result { info!("get presigned upload url: {stage}"); let sql = format!("PRESIGN UPLOAD {stage}"); - let resp = self.query_all_inner(&sql, true).await?; + let resp = self.query_all_inner(&sql, true, None).await?; if resp.data.len() != 1 { return Err(Error::Decode( "Empty response from server for presigned request".to_string(), diff --git a/core/src/request.rs b/core/src/request.rs index 6857bb7ab..cc8e35c48 100644 --- a/core/src/request.rs +++ b/core/src/request.rs @@ -26,6 +26,8 @@ pub struct QueryRequest<'a> { pagination: Option, #[serde(skip_serializing_if = "Option::is_none")] stage_attachment: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + params: Option, } #[derive(Serialize, Debug)] @@ -54,6 +56,7 @@ impl<'r, 't: 'r> QueryRequest<'r> { sql, pagination: None, stage_attachment: None, + params: None, } } @@ -74,6 +77,11 @@ impl<'r, 't: 'r> QueryRequest<'r> { self.stage_attachment = stage_attachment; self } + + pub fn with_params(mut self, params: Option) -> Self { + self.params = params; + self + } } #[cfg(test)] @@ -103,4 +111,18 @@ mod test { ); Ok(()) } + + #[test] + fn build_request_with_params() -> Result<()> { + let req = QueryRequest::new("SELECT ? + ?").with_params(Some(serde_json::json!([1, 2]))); + assert_eq!( + serde_json::to_string(&req)?, + r#"{"sql":"SELECT ? + ?","params":[1,2]}"# + ); + + // params=None should not appear in serialized output + let req = QueryRequest::new("SELECT 1").with_params(None); + assert_eq!(serde_json::to_string(&req)?, r#"{"sql":"SELECT 1"}"#); + Ok(()) + } } diff --git a/core/tests/core/retry.rs b/core/tests/core/retry.rs index 1a09cba82..2a53b7b84 100644 --- a/core/tests/core/retry.rs +++ b/core/tests/core/retry.rs @@ -262,7 +262,7 @@ async fn retry_503_then_success() { let dsn = build_dsn(port, 2, ""); let client = APIClient::new(&dsn, None).await.unwrap(); - let result = client.query_all("select 42").await.unwrap(); + let result = client.query_all("select 42", None).await.unwrap(); assert_eq!(result.data, vec![vec![Some("42".to_string())]]); assert_eq!(requests.lock().unwrap().len(), 2); @@ -303,7 +303,7 @@ async fn retry_401_with_access_token_file_reload_then_success() { &format!("access_token_file={}", token_file.to_string_lossy()), ); let client = APIClient::new(&dsn, None).await.unwrap(); - let result = client.query_all("select 'reloaded'").await.unwrap(); + let result = client.query_all("select 'reloaded'", None).await.unwrap(); assert_eq!(result.data, vec![vec![Some("reloaded".to_string())]]); assert_eq!(requests.lock().unwrap().len(), 2); @@ -329,7 +329,7 @@ async fn retry_401_auth_reload_stops_at_max_retries() { &format!("access_token_file={}", token_file.to_string_lossy()), ); let client = APIClient::new(&dsn, None).await.unwrap(); - let err = match client.query_all("select 1").await { + let err = match client.query_all("select 1", None).await { Ok(_) => panic!("expected unauthorized error"), Err(err) => err, }; @@ -360,7 +360,7 @@ async fn start_query_404_keeps_logic_error() { let dsn = build_dsn(port, 2, ""); let client = APIClient::new(&dsn, None).await.unwrap(); - let err = match client.query_all("select 1").await { + let err = match client.query_all("select 1", None).await { Ok(_) => panic!("expected logic error"), Err(err) => err, }; @@ -403,7 +403,7 @@ async fn query_page_404_maps_to_query_not_found() { let dsn = build_dsn(port, 2, ""); let client = APIClient::new(&dsn, None).await.unwrap(); - let err = match client.query_all("select 1").await { + let err = match client.query_all("select 1", None).await { Ok(_) => panic!("expected QueryNotFound"), Err(err) => err, }; diff --git a/core/tests/core/simple.rs b/core/tests/core/simple.rs index 73137aa4e..9ccd0d62a 100644 --- a/core/tests/core/simple.rs +++ b/core/tests/core/simple.rs @@ -21,7 +21,10 @@ use crate::common::DEFAULT_DSN; async fn select_simple() { let dsn = option_env!("TEST_DATABEND_DSN").unwrap_or(DEFAULT_DSN); let client = APIClient::new(dsn, None).await.unwrap(); - let mut pages = client.start_query("select 15532", true).await.unwrap(); + let mut pages = client + .start_query("select 15532", true, None) + .await + .unwrap(); let page = pages.next().await.unwrap().unwrap(); assert_eq!(page.data, [[Some("15532".to_string())]]); } diff --git a/core/tests/core/stage.rs b/core/tests/core/stage.rs index 5ca3a7b7b..2e16f64dc 100644 --- a/core/tests/core/stage.rs +++ b/core/tests/core/stage.rs @@ -48,7 +48,7 @@ async fn insert_with_stage(presign: bool) { .await .unwrap(); let sql = format!("CREATE TABLE `{table}` (id UInt64, city String, number UInt64)"); - client.query_all(&sql).await.unwrap(); + client.query_all(&sql, None).await.unwrap(); let sql = format!("INSERT INTO `{table}` VALUES"); let file_format_options = vec![ @@ -68,7 +68,7 @@ async fn insert_with_stage(presign: bool) { .unwrap(); let sql = format!("SELECT * FROM `{table}`"); - let resp = client.query_all(&sql).await.unwrap(); + let resp = client.query_all(&sql, None).await.unwrap(); assert_eq!(resp.data.len(), 6); let expect = [ ["1", "Beijing", "100"], @@ -90,7 +90,7 @@ async fn insert_with_stage(presign: bool) { assert_eq!(result, expect); let sql = format!("DROP TABLE `{table}`;"); - client.query_all(&sql).await.unwrap(); + client.query_all(&sql, None).await.unwrap(); } #[tokio::test] diff --git a/driver/src/client.rs b/driver/src/client.rs index c925fb300..b763dc1a8 100644 --- a/driver/src/client.rs +++ b/driver/src/client.rs @@ -22,14 +22,17 @@ use url::Url; use crate::conn::IConnection; #[cfg(feature = "flight-sql")] use crate::flight_sql::FlightSQLConnection; +use crate::placeholder::PlaceholderVisitor; use crate::ConnectionInfo; use crate::Params; use databend_client::PresignedResponse; +use databend_common_ast::parser::Dialect; use databend_driver_core::error::{Error, Result}; use databend_driver_core::raw_rows::{RawRow, RawRowIterator}; use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, ServerStats}; use databend_driver_core::value::Value; +use tokio_stream::StreamExt; use crate::rest_api::RestAPIConnection; @@ -448,21 +451,66 @@ impl<'a> QueryBuilder<'a> { } pub async fn iter(self) -> Result { + if let Some(params) = &self.params { + if self.should_use_server_side_params() { + let json_params = params.to_json_value(); + return self + .connection + .inner + .query_iter_with_params(&self.sql, Some(json_params)) + .await; + } + } let sql = self.get_final_sql(); self.connection.inner.query_iter(&sql).await } pub async fn iter_ext(self) -> Result { + if let Some(params) = &self.params { + if self.should_use_server_side_params() { + let json_params = params.to_json_value(); + return self + .connection + .inner + .query_iter_ext_with_params(&self.sql, Some(json_params)) + .await; + } + } let sql = self.get_final_sql(); self.connection.inner.query_iter_ext(&sql).await } pub async fn one(self) -> Result> { + if let Some(params) = &self.params { + if self.should_use_server_side_params() { + let json_params = params.to_json_value(); + let mut rows = self + .connection + .inner + .query_iter_with_params(&self.sql, Some(json_params)) + .await?; + return match rows.next().await { + Some(r) => Ok(Some(r?)), + None => Ok(None), + }; + } + } let sql = self.get_final_sql(); self.connection.inner.query_row(&sql).await } pub async fn all(self) -> Result> { + if let Some(params) = &self.params { + if self.should_use_server_side_params() { + let json_params = params.to_json_value(); + let rows = self + .connection + .inner + .query_iter_with_params(&self.sql, Some(json_params)) + .await?; + return rows.collect().await; + } + } let sql = self.get_final_sql(); self.connection.inner.query_all(&sql).await } @@ -482,6 +530,29 @@ impl<'a> QueryBuilder<'a> { Ok(QueryCursor::new(row_iter)) } + /// Check if we should use server-side parameter binding. + /// Returns true when the server supports it AND the SQL does not contain `$N` placeholders. + fn should_use_server_side_params(&self) -> bool { + if !self.connection.inner.supports_server_side_params() { + return false; + } + !self.has_dollar_placeholders() + } + + /// Detect if the SQL contains `$N` column position placeholders. + fn has_dollar_placeholders(&self) -> bool { + let tokens = match databend_common_ast::parser::tokenize_sql(&self.sql) { + Ok(t) => t, + Err(_) => return false, + }; + if let Ok((stmt, _)) = databend_common_ast::parser::parse_sql(&tokens, Dialect::PostgreSQL) + { + let mut visitor = PlaceholderVisitor::new(); + return visitor.has_dollar_positions(&stmt); + } + false + } + fn get_final_sql(&self) -> String { match &self.params { Some(params) => params.replace(&self.sql), @@ -512,12 +583,42 @@ impl<'a> ExecBuilder<'a> { } pub async fn execute(self) -> Result { + if let Some(ref params) = self.params { + if self.should_use_server_side_params() { + let json_params = params.to_json_value(); + return self + .connection + .inner + .exec_with_params(&self.sql, Some(json_params)) + .await; + } + } let sql = match self.params { Some(params) => params.replace(&self.sql), None => self.sql, }; self.connection.inner.exec(&sql).await } + + fn should_use_server_side_params(&self) -> bool { + if !self.connection.inner.supports_server_side_params() { + return false; + } + !self.has_dollar_placeholders() + } + + fn has_dollar_placeholders(&self) -> bool { + let tokens = match databend_common_ast::parser::tokenize_sql(&self.sql) { + Ok(t) => t, + Err(_) => return false, + }; + if let Ok((stmt, _)) = databend_common_ast::parser::parse_sql(&tokens, Dialect::PostgreSQL) + { + let mut visitor = PlaceholderVisitor::new(); + return visitor.has_dollar_positions(&stmt); + } + false + } } impl<'a> std::future::IntoFuture for ExecBuilder<'a> { diff --git a/driver/src/conn.rs b/driver/src/conn.rs index 7dc48201d..91a01d6b2 100644 --- a/driver/src/conn.rs +++ b/driver/src/conn.rs @@ -74,6 +74,30 @@ pub trait IConnection: Send + Sync { async fn query_iter(&self, sql: &str) -> Result; async fn query_iter_ext(&self, sql: &str) -> Result; + fn supports_server_side_params(&self) -> bool { + false + } + + async fn exec_with_params(&self, sql: &str, _params: Option) -> Result { + self.exec(sql).await + } + + async fn query_iter_with_params( + &self, + sql: &str, + _params: Option, + ) -> Result { + self.query_iter(sql).await + } + + async fn query_iter_ext_with_params( + &self, + sql: &str, + _params: Option, + ) -> Result { + self.query_iter_ext(sql).await + } + async fn query_row(&self, sql: &str) -> Result> { let rows = self.query_all(sql).await?; let row = rows.into_iter().next(); diff --git a/driver/src/params.rs b/driver/src/params.rs index 94bab635e..f74a3e2bb 100644 --- a/driver/src/params.rs +++ b/driver/src/params.rs @@ -35,6 +35,41 @@ impl Default for Params { } } +/// Reverse-parse an `as_sql_string()` output back into a typed JSON value. +fn sql_string_to_json(s: &str) -> serde_json::Value { + if s == "NULL" { + return serde_json::Value::Null; + } + if s == "TRUE" { + return serde_json::Value::Bool(true); + } + if s == "FALSE" { + return serde_json::Value::Bool(false); + } + // Try integer + if let Ok(n) = s.parse::() { + return serde_json::json!(n); + } + // Try float + if let Ok(n) = s.parse::() { + return serde_json::json!(n); + } + // Strip surrounding single quotes for strings + if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 { + let inner = &s[1..s.len() - 1]; + // Handle JSON cast suffix like '{"a": 1}'::JSON + return serde_json::Value::String(inner.to_string()); + } + // Strip JSON cast suffix + if let Some(json_str) = s.strip_suffix("'::JSON") { + if let Some(inner) = json_str.strip_prefix('\'') { + return serde_json::Value::String(inner.to_string()); + } + } + // Fallback: treat as string + serde_json::Value::String(s.to_string()) +} + impl Params { pub fn len(&self) -> usize { match self { @@ -77,6 +112,23 @@ impl Params { } } + /// Convert params to a JSON value suitable for server-side parameter binding. + /// `QuestionParams` → `Value::Array`, `NamedParams` → `Value::Object`. + pub fn to_json_value(&self) -> serde_json::Value { + match self { + Params::QuestionParams(vec) => { + serde_json::Value::Array(vec.iter().map(|s| sql_string_to_json(s)).collect()) + } + Params::NamedParams(map) => { + let obj: serde_json::Map = map + .iter() + .map(|(k, v)| (k.clone(), sql_string_to_json(v))) + .collect(); + serde_json::Value::Object(obj) + } + } + } + pub fn replace(&self, sql: &str) -> String { if !self.is_empty() { let tokens = databend_common_ast::parser::tokenize_sql(sql).unwrap(); @@ -376,6 +428,32 @@ mod tests { } } + #[test] + fn test_to_json_value() { + // Test positional params + let params = params! {1, "hello", 9.99}; + let json = params.to_json_value(); + assert_eq!(json, serde_json::json!([1, "hello", 9.99])); + + // Test named params + let params = params! {a => 1, b => "hello", c => true}; + let json = params.to_json_value(); + let obj = json.as_object().unwrap(); + assert_eq!(obj.get("a").unwrap(), &serde_json::json!(1)); + assert_eq!(obj.get("b").unwrap(), &serde_json::json!("hello")); + assert_eq!(obj.get("c").unwrap(), &serde_json::json!(true)); + + // Test NULL + let params = params! {()}; + let json = params.to_json_value(); + assert_eq!(json, serde_json::json!([null])); + + // Test Option + let params: Params = (Some(42), None::<()>, Some("world")).into(); + let json = params.to_json_value(); + assert_eq!(json, serde_json::json!([42, null, "world"])); + } + #[test] fn test_replace() { let params = params! {1, "44", 2, 3, "55", "66"}; diff --git a/driver/src/placeholder.rs b/driver/src/placeholder.rs index c2bcf3372..e3b23bdfd 100644 --- a/driver/src/placeholder.rs +++ b/driver/src/placeholder.rs @@ -76,6 +76,12 @@ impl PlaceholderVisitor { } } + /// Returns true if the SQL contains `$N` column position placeholders. + pub fn has_dollar_positions(&mut self, stmt: &Statement) -> bool { + stmt.drive(self); + !self.column_positions.is_empty() + } + pub fn replace_sql(&mut self, params: &Params, stmt: &Statement, sql: &str) -> String { stmt.drive(self); self.placeholders.sort_by(|l, r| l.start.cmp(&r.start)); diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index 5a8c8c2e5..cea383619 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -62,7 +62,7 @@ impl RestAPIConnection { self.upload_to_stage(&location, data, size).await?; if self.client.capability().streaming_load { let sql = sql.replace(LOAD_PLACEHOLDER, &location); - let page = self.client.query_all(&sql).await?; + let page = self.client.query_all(&sql, None).await?; Ok(ServerStats::from(page.stats)) } else { let file_format_options = Self::default_file_format_options(); @@ -168,9 +168,19 @@ impl IConnection for RestAPIConnection { Ok(()) } + fn supports_server_side_params(&self) -> bool { + self.client.capability().server_side_params + } + async fn exec(&self, sql: &str) -> Result { info!("exec: {}", sql); - let page = self.client.query_all(sql).await?; + let page = self.client.query_all(sql, None).await?; + Ok(page.stats.progresses.write_progress.rows as i64) + } + + async fn exec_with_params(&self, sql: &str, params: Option) -> Result { + info!("exec with params: {}", sql); + let page = self.client.query_all(sql, params).await?; Ok(page.stats.progresses.write_progress.rows as i64) } @@ -187,7 +197,29 @@ impl IConnection for RestAPIConnection { async fn query_iter_ext(&self, sql: &str) -> Result { info!("query iter ext: {}", sql); - let pages = self.client.start_query(sql, true).await?; + let pages = self.client.start_query(sql, true, None).await?; + let (schema, rows) = RestAPIRows::::from_pages(pages).await?; + Ok(RowStatsIterator::new(Arc::new(schema), Box::pin(rows))) + } + + async fn query_iter_with_params( + &self, + sql: &str, + params: Option, + ) -> Result { + info!("query iter with params: {}", sql); + let rows_with_progress = self.query_iter_ext_with_params(sql, params).await?; + let rows = rows_with_progress.filter_rows().await?; + Ok(rows) + } + + async fn query_iter_ext_with_params( + &self, + sql: &str, + params: Option, + ) -> Result { + info!("query iter ext with params: {}", sql); + let pages = self.client.start_query(sql, true, params).await?; let (schema, rows) = RestAPIRows::::from_pages(pages).await?; Ok(RowStatsIterator::new(Arc::new(schema), Box::pin(rows))) } @@ -195,7 +227,7 @@ impl IConnection for RestAPIConnection { // raw data response query, only for test async fn query_raw_iter(&self, sql: &str) -> Result { info!("query raw iter: {}", sql); - let pages = self.client.start_query(sql, true).await?; + let pages = self.client.start_query(sql, true, None).await?; let (schema, rows) = RestAPIRows::::from_pages(pages).await?; Ok(RawRowIterator::new(Arc::new(schema), Box::pin(rows))) } From b83f84a2a20282e0311de5d487bf7fc343559fac Mon Sep 17 00:00:00 2001 From: Clifton Cunningham Date: Fri, 27 Mar 2026 06:06:09 +0100 Subject: [PATCH 2/3] fix: handle lowercase bools and wide integers in sql_string_to_json - Accept case-insensitive TRUE/FALSE (serde_json::Value::Bool produces lowercase "true"/"false") - Try u64 parse before f64 to avoid precision loss on values above i64::MAX - Only attempt f64 parse when string contains '.', 'e', or 'E' - Add test cases for both edge cases --- driver/src/params.rs | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/driver/src/params.rs b/driver/src/params.rs index f74a3e2bb..6acec9a73 100644 --- a/driver/src/params.rs +++ b/driver/src/params.rs @@ -40,20 +40,27 @@ fn sql_string_to_json(s: &str) -> serde_json::Value { if s == "NULL" { return serde_json::Value::Null; } - if s == "TRUE" { + // Accept both uppercase (from Param for bool) and lowercase (from serde_json::Value::Bool) + if s.eq_ignore_ascii_case("TRUE") { return serde_json::Value::Bool(true); } - if s == "FALSE" { + if s.eq_ignore_ascii_case("FALSE") { return serde_json::Value::Bool(false); } - // Try integer + // Try i64 first if let Ok(n) = s.parse::() { return serde_json::json!(n); } - // Try float - if let Ok(n) = s.parse::() { + // Try u64 for values above i64::MAX (e.g. large u64/u128 IDs) + if let Ok(n) = s.parse::() { return serde_json::json!(n); } + // Try float (only if not a pure integer string, to avoid precision loss on u128/i128) + if s.contains('.') || s.contains('e') || s.contains('E') { + if let Ok(n) = s.parse::() { + return serde_json::json!(n); + } + } // Strip surrounding single quotes for strings if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 { let inner = &s[1..s.len() - 1]; @@ -452,6 +459,17 @@ mod tests { let params: Params = (Some(42), None::<()>, Some("world")).into(); let json = params.to_json_value(); assert_eq!(json, serde_json::json!([42, null, "world"])); + + // Test lowercase bool (from serde_json::Value::Bool) + let params: Params = serde_json::json!([true, false]).into(); + let json = params.to_json_value(); + assert_eq!(json, serde_json::json!([true, false])); + + // Test large u64 above i64::MAX + let big: u64 = u64::MAX; + let params = Params::QuestionParams(vec![big.to_string()]); + let json = params.to_json_value(); + assert_eq!(json, serde_json::json!([big])); } #[test] From c4abdae5e507a2f9b8943faae6d3ba323af68c22 Mon Sep 17 00:00:00 2001 From: cliftonc Date: Fri, 10 Apr 2026 06:56:03 +0200 Subject: [PATCH 3/3] refactor: preserve original types in Params instead of stringify-then-parse Store serde_json::Value directly in Params enum instead of SQL strings, eliminating the lossy sql_string_to_json() reverse-parse roundtrip. This preserves type fidelity for server-side parameter binding (no more f64 coercion of large integers or bool case issues). - Add as_json_value() as primary method on Param trait - Change Params to store serde_json::Value instead of String - Add json_value_to_sql_string() for client-side binding path - Add Value::to_json_value() for ORM insert path - Simplify Python bindings to use py_to_json directly - Remove sql_string_to_json() reverse parser --- bindings/python/src/utils.rs | 71 +------ driver/src/client.rs | 6 +- driver/src/params.rs | 267 +++++++++++++------------- driver/src/placeholder.rs | 7 +- sql/src/value/format/to_sql_string.rs | 66 ++++++- 5 files changed, 210 insertions(+), 207 deletions(-) diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs index 33ea21f41..8e3992655 100644 --- a/bindings/python/src/utils.rs +++ b/bindings/python/src/utils.rs @@ -15,7 +15,6 @@ use std::collections::BTreeMap; use std::collections::HashMap; -use databend_driver::Param; use databend_driver::Params; use pyo3::exceptions::PyAttributeError; use pyo3::types::PyTuple; @@ -46,84 +45,26 @@ pub(crate) fn to_sql_params(v: Option>) -> Params { let mut params = HashMap::new(); for (k, v) in v.iter() { let k = k.extract::().unwrap(); - let v = to_sql_string(v).unwrap(); + let v = py_to_json(v).unwrap(); params.insert(k, v); } Params::NamedParams(params) } else if let Ok(v) = v.downcast::() { - let mut params = vec![]; - for v in v.iter() { - let v = to_sql_string(v).unwrap(); - params.push(v); - } + let params: Vec = + v.iter().map(|v| py_to_json(v).unwrap()).collect(); Params::QuestionParams(params) } else if let Ok(v) = v.downcast::() { - let mut params = vec![]; - for v in v.iter() { - let v = to_sql_string(v).unwrap(); - params.push(v); - } + let params: Vec = + v.iter().map(|v| py_to_json(v).unwrap()).collect(); Params::QuestionParams(params) } else { - Params::QuestionParams(vec![to_sql_string(v).unwrap()]) + Params::QuestionParams(vec![py_to_json(v).unwrap()]) } } None => Params::default(), } } -fn to_sql_string(v: Bound) -> PyResult { - if v.is_none() { - return Ok("NULL".to_string()); - } - match v.downcast::() { - Ok(v) => { - if let Ok(v) = v.extract::() { - Ok(v.as_sql_string()) - } else if let Ok(v) = v.extract::() { - Ok(v.as_sql_string()) - } else if let Ok(v) = v.extract::() { - Ok(v.as_sql_string()) - } else if let Ok(v) = v.extract::() { - Ok(v.as_sql_string()) - } else { - Err(PyAttributeError::new_err(format!( - "Invalid parameter type for: {v:?}, expected str, bool, int or float" - ))) - } - } - Err(e) => Err(e.into()), - } -} - -/// Convert Python params directly to JSON values, preserving native types. -pub(crate) fn to_json_params(v: Option>) -> Option { - match v { - Some(v) => { - if let Ok(v) = v.downcast::() { - let mut map = serde_json::Map::new(); - for (k, v) in v.iter() { - let k = k.extract::().unwrap(); - let v = py_to_json(v).unwrap(); - map.insert(k, v); - } - Some(serde_json::Value::Object(map)) - } else if let Ok(v) = v.downcast::() { - let arr: Vec = - v.iter().map(|v| py_to_json(v).unwrap()).collect(); - Some(serde_json::Value::Array(arr)) - } else if let Ok(v) = v.downcast::() { - let arr: Vec = - v.iter().map(|v| py_to_json(v).unwrap()).collect(); - Some(serde_json::Value::Array(arr)) - } else { - Some(serde_json::Value::Array(vec![py_to_json(v).unwrap()])) - } - } - None => None, - } -} - fn py_to_json(v: Bound) -> PyResult { if v.is_none() { return Ok(serde_json::Value::Null); diff --git a/driver/src/client.rs b/driver/src/client.rs index b763dc1a8..b492457ee 100644 --- a/driver/src/client.rs +++ b/driver/src/client.rs @@ -352,9 +352,9 @@ where let mut total_inserted = 0; for row in &self.rows { let values = row.to_values(); - let param_strings: Vec = - values.into_iter().map(|v| v.to_sql_string()).collect(); - let params = Params::QuestionParams(param_strings); + let json_values: Vec = + values.into_iter().map(|v| v.to_json_value()).collect(); + let params = Params::QuestionParams(json_values); let inserted = connection.exec(&sql).bind(params).await?; total_inserted += inserted; } diff --git a/driver/src/params.rs b/driver/src/params.rs index 6acec9a73..163373d96 100644 --- a/driver/src/params.rs +++ b/driver/src/params.rs @@ -18,15 +18,19 @@ use std::fmt::Debug; use databend_common_ast::parser::Dialect; pub trait Param: Debug { - fn as_sql_string(&self) -> String; + fn as_json_value(&self) -> serde_json::Value; + + fn as_sql_string(&self) -> String { + json_value_to_sql_string(&self.as_json_value()) + } } #[derive(Debug)] pub enum Params { // ?, ? - QuestionParams(Vec), + QuestionParams(Vec), // :name, :age - NamedParams(HashMap), + NamedParams(HashMap), } impl Default for Params { @@ -35,46 +39,35 @@ impl Default for Params { } } -/// Reverse-parse an `as_sql_string()` output back into a typed JSON value. -fn sql_string_to_json(s: &str) -> serde_json::Value { - if s == "NULL" { - return serde_json::Value::Null; - } - // Accept both uppercase (from Param for bool) and lowercase (from serde_json::Value::Bool) - if s.eq_ignore_ascii_case("TRUE") { - return serde_json::Value::Bool(true); - } - if s.eq_ignore_ascii_case("FALSE") { - return serde_json::Value::Bool(false); - } - // Try i64 first - if let Ok(n) = s.parse::() { - return serde_json::json!(n); - } - // Try u64 for values above i64::MAX (e.g. large u64/u128 IDs) - if let Ok(n) = s.parse::() { - return serde_json::json!(n); - } - // Try float (only if not a pure integer string, to avoid precision loss on u128/i128) - if s.contains('.') || s.contains('e') || s.contains('E') { - if let Ok(n) = s.parse::() { - return serde_json::json!(n); +/// Convert a `serde_json::Value` to a SQL string representation for client-side binding. +pub fn json_value_to_sql_string(v: &serde_json::Value) -> String { + match v { + serde_json::Value::Null => "NULL".to_string(), + serde_json::Value::Bool(b) => { + if *b { + "TRUE".to_string() + } else { + "FALSE".to_string() + } } - } - // Strip surrounding single quotes for strings - if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 { - let inner = &s[1..s.len() - 1]; - // Handle JSON cast suffix like '{"a": 1}'::JSON - return serde_json::Value::String(inner.to_string()); - } - // Strip JSON cast suffix - if let Some(json_str) = s.strip_suffix("'::JSON") { - if let Some(inner) = json_str.strip_prefix('\'') { - return serde_json::Value::String(inner.to_string()); + serde_json::Value::Number(n) => n.to_string(), + serde_json::Value::String(s) => format!("'{s}'"), + serde_json::Value::Array(arr) => { + let items: Vec = arr.iter().map(json_value_to_sql_string).collect(); + format!("[{}]", items.join(", ")) + } + serde_json::Value::Object(map) => { + let mut s = String::from("'{"); + for (i, (k, v)) in map.iter().enumerate() { + if i > 0 { + s.push_str(", "); + } + s.push_str(&format!("\"{k}\": {}", json_value_to_sql_string(v))); + } + s.push_str("}'::JSON"); + s } } - // Fallback: treat as string - serde_json::Value::String(s.to_string()) } impl Params { @@ -90,7 +83,7 @@ impl Params { } // index based from 1 - pub fn get_by_index(&self, index: usize) -> Option<&String> { + pub fn get_by_index(&self, index: usize) -> Option<&serde_json::Value> { if index == 0 { return None; } @@ -100,7 +93,7 @@ impl Params { } } - pub fn get_by_name(&self, name: &str) -> Option<&String> { + pub fn get_by_name(&self, name: &str) -> Option<&serde_json::Value> { match self { Params::NamedParams(map) => map.get(name), _ => None, @@ -123,14 +116,10 @@ impl Params { /// `QuestionParams` → `Value::Array`, `NamedParams` → `Value::Object`. pub fn to_json_value(&self) -> serde_json::Value { match self { - Params::QuestionParams(vec) => { - serde_json::Value::Array(vec.iter().map(|s| sql_string_to_json(s)).collect()) - } + Params::QuestionParams(vec) => serde_json::Value::Array(vec.clone()), Params::NamedParams(map) => { - let obj: serde_json::Map = map - .iter() - .map(|(k, v)| (k.clone(), sql_string_to_json(v))) - .collect(); + let obj: serde_json::Map = + map.iter().map(|(k, v)| (k.clone(), v.clone())).collect(); serde_json::Value::Object(obj) } } @@ -150,48 +139,63 @@ impl Params { } } -// impl param for all integer types and string types -macro_rules! impl_param_for_integer { +// Implement Param for numeric types that fit in serde_json::Number +macro_rules! impl_param_for_json_number { ($($t:ty)*) => ($( impl Param for $t { - fn as_sql_string(&self) -> String { - self.to_string() + fn as_json_value(&self) -> serde_json::Value { + serde_json::json!(self) } } )*) } -impl_param_for_integer! { i8 i16 i32 i64 f32 f64 i128 isize u8 u16 u32 u64 u128 usize } +impl_param_for_json_number! { i8 i16 i32 i64 isize u8 u16 u32 u64 usize f32 f64 } -// Implement Param for String -impl Param for bool { - fn as_sql_string(&self) -> String { - if *self { - "TRUE".to_string() +// i128/u128 cannot be represented in JSON numbers, store as string +impl Param for i128 { + fn as_json_value(&self) -> serde_json::Value { + // If it fits in i64, use a number; otherwise use a string to avoid precision loss + if *self >= i128::from(i64::MIN) && *self <= i128::from(i64::MAX) { + serde_json::json!(*self as i64) + } else { + serde_json::Value::String(self.to_string()) + } + } +} + +impl Param for u128 { + fn as_json_value(&self) -> serde_json::Value { + // If it fits in u64, use a number; otherwise use a string to avoid precision loss + if *self <= u128::from(u64::MAX) { + serde_json::json!(*self as u64) } else { - "FALSE".to_string() + serde_json::Value::String(self.to_string()) } } } -// Implement Param for String +impl Param for bool { + fn as_json_value(&self) -> serde_json::Value { + serde_json::Value::Bool(*self) + } +} + impl Param for String { - fn as_sql_string(&self) -> String { - format!("'{self}'") + fn as_json_value(&self) -> serde_json::Value { + serde_json::Value::String(self.clone()) } } -// Implement Param for &str impl Param for &str { - fn as_sql_string(&self) -> String { - format!("'{self}'") + fn as_json_value(&self) -> serde_json::Value { + serde_json::Value::String(self.to_string()) } } -// Impl Param for None impl Param for () { - fn as_sql_string(&self) -> String { - "NULL".to_string() + fn as_json_value(&self) -> serde_json::Value { + serde_json::Value::Null } } @@ -199,44 +203,17 @@ impl Param for Option where T: Param, { - fn as_sql_string(&self) -> String { + fn as_json_value(&self) -> serde_json::Value { match self { - Some(s) => s.as_sql_string(), - None => "NULL".to_string(), + Some(s) => s.as_json_value(), + None => serde_json::Value::Null, } } } impl Param for serde_json::Value { - fn as_sql_string(&self) -> String { - match self { - serde_json::Value::Number(n) => n.to_string(), - serde_json::Value::String(s) => format!("'{s}'"), - serde_json::Value::Bool(b) => b.to_string(), - serde_json::Value::Null => "NULL".to_string(), - serde_json::Value::Array(values) => { - let mut s = String::from("["); - for (i, v) in values.iter().enumerate() { - if i > 0 { - s.push_str(", "); - } - s.push_str(&v.as_sql_string()); - } - s.push(']'); - s - } - serde_json::Value::Object(map) => { - let mut s = String::from("'{"); - for (i, (k, v)) in map.iter().enumerate() { - if i > 0 { - s.push_str(", "); - } - s.push_str(&format!("\"{k}\": {}", v.as_sql_string())); - } - s.push_str("}'::JSON"); - s - } - } + fn as_json_value(&self) -> serde_json::Value { + self.clone() } } @@ -255,7 +232,7 @@ macro_rules! params { let mut map = HashMap::new(); $( - map.insert(stringify!($key).to_string(), $crate::Param::as_sql_string(&$value)); + map.insert(stringify!($key).to_string(), $crate::Param::as_json_value(&$value)); )* map }) @@ -264,7 +241,7 @@ macro_rules! params { ($($value:expr),* $(,)?) => { $crate::Params::QuestionParams(vec![ $( - $crate::Param::as_sql_string(&$value), + $crate::Param::as_json_value(&$value), )* ]) }; @@ -287,8 +264,8 @@ macro_rules! impl_from_tuple_for_params { impl<$head: Param, $($tail: Param),*> From<($head, $($tail),*)> for Params { fn from(tuple: ($head, $($tail),*)) -> Self { let (h, $($tail),*) = tuple; - let mut params = Params::QuestionParams(vec![h.as_sql_string()]); - $(params.merge(Params::QuestionParams(vec![$tail.as_sql_string()]));)* + let mut params = Params::QuestionParams(vec![h.as_json_value()]); + $(params.merge(Params::QuestionParams(vec![$tail.as_json_value()]));)* params } } @@ -300,7 +277,7 @@ macro_rules! impl_from_tuple_for_params { ($last:ident) => { impl<$last: Param> From<($last,)> for Params { fn from(tuple: ($last,)) -> Self { - Params::QuestionParams(vec![tuple.0.as_sql_string()]) + Params::QuestionParams(vec![tuple.0.as_json_value()]) } } }; @@ -320,21 +297,9 @@ impl From> for Params { impl From for Params { fn from(value: serde_json::Value) -> Self { match value { - serde_json::Value::Array(obj) => { - let mut array = Vec::new(); - for v in obj { - array.push(v.as_sql_string()); - } - Params::QuestionParams(array) - } - serde_json::Value::Object(obj) => { - let mut map = HashMap::new(); - for (k, v) in obj { - map.insert(k, v.as_sql_string()); - } - Params::NamedParams(map) - } - other => Params::QuestionParams(vec![other.as_sql_string()]), + serde_json::Value::Array(arr) => Params::QuestionParams(arr), + serde_json::Value::Object(obj) => Params::NamedParams(obj.into_iter().collect()), + other => Params::QuestionParams(vec![other]), } } } @@ -352,9 +317,9 @@ mod tests { let params = params! {a => 1, b => age, c => name}; match params { Params::NamedParams(map) => { - assert_eq!(map.get("a").unwrap(), "1"); - assert_eq!(map.get("b").unwrap(), "4"); - assert_eq!(map.get("c").unwrap(), "'d'"); + assert_eq!(map.get("a").unwrap(), &serde_json::json!(1)); + assert_eq!(map.get("b").unwrap(), &serde_json::json!(4)); + assert_eq!(map.get("c").unwrap(), &serde_json::json!("d")); } _ => panic!("Expected NamedParams"), } @@ -369,7 +334,14 @@ mod tests { let params = params! {name, age, 33u64}; match params { Params::QuestionParams(vec) => { - assert_eq!(vec, vec!["'d'", "4", "33"]); + assert_eq!( + vec, + vec![ + serde_json::json!("d"), + serde_json::json!(4), + serde_json::json!(33u64) + ] + ); } _ => panic!("Expected QuestionParams"), } @@ -380,7 +352,17 @@ mod tests { let params: Params = (1, "44", 2, 3, "55", "66").into(); match params { Params::QuestionParams(vec) => { - assert_eq!(vec, vec!["1", "'44'", "2", "3", "'55'", "'66'"]); + assert_eq!( + vec, + vec![ + serde_json::json!(1), + serde_json::json!("44"), + serde_json::json!(2), + serde_json::json!(3), + serde_json::json!("55"), + serde_json::json!("66"), + ] + ); } _ => panic!("Expected QuestionParams"), } @@ -390,7 +372,15 @@ mod tests { { let params: Params = (Some(1), None::<()>, Some("44"), None::<()>).into(); match params { - Params::QuestionParams(vec) => assert_eq!(vec, vec!["1", "NULL", "'44'", "NULL"]), + Params::QuestionParams(vec) => assert_eq!( + vec, + vec![ + serde_json::json!(1), + serde_json::Value::Null, + serde_json::json!("44"), + serde_json::Value::Null, + ] + ), _ => panic!("Expected QuestionParams"), } } @@ -408,12 +398,12 @@ mod tests { .into(); match params { Params::NamedParams(map) => { - assert_eq!(map.get("a").unwrap(), "1"); - assert_eq!(map.get("b").unwrap(), "'44'"); - assert_eq!(map.get("c").unwrap(), "2"); - assert_eq!(map.get("d").unwrap(), "3"); - assert_eq!(map.get("e").unwrap(), "'55'"); - assert_eq!(map.get("f").unwrap(), "'66'"); + assert_eq!(map.get("a").unwrap(), &serde_json::json!(1)); + assert_eq!(map.get("b").unwrap(), &serde_json::json!("44")); + assert_eq!(map.get("c").unwrap(), &serde_json::json!(2)); + assert_eq!(map.get("d").unwrap(), &serde_json::json!(3)); + assert_eq!(map.get("e").unwrap(), &serde_json::json!("55")); + assert_eq!(map.get("f").unwrap(), &serde_json::json!("66")); } _ => panic!("Expected NamedParams"), } @@ -427,7 +417,14 @@ mod tests { Params::QuestionParams(vec) => { assert_eq!( vec, - vec!["1", "'44'", "2", "'{\"a\": 1}'::JSON", "'55'", "'66'"] + vec![ + serde_json::json!(1), + serde_json::json!("44"), + serde_json::json!(2), + serde_json::json!({"a": 1}), + serde_json::json!("55"), + serde_json::json!("66"), + ] ); } _ => panic!("Expected QuestionParams"), @@ -467,7 +464,7 @@ mod tests { // Test large u64 above i64::MAX let big: u64 = u64::MAX; - let params = Params::QuestionParams(vec![big.to_string()]); + let params: Params = (big,).into(); let json = params.to_json_value(); assert_eq!(json, serde_json::json!([big])); } diff --git a/driver/src/placeholder.rs b/driver/src/placeholder.rs index e3b23bdfd..21604e26e 100644 --- a/driver/src/placeholder.rs +++ b/driver/src/placeholder.rs @@ -25,6 +25,7 @@ use databend_common_ast::Range; use derive_visitor::Drive; use derive_visitor::Visitor; +use crate::params::json_value_to_sql_string; use crate::Params; #[derive(Visitor)] @@ -90,13 +91,13 @@ impl PlaceholderVisitor { for (index, range) in self.placeholders.iter().enumerate() { if let Some(v) = params.get_by_index(index + 1) { - results.push((v.to_string(), *range)); + results.push((json_value_to_sql_string(v), *range)); } } for (name, range) in self.names.iter() { if let Some(v) = params.get_by_name(name) { - results.push((v.to_string(), *range)); + results.push((json_value_to_sql_string(v), *range)); } } @@ -118,7 +119,7 @@ impl PlaceholderVisitor { if let Some(value) = params.get_by_index(*index) { let start = r.start as usize; let end = r.end as usize; - sql.replace_range(start..end, value); + sql.replace_range(start..end, &json_value_to_sql_string(value)); } } } diff --git a/sql/src/value/format/to_sql_string.rs b/sql/src/value/format/to_sql_string.rs index 0b09107e7..1a79ac890 100644 --- a/sql/src/value/format/to_sql_string.rs +++ b/sql/src/value/format/to_sql_string.rs @@ -13,10 +13,74 @@ // limitations under the License. use crate::_macro_internal::Value; -use crate::value::base::{DAYS_FROM_CE, TIMESTAMP_FORMAT, TIMESTAMP_TIMEZONE_FORMAT}; +use crate::value::base::{NumberValue, DAYS_FROM_CE, TIMESTAMP_FORMAT, TIMESTAMP_TIMEZONE_FORMAT}; use chrono::NaiveDate; impl Value { + /// Convert a Value to a serde_json::Value for use in Params. + pub fn to_json_value(&self) -> serde_json::Value { + match self { + Value::Null => serde_json::Value::Null, + Value::EmptyArray => serde_json::json!([]), + Value::EmptyMap => serde_json::json!({}), + Value::Boolean(b) => serde_json::Value::Bool(*b), + Value::String(s) => serde_json::Value::String(s.clone()), + Value::Number(n) => match n { + NumberValue::Int8(v) => serde_json::json!(v), + NumberValue::Int16(v) => serde_json::json!(v), + NumberValue::Int32(v) => serde_json::json!(v), + NumberValue::Int64(v) => serde_json::json!(v), + NumberValue::UInt8(v) => serde_json::json!(v), + NumberValue::UInt16(v) => serde_json::json!(v), + NumberValue::UInt32(v) => serde_json::json!(v), + NumberValue::UInt64(v) => serde_json::json!(v), + NumberValue::Float32(v) => serde_json::json!(v), + NumberValue::Float64(v) => serde_json::json!(v), + NumberValue::Decimal64(v, _) => serde_json::Value::String(v.to_string()), + NumberValue::Decimal128(v, _) => serde_json::Value::String(v.to_string()), + NumberValue::Decimal256(v, _) => serde_json::Value::String(v.to_string()), + }, + Value::Timestamp(dt) => { + serde_json::Value::String(dt.strftime(TIMESTAMP_FORMAT).to_string()) + } + Value::TimestampTz(dt) => { + serde_json::Value::String(dt.strftime(TIMESTAMP_TIMEZONE_FORMAT).to_string()) + } + Value::Date(d) => { + let date = NaiveDate::from_num_days_from_ce_opt(*d + DAYS_FROM_CE).unwrap(); + serde_json::Value::String(date.format("%Y-%m-%d").to_string()) + } + Value::Binary(b) => serde_json::Value::String(hex::encode(b)), + Value::Array(arr) => { + serde_json::Value::Array(arr.iter().map(|v| v.to_json_value()).collect()) + } + Value::Map(map) => { + let obj: serde_json::Map = map + .iter() + .map(|(k, v)| { + let key = match k { + Value::String(s) => s.clone(), + other => format!("{:?}", other), + }; + (key, v.to_json_value()) + }) + .collect(); + serde_json::Value::Object(obj) + } + Value::Tuple(tuple) => { + serde_json::Value::Array(tuple.iter().map(|v| v.to_json_value()).collect()) + } + Value::Bitmap(b) => serde_json::Value::String(b.clone()), + Value::Variant(v) => serde_json::Value::String(v.clone()), + Value::Geometry(g) => serde_json::Value::String(g.clone()), + Value::Geography(g) => serde_json::Value::String(g.clone()), + Value::Interval(i) => serde_json::Value::String(i.clone()), + Value::Vector(v) => { + serde_json::Value::Array(v.iter().map(|f| serde_json::json!(f)).collect()) + } + } + } + // for now only used in ORM to fmt values to insert, // for Params, rust use Param::as_sql_string, and py/js bindings are handled in binding code pub fn to_sql_string(&self) -> String {