Skip to content
This repository was archived by the owner on May 7, 2026. It is now read-only.

Commit 180af86

Browse files
authored
feat(core): introduce dialect-specific function list and refactor BigQuery function lists (#1366)
1 parent f4e007f commit 180af86

27 files changed

Lines changed: 3071 additions & 257 deletions

File tree

ibis-server/app/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ def update(self, diagnose: bool):
6161
def get_remote_function_list_path(self, data_source: str) -> str:
6262
if not self.remote_function_list_path:
6363
return None
64+
65+
# The function list has been defined by Wren Core
66+
if data_source in {"bigquery"}:
67+
return None
68+
6469
if data_source in {"local_file", "s3_file", "minio_file", "gcs_file"}:
6570
data_source = "duckdb"
6671
base_path = os.path.normpath(self.remote_function_list_path)

ibis-server/app/mdl/core.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@
55

66
@cache
77
def get_session_context(
8-
manifest_str: str | None, function_path: str, properties: frozenset | None = None
8+
manifest_str: str | None,
9+
function_path: str,
10+
properties: frozenset | None = None,
11+
data_source: str | None = None,
912
) -> wren_core.SessionContext:
10-
return wren_core.SessionContext(manifest_str, function_path, properties)
13+
return wren_core.SessionContext(
14+
manifest_str, function_path, properties, data_source
15+
)
1116

1217

1318
def get_manifest_extractor(manifest_str: str) -> wren_core.ManifestExtractor:

ibis-server/app/mdl/rewriter.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ def __init__(
4242
self.properties = properties
4343
if experiment:
4444
function_path = get_config().get_remote_function_list_path(data_source)
45-
self._rewriter = EmbeddedEngineRewriter(function_path)
45+
self._rewriter = EmbeddedEngineRewriter(
46+
function_path=function_path, data_source=data_source
47+
)
4648
else:
4749
self._rewriter = ExternalEngineRewriter(java_engine_connector)
4850

@@ -130,7 +132,8 @@ def handle_extract_exception(e: Exception):
130132

131133

132134
class EmbeddedEngineRewriter:
133-
def __init__(self, function_path: str):
135+
def __init__(self, function_path: str, data_source: DataSource = None):
136+
self.data_source = data_source
134137
self.function_path = function_path
135138

136139
@tracer.start_as_current_span("embedded_rewrite", kind=trace.SpanKind.INTERNAL)
@@ -140,7 +143,10 @@ async def rewrite(
140143
try:
141144
processed_properties = self.get_session_properties(properties)
142145
session_context = get_session_context(
143-
manifest_str, self.function_path, processed_properties
146+
manifest_str,
147+
self.function_path,
148+
processed_properties,
149+
self.data_source.name if self.data_source else None,
144150
)
145151
return await to_thread.run_sync(
146152
session_context.transform_sql,
@@ -151,12 +157,18 @@ async def rewrite(
151157

152158
@tracer.start_as_current_span("embedded_rewrite", kind=trace.SpanKind.INTERNAL)
153159
def rewrite_sync(
154-
self, manifest_str: str, sql: str, properties: dict | None = None
160+
self,
161+
manifest_str: str,
162+
sql: str,
163+
properties: dict | None = None,
155164
) -> str:
156165
try:
157166
processed_properties = self.get_session_properties(properties)
158167
session_context = get_session_context(
159-
manifest_str, self.function_path, processed_properties
168+
manifest_str,
169+
self.function_path,
170+
processed_properties,
171+
self.data_source.name if self.data_source else None,
160172
)
161173
return session_context.transform_sql(sql)
162174
except Exception as e:

ibis-server/app/model/error.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from pydantic import BaseModel, Field
66
from starlette.status import (
77
HTTP_404_NOT_FOUND,
8-
HTTP_422_UNPROCESSABLE_ENTITY,
8+
HTTP_422_UNPROCESSABLE_CONTENT,
99
HTTP_500_INTERNAL_SERVER_ERROR,
1010
HTTP_501_NOT_IMPLEMENTED,
1111
HTTP_502_BAD_GATEWAY,
@@ -109,7 +109,7 @@ def get_http_status_code(self) -> int:
109109
return HTTP_504_GATEWAY_TIMEOUT
110110
case e:
111111
if e.value < 100:
112-
return HTTP_422_UNPROCESSABLE_ENTITY
112+
return HTTP_422_UNPROCESSABLE_CONTENT
113113
return HTTP_500_INTERNAL_SERVER_ERROR
114114

115115

ibis-server/resources/function_list/bigquery.csv

Lines changed: 0 additions & 44 deletions
This file was deleted.

ibis-server/tools/query_local_run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777
print("### Session Properties ###")
7878
for key, value in properties:
7979
print(f"# {key}: {value}")
80-
session_context = SessionContext(encoded_str, function_list_path + f"/{data_source}.csv", properties)
80+
session_context = SessionContext(encoded_str, function_list_path + f"/{data_source}.csv", properties, data_source)
8181
planned_sql = session_context.transform_sql(sql)
8282
print("# Planned SQL:\n", planned_sql)
8383

ibis-server/wren/session/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ def plan(self, input_sql):
9797
)
9898

9999
self.planned_sql = self.context.rewriter.rewrite_sync(
100-
self.manifest, self.wren_sql, self.properties
100+
self.manifest,
101+
self.wren_sql,
102+
self.properties,
101103
)
102104

103105
read = self._get_read_dialect()

wren-core-base/src/mdl/manifest.rs

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::error::Error;
12
/*
23
* Licensed to the Apache Software Foundation (ASF) under one
34
* or more contributor license agreements. See the NOTICE file
@@ -17,6 +18,7 @@
1718
* under the License.
1819
*/
1920
use std::fmt::Display;
21+
use std::str::FromStr;
2022
use std::sync::Arc;
2123

2224
#[cfg(not(feature = "python-binding"))]
@@ -99,6 +101,32 @@ mod manifest_impl {
99101

100102
pub use crate::mdl::manifest::manifest_impl::*;
101103

104+
#[derive(Debug, Clone, PartialEq, Eq)]
105+
pub struct ParsedDataSourceError {
106+
pub message: String,
107+
}
108+
109+
impl ParsedDataSourceError {
110+
pub fn new(msg: &str) -> ParsedDataSourceError {
111+
ParsedDataSourceError {
112+
message: msg.to_string(),
113+
}
114+
}
115+
}
116+
117+
impl Display for ParsedDataSourceError {
118+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119+
write!(f, "ParsedDataSourceError: {}", self.message)
120+
}
121+
}
122+
123+
impl Error for ParsedDataSourceError {
124+
#[allow(deprecated)]
125+
fn description(&self) -> &str {
126+
&self.message
127+
}
128+
}
129+
102130
impl Display for DataSource {
103131
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104132
match self {
@@ -124,6 +152,37 @@ impl Display for DataSource {
124152
}
125153
}
126154

155+
impl FromStr for DataSource {
156+
type Err = ParsedDataSourceError;
157+
158+
fn from_str(s: &str) -> Result<Self, Self::Err> {
159+
match s.to_uppercase().as_str() {
160+
"BIGQUERY" => Ok(DataSource::BigQuery),
161+
"CLICKHOUSE" => Ok(DataSource::Clickhouse),
162+
"CANNER" => Ok(DataSource::Canner),
163+
"TRINO" => Ok(DataSource::Trino),
164+
"MSSQL" => Ok(DataSource::MSSQL),
165+
"MYSQL" => Ok(DataSource::MySQL),
166+
"POSTGRES" => Ok(DataSource::Postgres),
167+
"SNOWFLAKE" => Ok(DataSource::Snowflake),
168+
"DATAFUSION" => Ok(DataSource::Datafusion),
169+
"DUCKDB" => Ok(DataSource::DuckDB),
170+
"LOCAL_FILE" => Ok(DataSource::LocalFile),
171+
"S3_FILE" => Ok(DataSource::S3File),
172+
"GCS_FILE" => Ok(DataSource::GcsFile),
173+
"MINIO_FILE" => Ok(DataSource::MinioFile),
174+
"ORACLE" => Ok(DataSource::Oracle),
175+
"ATHENA" => Ok(DataSource::Athena),
176+
"REDSHIFT" => Ok(DataSource::Redshift),
177+
"DATABRICKS" => Ok(DataSource::Databricks),
178+
_ => Err(ParsedDataSourceError::new(&format!(
179+
"Unknown data source: {}",
180+
s
181+
))),
182+
}
183+
}
184+
}
185+
127186
mod table_reference {
128187
use serde::{self, Deserialize, Deserializer, Serialize, Serializer};
129188

@@ -260,7 +319,7 @@ impl Model {
260319
self.columns
261320
.iter()
262321
.filter(|c| c.relationship.is_none())
263-
.map(|c| Arc::clone(&c))
322+
.map(Arc::clone)
264323
.collect()
265324
}
266325
}
@@ -286,7 +345,7 @@ impl Model {
286345
self.columns
287346
.iter()
288347
.find(|c| c.name == column_name)
289-
.map(|c| Arc::clone(&c))
348+
.map(Arc::clone)
290349
}
291350

292351
/// Return the primary key of the model

0 commit comments

Comments
 (0)