Skip to content

Commit 6cdb34b

Browse files
authored
Register dataframe as table to allow further sql queries over df (#63)
* register dataframe as table to allow further sql queries over df * indent
1 parent 2189244 commit 6cdb34b

5 files changed

Lines changed: 79 additions & 1 deletion

File tree

datafusion-examples/src/main/java/org/apache/arrow/datafusion/examples/ExampleMain.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public static void main(String[] args) throws Exception {
3333
.registerParquet(
3434
"test_parquet", Paths.get("src/main/resources/aggregate_test_100.parquet"))
3535
.join();
36-
context.sql("select * from test_parquet limit 3").thenComposeAsync(DataFrame::show).join();
36+
context.sql("select * from test_parquet limit 5").thenComposeAsync(DataFrame::show).join();
3737

3838
context
3939
.sql("select * from test_csv")
@@ -52,6 +52,13 @@ public static void main(String[] args) throws Exception {
5252
.sql("select * from test_parquet limit 3")
5353
.thenComposeAsync(df -> df.writeParquet(tempPath.resolve("parquet-out")))
5454
.join();
55+
56+
context
57+
.sql("select * from test_parquet limit 3")
58+
.thenComposeAsync(df -> df.registerTable(context, "test_parquet_limited"))
59+
.join();
60+
61+
context.sql("select * from test_parquet_limited").thenComposeAsync(DataFrame::show).join();
5562
}
5663
}
5764

datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,12 @@ public interface DataFrame extends NativeProxy {
4343
* @return null
4444
*/
4545
CompletableFuture<Void> writeCsv(Path path);
46+
47+
/**
48+
* Register this dataframe as a temporary table.
49+
* @param context SessionContext to register table to
50+
* @param name name of the tmp table
51+
* @return null
52+
*/
53+
CompletableFuture<Void> registerTable(SessionContext context, String name);
4654
}

datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,7 @@ static native void writeParquet(
1919
long runtime, long dataframe, String path, Consumer<String> callback);
2020

2121
static native void writeCsv(long runtime, long dataframe, String path, Consumer<String> callback);
22+
23+
static native void registerTable(
24+
long runtime, long dataframe, long context, String name, Consumer<String> callback);
2225
}

datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,27 @@ public CompletableFuture<Void> writeCsv(Path path) {
104104
return future;
105105
}
106106

107+
public CompletableFuture<Void> registerTable(SessionContext ctx, String name) {
108+
Runtime runtime = context.getRuntime();
109+
long runtimePointer = runtime.getPointer();
110+
long dataframe = getPointer();
111+
long contextPointer = ctx.getPointer();
112+
CompletableFuture<Void> future = new CompletableFuture<>();
113+
DataFrames.registerTable(
114+
runtimePointer,
115+
dataframe,
116+
contextPointer,
117+
name,
118+
(String errString) -> {
119+
if (containsError(errString)) {
120+
future.completeExceptionally(new RuntimeException(errString));
121+
} else {
122+
future.complete(null);
123+
}
124+
});
125+
return future;
126+
}
127+
107128
@Override
108129
void doClose(long pointer) {
109130
DataFrames.destroyDataFrame(pointer);

datafusion-jni/src/dataframe.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::convert::Into;
77
use std::io::BufWriter;
88
use std::io::Cursor;
99
use std::sync::Arc;
10+
use datafusion::prelude::SessionContext;
1011
use tokio::runtime::Runtime;
1112

1213
#[no_mangle]
@@ -153,6 +154,44 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeCsv(
153154
});
154155
}
155156

157+
#[no_mangle]
158+
pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_registerTable(
159+
env: JNIEnv,
160+
_class: JClass,
161+
runtime: jlong,
162+
dataframe: jlong,
163+
session: jlong,
164+
name: JString,
165+
callback: JObject,
166+
) {
167+
let runtime = unsafe { &mut *(runtime as *mut Runtime) };
168+
let dataframe = unsafe { &mut *(dataframe as *mut Arc<DataFrame>) };
169+
let context = unsafe { &mut *(session as *mut SessionContext) };
170+
let name: String = env
171+
.get_string(name)
172+
.expect("Couldn't get name as string!")
173+
.into();
174+
runtime.block_on(async {
175+
let r = context.register_table(name.as_str(), dataframe.clone());
176+
let err_message: JValue = match r {
177+
Ok(_) => JValue::Void,
178+
Err(err) => {
179+
let err_message = env
180+
.new_string(err.to_string())
181+
.expect("Couldn't create java string!");
182+
err_message.into()
183+
}
184+
};
185+
env.call_method(
186+
callback,
187+
"accept",
188+
"(Ljava/lang/Object;)V",
189+
&[err_message.into()],
190+
)
191+
.expect("failed to call method");
192+
});
193+
}
194+
156195
#[no_mangle]
157196
pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_destroyDataFrame(
158197
_env: JNIEnv,

0 commit comments

Comments
 (0)