Skip to content

Commit 81bbd5b

Browse files
example: add relation planner extension examples
1 parent 45f39a9 commit 81bbd5b

7 files changed

Lines changed: 2170 additions & 0 deletions

File tree

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-examples/Cargo.toml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,26 @@ path = "examples/external_dependency/query-aws-s3.rs"
5656
name = "custom_file_casts"
5757
path = "examples/custom_file_casts.rs"
5858

59+
[[example]]
60+
name = "relation_planner_csv_virtual_tables"
61+
path = "examples/relation_planner/csv_virtual_tables.rs"
62+
63+
[[example]]
64+
name = "relation_planner_generate_series"
65+
path = "examples/relation_planner/generate_series.rs"
66+
67+
[[example]]
68+
name = "relation_planner_table_functions"
69+
path = "examples/relation_planner/table_functions.rs"
70+
71+
[[example]]
72+
name = "relation_planner_table_sample"
73+
path = "examples/relation_planner/table_sample.rs"
74+
75+
[[example]]
76+
name = "relation_planner_match_recognize"
77+
path = "examples/relation_planner/match_recognize.rs"
78+
5979
[dev-dependencies]
6080
arrow = { workspace = true }
6181
# arrow_schema is required for record_batch! macro :sad:
@@ -67,16 +87,20 @@ dashmap = { workspace = true }
6787
# note only use main datafusion crate for examples
6888
base64 = "0.22.1"
6989
datafusion = { workspace = true, default-features = true, features = ["parquet_encryption"] }
90+
datafusion-common = { workspace = true }
91+
datafusion-expr = { workspace = true }
7092
datafusion-ffi = { workspace = true }
7193
datafusion-physical-expr-adapter = { workspace = true }
7294
datafusion-proto = { workspace = true }
95+
datafusion-sql = { workspace = true }
7396
env_logger = { workspace = true }
7497
futures = { workspace = true }
7598
log = { workspace = true }
7699
mimalloc = { version = "0.1", default-features = false }
77100
object_store = { workspace = true, features = ["aws", "http"] }
78101
prost = { workspace = true }
79102
rand = { workspace = true }
103+
rand_distr = "0.5"
80104
serde_json = { workspace = true }
81105
tempfile = { workspace = true }
82106
test-utils = { path = "../test-utils" }
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! This example demonstrates a custom relation planner that intercepts table names
19+
//! with a special prefix and dynamically loads CSV files from a virtual directory.
20+
//!
21+
//! Use case: Query files without explicitly registering them as tables.
22+
//! Example: `SELECT * FROM csv_data_users` loads `./data/users.csv`
23+
//!
24+
//! This example demonstrates ALL RelationPlannerContext methods:
25+
//! - object_name_to_table_reference: Convert SQL table names to TableReference
26+
27+
use std::sync::Arc;
28+
29+
use datafusion::prelude::*;
30+
use datafusion_common::Result;
31+
use datafusion_expr::planner::{
32+
RelationPlanner, RelationPlannerContext, RelationPlanning,
33+
};
34+
use datafusion_sql::sqlparser::ast::TableFactor;
35+
36+
#[derive(Debug)]
37+
struct CsvVirtualTablePlanner {
38+
base_path: String,
39+
}
40+
41+
impl CsvVirtualTablePlanner {
42+
fn new(base_path: impl Into<String>) -> Self {
43+
Self {
44+
base_path: base_path.into(),
45+
}
46+
}
47+
}
48+
49+
impl RelationPlanner for CsvVirtualTablePlanner {
50+
fn plan_relation(
51+
&self,
52+
relation: TableFactor,
53+
context: &mut dyn RelationPlannerContext,
54+
) -> Result<RelationPlanning> {
55+
if let TableFactor::Table { name, .. } = &relation {
56+
let table_name = name.to_string();
57+
58+
// Check if table name starts with our prefix
59+
if let Some(csv_name) = table_name.strip_prefix("csv_") {
60+
// Construct the file path
61+
let file_path = format!("{}/{}.csv", self.base_path, csv_name);
62+
63+
// Demonstrate object_name_to_table_reference
64+
let table_ref = context.object_name_to_table_reference(name.clone())?;
65+
println!(
66+
"[CSV Planner] Virtual CSV table: {} -> {} (ref: {})",
67+
table_name, file_path, table_ref
68+
);
69+
70+
// Create a read_csv equivalent by building a VALUES clause for demo purposes
71+
// In a real implementation, you'd use CsvReadOptions and register the table
72+
73+
// For this demo, we'll return Original to show the pattern,
74+
// but a real implementation would:
75+
// 1. Check if file exists
76+
// 2. Use CsvReadOptions to read the schema
77+
// 3. Create a TableScan plan with the CSV source
78+
79+
// Returning Original here delegates to default behavior
80+
// (which will fail since the table doesn't exist in the catalog)
81+
}
82+
}
83+
84+
Ok(RelationPlanning::Original(relation))
85+
}
86+
}
87+
88+
#[derive(Debug)]
89+
struct InMemoryTablePlanner;
90+
91+
impl RelationPlanner for InMemoryTablePlanner {
92+
fn plan_relation(
93+
&self,
94+
relation: TableFactor,
95+
_context: &mut dyn RelationPlannerContext,
96+
) -> Result<RelationPlanning> {
97+
if let TableFactor::Table { name, .. } = &relation {
98+
let table_name = name.to_string();
99+
100+
// Provide some built-in test tables
101+
match table_name.to_lowercase().as_str() {
102+
"example_users" => {
103+
// In a real scenario, you'd build a proper table scan
104+
// For demo, we return Original and show the concept
105+
println!(
106+
"[In-Memory Planner] Built-in virtual table: {}",
107+
table_name
108+
);
109+
return Ok(RelationPlanning::Original(relation));
110+
}
111+
"example_products" => {
112+
println!(
113+
"[In-Memory Planner] Built-in virtual table: {}",
114+
table_name
115+
);
116+
return Ok(RelationPlanning::Original(relation));
117+
}
118+
_ => {}
119+
}
120+
}
121+
122+
Ok(RelationPlanning::Original(relation))
123+
}
124+
}
125+
126+
#[tokio::main]
127+
async fn main() -> Result<()> {
128+
let ctx = SessionContext::new();
129+
130+
// Register our custom planners
131+
ctx.register_relation_planner(Arc::new(CsvVirtualTablePlanner::new("./data")))?;
132+
ctx.register_relation_planner(Arc::new(InMemoryTablePlanner))?;
133+
134+
println!("Custom Relation Planner: CSV Virtual Tables");
135+
println!("============================================\n");
136+
137+
// Example 1: Try to query a virtual CSV table
138+
println!("Example 1: Query virtual CSV table");
139+
println!("SQL: SELECT * FROM csv_users");
140+
match ctx.sql("SELECT * FROM csv_users").await {
141+
Ok(_) => println!("Query planned successfully\n"),
142+
Err(e) => println!("Query failed (expected): {}\n", e),
143+
}
144+
145+
// Example 2: Try built-in tables
146+
println!("Example 2: Query built-in virtual table");
147+
println!("SQL: SELECT * FROM example_users");
148+
match ctx.sql("SELECT * FROM example_users").await {
149+
Ok(_) => println!("Query planned successfully\n"),
150+
Err(e) => println!("Query failed (expected): {}\n", e),
151+
}
152+
153+
// Example 3: Show that normal tables still work
154+
println!("Example 3: Normal table registration still works");
155+
ctx.sql("CREATE TABLE real_table (id INT, name TEXT)")
156+
.await?
157+
.collect()
158+
.await?;
159+
println!("Created real_table\n");
160+
161+
println!("Key Concept:");
162+
println!("-----------");
163+
println!("This example shows how custom relation planners can intercept");
164+
println!("table names and provide virtual tables without explicit registration.");
165+
println!();
166+
println!("In a complete implementation, the planner would:");
167+
println!(" 1. Check if the CSV file exists");
168+
println!(" 2. Read the schema from the CSV");
169+
println!(" 3. Return a LogicalPlan with a CSV TableScan");
170+
println!();
171+
println!("This pattern is useful for:");
172+
println!(" - Dynamic file-based tables");
173+
println!(" - Convention-based table discovery");
174+
println!(" - Testing with fixture data");
175+
println!();
176+
println!("Trait methods demonstrated:");
177+
println!(" - object_name_to_table_reference: Converts SQL names to TableReference");
178+
179+
Ok(())
180+
}

0 commit comments

Comments
 (0)