Skip to content

Commit 22019fb

Browse files
committed
feat: Add Oracle table provider with rust-oracle driver
Implement Oracle support using rust-oracle (ODPI-C) and bb8 connection pooling. Includes comprehensive type mappings (NUMBER, DATE, TIMESTAMP, CLOB, BLOB, RAW), schema inference, and 12 integration tests. Tested against Oracle Database 23c Free. Note: This is my first contribution to the project. Feedback welcome! :)
1 parent 8b277b5 commit 22019fb

15 files changed

Lines changed: 1916 additions & 0 deletions

File tree

README.md

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ let ctx = SessionContext::with_state(state);
2222

2323
- PostgreSQL
2424
- MySQL
25+
- Oracle
2526
- SQLite
2627
- ClickHouse
2728
- DuckDB
@@ -165,6 +166,57 @@ EOF
165166
cargo run -p datafusion-table-providers --example mysql --features mysql
166167
```
167168

169+
### Oracle
170+
171+
In order to run the Oracle example, you need to have an Oracle database server running. You can use the following command to start an Oracle Free server in a Docker container the example can use:
172+
173+
```bash
174+
docker run --name oracle-free \
175+
-e ORACLE_PASSWORD=OraclePassword123 \
176+
-p 1521:1521 \
177+
-d gvenzl/oracle-free:latest
178+
179+
# Wait for the Oracle server to start and healthcheck to pass
180+
echo "Waiting for Oracle to start (this may take 1-2 minutes)..."
181+
until docker exec oracle-free /usr/local/bin/checkHealth.sh >/dev/null 2>&1; do
182+
sleep 5
183+
done
184+
echo "Oracle is ready!"
185+
186+
# Create a table in the Oracle server and insert some data
187+
docker exec -i oracle-free sqlplus system/OraclePassword123@FREEPDB1 <<EOF
188+
CREATE TABLE companies (
189+
id NUMBER PRIMARY KEY,
190+
name VARCHAR2(100)
191+
);
192+
193+
INSERT INTO companies (id, name) VALUES (1, 'Acme Corporation');
194+
INSERT INTO companies (id, name) VALUES (2, 'Widget Inc.');
195+
COMMIT;
196+
EXIT;
197+
EOF
198+
```
199+
200+
**Prerequisites:** The `rust-oracle` crate requires Oracle Instant Client libraries (ODPI-C). Install them:
201+
202+
- **Linux (Debian/Ubuntu):**
203+
```bash
204+
apt-get install libaio1 wget unzip
205+
wget https://download.oracle.com/otn_software/linux/instantclient/instantclient-basiclite-linuxx64.zip
206+
unzip instantclient-basiclite-linuxx64.zip -d /opt/oracle
207+
export LD_LIBRARY_PATH=/opt/oracle/instantclient_XX_X:$LD_LIBRARY_PATH
208+
```
209+
210+
- **macOS:**
211+
```bash
212+
brew install instantclient-basic
213+
```
214+
215+
```bash
216+
# Run from repo folder
217+
cargo run -p datafusion-table-providers --example oracle --features oracle
218+
```
219+
168220
### Flight SQL
169221

170222
```bash

core/Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ rust_decimal = { version = "1.38.0", features = ["db-postgres"] }
103103
adbc_driver_manager = { workspace = true, optional = true }
104104
adbc_core = { workspace = true, optional = true }
105105
r2d2_adbc = { version = "0.2.0", optional = true }
106+
oracle = { version = "0.6", optional = true }
107+
bb8-oracle = { version = "0.3", optional = true }
106108

107109
[dev-dependencies]
108110
anyhow = "1.0"
@@ -167,6 +169,14 @@ adbc = [
167169
"dep:r2d2",
168170
]
169171
adbc-federation = ["adbc", "federation"]
172+
oracle = [
173+
"dep:oracle",
174+
"dep:bb8",
175+
"dep:bb8-oracle",
176+
"dep:async-stream",
177+
"dep:arrow-schema",
178+
]
179+
oracle-federation = ["oracle", "federation"]
170180

171181
# docs.rs-specific configuration
172182
[package.metadata.docs.rs]

core/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ pub mod flight;
1717
pub mod mysql;
1818
#[cfg(feature = "odbc")]
1919
pub mod odbc;
20+
#[cfg(feature = "oracle")]
21+
pub mod oracle;
2022
#[cfg(feature = "postgres")]
2123
pub mod postgres;
2224
#[cfg(feature = "sqlite")]

