@@ -2,7 +2,7 @@ use adbc_core::{
22 options:: { AdbcVersion , OptionConnection , OptionDatabase , OptionValue } ,
33 Connection , Database , Driver , Statement , LOAD_FLAG_DEFAULT ,
44} ;
5- use adbc_driver_manager:: ManagedDriver ;
5+ use adbc_driver_manager:: { ManagedConnection , ManagedDatabase , ManagedDriver } ;
66use arrow_array:: {
77 Array , BinaryArray , BooleanArray , Date32Array , Date64Array , Decimal128Array , Float16Array ,
88 Float32Array , Float64Array , Int16Array , Int32Array , Int64Array , Int8Array , LargeBinaryArray ,
@@ -12,10 +12,16 @@ use arrow_array::{
1212 UInt32Array , UInt64Array , UInt8Array ,
1313} ;
1414use arrow_ipc:: writer:: StreamWriter ;
15- use arrow_schema:: { DataType , TimeUnit } ;
15+ use arrow_schema:: { DataType , Schema , TimeUnit } ;
1616use std:: io:: Write ;
17+ use std:: path:: PathBuf ;
1718
19+ use crate :: core:: SemanticGraph ;
1820use crate :: error:: { Result , SidemanticError } ;
21+ use crate :: sql:: { QueryRewriter , SemanticQuery , SqlGenerator } ;
22+
23+ use super :: result:: ExecutionResult ;
24+ use super :: url:: ConnectionSpec ;
1925
2026#[ derive( Debug , Clone , PartialEq ) ]
2127pub enum AdbcValue {
@@ -50,6 +56,153 @@ pub struct AdbcExecutionRequest {
5056 pub connection_options : Vec < ( OptionConnection , OptionValue ) > ,
5157}
5258
59+ /// Pure Rust ADBC executor for semantic queries.
60+ ///
61+ /// Drivers are loaded by the ADBC driver manager. Drivers installed with
62+ /// `dbc install <driver>` are found through the manager's normal manifest
63+ /// search paths.
64+ pub struct AdbcExecutor {
65+ pub spec : ConnectionSpec ,
66+ database : ManagedDatabase ,
67+ connection : ManagedConnection ,
68+ }
69+
70+ impl AdbcExecutor {
71+ pub fn connect ( spec : ConnectionSpec ) -> Result < Self > {
72+ let entrypoint = spec. entrypoint . clone ( ) ;
73+ let mut driver = load_managed_driver (
74+ & spec. driver ,
75+ entrypoint. as_deref ( ) ,
76+ spec. adbc_version ,
77+ spec. load_flags ,
78+ spec. additional_search_paths . clone ( ) ,
79+ )
80+ . map_err ( |err| {
81+ SidemanticError :: Database ( format ! (
82+ "failed to load ADBC driver '{}' through the dbc/ADBC registry. \
83+ Install the driver with `dbc install {}` and make sure the ADBC driver \
84+ manager search path can see it. Underlying error: {err}",
85+ spec. driver, spec. driver
86+ ) )
87+ } ) ?;
88+
89+ let options = connection_spec_database_options ( & spec) ;
90+ let database = if options. is_empty ( ) {
91+ driver. new_database ( )
92+ } else {
93+ driver. new_database_with_opts ( options)
94+ } ?;
95+ let connection = database. new_connection ( ) ?;
96+
97+ Ok ( Self {
98+ spec,
99+ database,
100+ connection,
101+ } )
102+ }
103+
104+ pub fn connect_url ( url : & str ) -> Result < Self > {
105+ Self :: connect ( ConnectionSpec :: from_url ( url) ?)
106+ }
107+
108+ /// Execute SQL and return an Arrow record batch reader.
109+ pub fn execute_sql ( & mut self , sql : & str ) -> Result < ExecutionResult > {
110+ let mut statement = self . connection . new_statement ( ) ?;
111+ statement. set_sql_query ( sql) ?;
112+ let mut reader = statement. execute ( ) ?;
113+ let schema = reader. schema ( ) ;
114+ let mut batches = Vec :: new ( ) ;
115+ for batch in & mut reader {
116+ batches. push ( batch?) ;
117+ }
118+ Ok ( ExecutionResult :: new ( sql. to_string ( ) , schema, batches) )
119+ }
120+
121+ /// Execute a SQL statement that does not return a result set.
122+ pub fn execute_update ( & mut self , sql : & str ) -> Result < Option < i64 > > {
123+ let mut statement = self . connection . new_statement ( ) ?;
124+ statement. set_sql_query ( sql) ?;
125+ Ok ( statement. execute_update ( ) ?)
126+ }
127+
128+ /// Generate SQL from a semantic query and execute it through ADBC.
129+ pub fn execute_semantic_query (
130+ & mut self ,
131+ graph : & SemanticGraph ,
132+ query : & SemanticQuery ,
133+ ) -> Result < ExecutionResult > {
134+ let sql = SqlGenerator :: new ( graph) . generate ( query) ?;
135+ self . execute_sql ( & sql)
136+ }
137+
138+ /// Rewrite SQL through the semantic graph, then execute the rewritten SQL.
139+ pub fn rewrite_and_execute (
140+ & mut self ,
141+ graph : & SemanticGraph ,
142+ sql : & str ,
143+ ) -> Result < ExecutionResult > {
144+ let rewritten = QueryRewriter :: new ( graph) . rewrite ( sql) ?;
145+ self . execute_sql ( & rewritten)
146+ }
147+
148+ /// Return an ADBC metadata stream for catalogs, schemas, tables, and columns.
149+ pub fn get_objects (
150+ & self ,
151+ depth : adbc_core:: options:: ObjectDepth ,
152+ ) -> Result < Box < dyn RecordBatchReader + Send + ' _ > > {
153+ Ok ( Box :: new (
154+ self . connection
155+ . get_objects ( depth, None , None , None , None , None ) ?,
156+ ) )
157+ }
158+
159+ /// Get the Arrow schema for a table.
160+ pub fn get_table_schema (
161+ & self ,
162+ catalog : Option < & str > ,
163+ db_schema : Option < & str > ,
164+ table_name : & str ,
165+ ) -> Result < Schema > {
166+ Ok ( self
167+ . connection
168+ . get_table_schema ( catalog, db_schema, table_name) ?)
169+ }
170+
171+ /// Keep a reference to the ADBC database handle for advanced callers.
172+ pub fn database ( & self ) -> & ManagedDatabase {
173+ & self . database
174+ }
175+ }
176+
177+ fn load_managed_driver (
178+ driver : & str ,
179+ entrypoint : Option < & str > ,
180+ adbc_version : AdbcVersion ,
181+ load_flags : adbc_core:: LoadFlags ,
182+ additional_search_paths : Option < Vec < PathBuf > > ,
183+ ) -> std:: result:: Result < ManagedDriver , adbc_core:: error:: Error > {
184+ let entrypoint_bytes = entrypoint. map ( str:: as_bytes) ;
185+ ManagedDriver :: load_from_name (
186+ driver,
187+ entrypoint_bytes,
188+ adbc_version,
189+ load_flags,
190+ additional_search_paths. clone ( ) ,
191+ )
192+ . or_else ( |err| {
193+ if matches ! ( adbc_version, AdbcVersion :: V100 ) {
194+ return Err ( err) ;
195+ }
196+ ManagedDriver :: load_from_name (
197+ driver,
198+ entrypoint_bytes,
199+ AdbcVersion :: V100 ,
200+ load_flags,
201+ additional_search_paths,
202+ )
203+ } )
204+ }
205+
53206fn adbc_error ( context : & str , err : impl std:: fmt:: Display ) -> SidemanticError {
54207 SidemanticError :: InvalidConfig ( format ! ( "{context}: {err}" ) )
55208}
@@ -89,6 +242,29 @@ fn database_options_with_uri(
89242 database_options
90243}
91244
245+ fn connection_spec_database_options ( spec : & ConnectionSpec ) -> Vec < ( OptionDatabase , OptionValue ) > {
246+ let options = spec
247+ . database_options
248+ . iter ( )
249+ . map ( |( key, value) | ( database_option_key ( key) , OptionValue :: String ( value. clone ( ) ) ) )
250+ . collect ( ) ;
251+ database_options_with_uri (
252+ & spec. driver ,
253+ spec. entrypoint . as_deref ( ) ,
254+ spec. uri . clone ( ) ,
255+ options,
256+ )
257+ }
258+
259+ fn database_option_key ( key : & str ) -> OptionDatabase {
260+ match key {
261+ "uri" | "adbc.uri" => OptionDatabase :: Uri ,
262+ "username" | "user" | "adbc.username" => OptionDatabase :: Username ,
263+ "password" | "adbc.password" => OptionDatabase :: Password ,
264+ other => OptionDatabase :: Other ( other. to_string ( ) ) ,
265+ }
266+ }
267+
92268pub fn execute_with_adbc ( request : AdbcExecutionRequest ) -> Result < AdbcExecutionResult > {
93269 let AdbcExecutionRequest {
94270 driver,
0 commit comments