Skip to content

Commit 04ff40c

Browse files
authored
Upgrade arrow to 2023 Spring version (#76)
* upgrade jni and tokio * fix lint * add unit test and binding * fix javadoc * add err case
1 parent 1ff0b3b commit 04ff40c

16 files changed

Lines changed: 163 additions & 81 deletions

File tree

datafusion-examples/src/main/java/org/apache/arrow/datafusion/examples/ExampleMain.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,16 @@ public static void main(String[] args) throws Exception {
5555

5656
context
5757
.sql("select * from test_parquet limit 3")
58-
.thenComposeAsync(df -> df.registerTable(context, "test_parquet_limited"))
58+
.thenAccept(
59+
df -> {
60+
try {
61+
boolean previouslyRegistered =
62+
context.registerTable("test_parquet_limited", df.intoView()).isPresent();
63+
assert !previouslyRegistered;
64+
} catch (Exception e) {
65+
throw new RuntimeException(e);
66+
}
67+
})
5968
.join();
6069

6170
context.sql("select * from test_parquet_limited").thenComposeAsync(DataFrame::show).join();

datafusion-java/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ java {
3030
withSourcesJar()
3131

3232
compileJava {
33-
options.compilerArgs += ["-h", "${buildDir}/target/headers"]
33+
options.compilerArgs += ["-h", "${layout.buildDirectory.asFile.get()}/target/headers"]
3434
}
3535
}
3636

datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ public interface DataFrame extends NativeProxy {
4545
CompletableFuture<Void> writeCsv(Path path);
4646

4747
/**
48-
* Register this dataframe as a temporary table.
48+
* Converts this DataFrame into a TableProvider that can be registered as a table view using
49+
* {@link SessionContext#registerParquet(String, Path)}
4950
*
50-
* @param context SessionContext to register table to
51-
* @param name name of the tmp table
52-
* @return null
51+
* @return the table provider ready to be e.g. {@link SessionContext#registerTable(String,
52+
* TableProvider) registered}.
5353
*/
54-
CompletableFuture<Void> registerTable(SessionContext context, String name);
54+
TableProvider intoView();
5555
}

datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,5 @@ static native void writeParquet(
2020

2121
static native void writeCsv(long runtime, long dataframe, String path, Consumer<String> callback);
2222

23-
static native void registerTable(
24-
long runtime, long dataframe, long context, String name, Consumer<String> callback);
23+
static native long intoView(long dataframe);
2524
}

datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -104,25 +104,11 @@ public CompletableFuture<Void> writeCsv(Path path) {
104104
return future;
105105
}
106106

107-
public CompletableFuture<Void> registerTable(SessionContext ctx, String name) {
108-
Runtime runtime = context.getRuntime();
109-
long runtimePointer = runtime.getPointer();
107+
@Override
108+
public TableProvider intoView() {
110109
long dataframe = getPointer();
111-
long contextPointer = ctx.getPointer();
112-
CompletableFuture<Void> future = new CompletableFuture<>();
113-
DataFrames.registerTable(
114-
runtimePointer,
115-
dataframe,
116-
contextPointer,
117-
name,
118-
(String errString) -> {
119-
if (containsError(errString)) {
120-
future.completeExceptionally(new RuntimeException(errString));
121-
} else {
122-
future.complete(null);
123-
}
124-
});
125-
return future;
110+
long tableProviderPointer = DataFrames.intoView(dataframe);
111+
return new DefaultTableProvider(tableProviderPointer);
126112
}
127113

128114
@Override

datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultSessionContext.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.apache.arrow.datafusion;
22

33
import java.nio.file.Path;
4+
import java.util.Optional;
45
import java.util.concurrent.CompletableFuture;
56
import java.util.function.Consumer;
67
import org.slf4j.Logger;
@@ -19,6 +20,8 @@ static native void registerCsv(
1920
static native void registerParquet(
2021
long runtime, long context, String name, String path, Consumer<String> callback);
2122

23+
static native long registerTable(long context, String name, long tableProvider) throws Exception;
24+
2225
@Override
2326
public CompletableFuture<DataFrame> sql(String sql) {
2427
long runtime = getRuntime().getPointer();
@@ -64,6 +67,16 @@ public CompletableFuture<Void> registerParquet(String name, Path path) {
6467
return future;
6568
}
6669

70+
@Override
71+
public Optional<TableProvider> registerTable(String name, TableProvider tableProvider)
72+
throws Exception {
73+
long previouslyRegistered = registerTable(getPointer(), name, tableProvider.getPointer());
74+
if (previouslyRegistered == 0) {
75+
return Optional.empty();
76+
}
77+
return Optional.of(new DefaultTableProvider(previouslyRegistered));
78+
}
79+
6780
private void voidCallback(CompletableFuture<Void> future, String errMessage) {
6881
if (null != errMessage && !errMessage.isEmpty()) {
6982
future.completeExceptionally(new RuntimeException(errMessage));
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.apache.arrow.datafusion;
2+
3+
class DefaultTableProvider extends AbstractProxy implements TableProvider {
4+
DefaultTableProvider(long pointer) {
5+
super(pointer);
6+
}
7+
8+
@Override
9+
void doClose(long pointer) throws Exception {
10+
TableProviders.destroyTableProvider(pointer);
11+
}
12+
}

datafusion-java/src/main/java/org/apache/arrow/datafusion/SessionContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.apache.arrow.datafusion;
22

33
import java.nio.file.Path;
4+
import java.util.Optional;
45
import java.util.concurrent.CompletableFuture;
56

67
/** A session context holds resources and is the entrance for obtaining {@link DataFrame} */
@@ -32,6 +33,17 @@ public interface SessionContext extends AutoCloseable, NativeProxy {
3233
*/
3334
CompletableFuture<Void> registerParquet(String name, Path path);
3435

36+
/**
37+
* Registers a TableProvider as a table that can be referenced from SQL statements executed
38+
* against this context.
39+
*
40+
* @param name table reference
41+
* @param tableProvider table provider
42+
* @return as of Arrow 22 this is only {@link Optional#empty()}
43+
* @throws Exception when the table is already registered
44+
*/
45+
Optional<TableProvider> registerTable(String name, TableProvider tableProvider) throws Exception;
46+
3547
/**
3648
* Get the runtime associated with this context
3749
*
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package org.apache.arrow.datafusion;
2+
3+
/** vague interface that maps to {@code Arc<dyn TableProvider>}. */
4+
public interface TableProvider extends NativeProxy {}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package org.apache.arrow.datafusion;
2+
3+
class TableProviders {
4+
5+
private TableProviders() {}
6+
7+
static native void destroyTableProvider(long pointer);
8+
}

0 commit comments

Comments
 (0)