core/src/oracle.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
use crate::sql::db_connection_pool::oraclepool::OracleConnectionPool;
2+
use crate::sql::{db_connection_pool, sql_provider_datafusion::SqlTable};
3+
use async_trait::async_trait;
4+
5+
use datafusion::error::DataFusionError;
6+
use datafusion::{
7+
catalog::{Session, TableProviderFactory},
8+
datasource::TableProvider,
9+
logical_expr::CreateExternalTable,
10+
sql::TableReference,
11+
};
12+
use secrecy::SecretString;
13+
use snafu::prelude::*;
14+
use std::collections::HashMap;
15+
use std::sync::Arc;
16+
17+
#[cfg(feature = "oracle-federation")]
18+
pub mod federation;
19+
pub mod sql_table;
20+
pub mod write;
21+
22+
use self::sql_table::OracleTable;
23+
use crate::sql::db_connection_pool::dbconnection::oracleconn::OraclePooledConnection;
24+
25+
#[derive(Debug, Snafu)]
26+
pub enum Error {
27+
#[snafu(display("DbConnectionError: {source}"))]
28+
DbConnectionError {
29+
source: db_connection_pool::dbconnection::GenericError,
30+
},
31+
32+
#[snafu(display("Unable to create Oracle connection pool: {source}"))]
33+
UnableToCreateConnectionPool {
34+
source: db_connection_pool::oraclepool::Error,
35+
},
36+
37+
#[snafu(display("Unable to create table provider: {source}"))]
38+
UnableToCreateTableProvider {
39+
source: Box<dyn std::error::Error + Send + Sync>,
40+
},
41+
}
42+
43+
pub type Result<T, E = Error> = std::result::Result<T, E>;
44+
45+
pub struct OracleTableFactory {
46+
pool: Arc<OracleConnectionPool>,
47+
}
48+
49+
impl OracleTableFactory {
50+
#[must_use]
51+
pub fn new(pool: Arc<OracleConnectionPool>) -> Self {
52+
Self { pool }
53+
}
54+
55+
pub async fn table_provider(
56+
&self,
57+
table_reference: TableReference,
58+
) -> Result<Arc<dyn TableProvider + 'static>, Box<dyn std::error::Error + Send + Sync>> {
59+
let pool = Arc::clone(&self.pool);
60+
let dyn_pool = pool as Arc<
61+
dyn db_connection_pool::DbConnectionPool<
62+
OraclePooledConnection,
63+
oracle::sql_type::OracleType,
64+
> + Send
65+
+ Sync
66+
+ 'static,
67+
>;
68+
69+
let table = SqlTable::new("oracle", &dyn_pool, table_reference)
70+
.await
71+
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
72+
73+
let oracle_table = Arc::new(OracleTable::new(Arc::clone(&self.pool), table));
74+
75+
#[cfg(feature = "oracle-federation")]
76+
let oracle_table = Arc::new(
77+
oracle_table
78+
.create_federated_table_provider()
79+
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
80+
);
81+
82+
Ok(oracle_table)
83+
}
84+
}
85+
86+
#[derive(Debug)]
87+
pub struct OracleTableProviderFactory {}
88+
89+
impl OracleTableProviderFactory {
90+
#[must_use]
91+
pub fn new() -> Self {
92+
Self {}
93+
}
94+
}
95+
96+
impl Default for OracleTableProviderFactory {
97+
fn default() -> Self {
98+
Self::new()
99+
}
100+
}
101+
102+
#[async_trait]
103+
impl TableProviderFactory for OracleTableProviderFactory {
104+
async fn create(
105+
&self,
106+
_state: &dyn Session,
107+
cmd: &CreateExternalTable,
108+
) -> datafusion::common::Result<Arc<dyn TableProvider>> {
109+
let name = cmd.name.to_string();
110+
let options = &cmd.options;
111+
112+
// Construct params from options
113+
let mut params: HashMap<String, SecretString> = HashMap::new();
114+
for (k, v) in options {
115+
params.insert(k.clone(), SecretString::from(v.clone()));
116+
}
117+
118+
let pool = OracleConnectionPool::new(params)
119+
.await
120+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
121+
122+
let factory = OracleTableFactory::new(Arc::new(pool));
123+
124+
let table = factory
125+
.table_provider(TableReference::from(name))
126+
.await
127+
.map_err(|e| DataFusionError::External(e))?;
128+
129+
Ok(table)
130+
}
131+
}

core/src/oracle/federation.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
use crate::sql::db_connection_pool::dbconnection::oracleconn::OraclePooledConnection;
2+
use crate::sql::db_connection_pool::dbconnection::{get_schema, Error as DbError};
3+
use crate::sql::sql_provider_datafusion::{get_stream, to_execution_error};
4+
use arrow::datatypes::SchemaRef;
5+
use async_trait::async_trait;
6+
use datafusion::sql::unparser::dialect::Dialect;
7+
use datafusion_federation::sql::{
8+
AstAnalyzer, RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
9+
};
10+
use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};
11+
use futures::TryStreamExt;
12+
use std::sync::Arc;
13+
14+
use super::sql_table::OracleTable;
15+
use datafusion::{
16+
datasource::TableProvider,
17+
error::{DataFusionError, Result as DataFusionResult},
18+
execution::SendableRecordBatchStream,
19+
physical_plan::stream::RecordBatchStreamAdapter,
20+
sql::TableReference,
21+
};
22+
23+
impl OracleTable {
24+
pub fn create_federated_table_source(
25+
self: Arc<Self>,
26+
) -> DataFusionResult<Arc<dyn FederatedTableSource>> {
27+
let table_reference = self.base_table.table_reference.clone();
28+
let schema = Arc::clone(&self.base_table.schema());
29+
let fed_provider = Arc::new(SQLFederationProvider::new(self.clone()));
30+
Ok(Arc::new(SQLTableSource::new_with_schema(
31+
fed_provider,
32+
RemoteTableRef::from(table_reference),
33+
schema,
34+
)))
35+
}
36+
37+
pub fn create_federated_table_provider(
38+
self: Arc<Self>,
39+
) -> DataFusionResult<FederatedTableProviderAdaptor> {
40+
let table_source = self.clone().create_federated_table_source()?;
41+
Ok(FederatedTableProviderAdaptor::new_with_provider(
42+
table_source,
43+
self,
44+
))
45+
}
46+
}
47+
48+
#[async_trait]
49+
impl SQLExecutor for OracleTable {
50+
fn name(&self) -> &str {
51+
self.base_table.name()
52+
}
53+
54+
fn compute_context(&self) -> Option<String> {
55+
None
56+
}
57+
58+
fn dialect(&self) -> Arc<dyn Dialect> {
59+
Arc::new(datafusion::sql::unparser::dialect::PostgreSqlDialect {})
60+
}
61+
62+
fn execute(
63+
&self,
64+
query: &str,
65+
schema: SchemaRef,
66+
) -> DataFusionResult<SendableRecordBatchStream> {
67+
let pool = self.base_table.clone_pool();
68+
let dyn_pool = pool as Arc<
69+
dyn crate::sql::db_connection_pool::DbConnectionPool<
70+
OraclePooledConnection,
71+
oracle::sql_type::OracleType,
72+
> + Send
73+
+ Sync,
74+
>;
75+
let fut = get_stream(dyn_pool, query.to_string(), Arc::clone(&schema));
76+
77+
let stream = futures::stream::once(fut).try_flatten();
78+
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
79+
}
80+
81+
async fn table_names(&self) -> DataFusionResult<Vec<String>> {
82+
Err(DataFusionError::NotImplemented(
83+
"table inference not implemented".to_string(),
84+
))
85+
}
86+
87+
async fn get_table_schema(&self, table_name: &str) -> DataFusionResult<SchemaRef> {
88+
let pool = self.base_table.clone_pool();
89+
let dyn_pool = pool as Arc<
90+
dyn crate::sql::db_connection_pool::DbConnectionPool<
91+
OraclePooledConnection,
92+
oracle::sql_type::OracleType,
93+
> + Send
94+
+ Sync,
95+
>;
96+
let conn = dyn_pool.connect().await.map_err(to_execution_error)?;
97+
get_schema(conn, &TableReference::from(table_name))
98+
.await
99+
.boxed()
100+
.map_err(|e| DbError::UnableToGetSchema { source: e })
101+
.map_err(to_execution_error)
102+
}
103+
}

0 commit comments

Comments
 (0)