Skip to content

Commit 6f5b5fc

Browse files
authored
feat(java/driver/jni): wire up cancel, getParameterSchema (#4249)
Closes #4239.
1 parent 693c085 commit 6f5b5fc

8 files changed

Lines changed: 152 additions & 8 deletions

File tree

java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public interface AdbcConnection extends AutoCloseable, AdbcOptions {
4040
* @since ADBC API revision 1.1.0
4141
*/
4242
default void cancel() throws AdbcException {
43-
throw AdbcException.notImplemented("Statement does not support cancel");
43+
throw AdbcException.notImplemented("Connection does not support cancel");
4444
}
4545

4646
/** Commit the pending transaction. */

java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,8 +316,43 @@ void selectQuery() throws Exception {
316316
assertThat(result.getReader().loadNextBatch()).isTrue();
317317
assertThat(result.getReader().loadNextBatch()).isFalse();
318318
}
319+
320+
assertThat(stmt.getParameterSchema().getFields()).isEmpty();
319321
}
320-
// TODO(https://github.com/apache/arrow-adbc/issues/4239): test get parameter schema
322+
323+
try (var stmt = conn.createStatement()) {
324+
stmt.setSqlQuery("SELECT $1 || 'foo'");
325+
assertSchema(stmt.getParameterSchema())
326+
.isEqualTo(new Schema(List.of(Field.nullable("$1", Types.MinorType.VARCHAR.getType()))));
327+
}
328+
}
329+
330+
@Test
331+
void cancelQuery() throws Exception {
332+
// There's nothing really we can test reliably; it is wired up but we'd need a long-running
333+
// query and a reliable way to start the cancel at the right time
334+
Schema schema = new Schema(List.of(Field.nullable("$1", Types.MinorType.VARCHAR.getType())));
335+
try (var stmt = conn.createStatement();
336+
var vsr = VectorSchemaRoot.create(schema, allocator)) {
337+
var vcv = (VarCharVector) vsr.getVector(0);
338+
vcv.setSafe(0, "test".getBytes(StandardCharsets.UTF_8));
339+
vcv.setSafe(1, "bar".getBytes(StandardCharsets.UTF_8));
340+
341+
stmt.setSqlQuery("SELECT CAST($1 AS VARCHAR) || 'foo'");
342+
stmt.bind(vsr);
343+
344+
try (AdbcStatement.QueryResult result = stmt.executeQuery()) {
345+
stmt.cancel();
346+
//noinspection StatementWithEmptyBody
347+
while (result.getReader().loadNextBatch()) {}
348+
}
349+
}
350+
}
351+
352+
@Test
353+
void cancelConnection() throws Exception {
354+
// There's nothing really we can test reliably
355+
conn.cancel();
321356
}
322357

323358
@Test

java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,8 +322,43 @@ void selectQuery() throws Exception {
322322
assertThat(result.getReader().loadNextBatch()).isTrue();
323323
assertThat(result.getReader().loadNextBatch()).isFalse();
324324
}
325+
326+
assertThat(stmt.getParameterSchema().getFields()).isEmpty();
325327
}
326-
// TODO(https://github.com/apache/arrow-adbc/issues/4239): test get parameter schema
328+
329+
try (var stmt = conn.createStatement()) {
330+
stmt.setSqlQuery("SELECT CONCAT(CAST(@p1 AS NVARCHAR), 'foo')");
331+
assertSchema(stmt.getParameterSchema())
332+
.isEqualTo(new Schema(List.of(Field.nullable("@p1", Types.MinorType.VARCHAR.getType()))));
333+
}
334+
}
335+
336+
@Test
337+
void cancelQuery() throws Exception {
338+
// There's nothing really we can test reliably (MSSQL driver doesn't react to cancel)
339+
Schema schema = new Schema(List.of(Field.nullable("$1", Types.MinorType.VARCHAR.getType())));
340+
try (var stmt = conn.createStatement();
341+
var vsr = VectorSchemaRoot.create(schema, allocator)) {
342+
var vcv = (VarCharVector) vsr.getVector(0);
343+
vcv.setSafe(0, "test".getBytes(StandardCharsets.UTF_8));
344+
vcv.setSafe(1, "bar".getBytes(StandardCharsets.UTF_8));
345+
346+
stmt.setSqlQuery("SELECT CAST(@p1 AS NVARCHAR) || 'foo'");
347+
stmt.bind(vsr);
348+
349+
// N.B. the MSSQL driver doesn't appear to react
350+
try (AdbcStatement.QueryResult result = stmt.executeQuery()) {
351+
stmt.cancel();
352+
//noinspection StatementWithEmptyBody
353+
while (result.getReader().loadNextBatch()) {}
354+
}
355+
}
356+
}
357+
358+
@Test
359+
void cancelConnection() throws Exception {
360+
// There's nothing really we can test reliably
361+
conn.cancel();
327362
}
328363

329364
@Test

java/driver/jni/src/main/cpp/jni_wrapper.cc

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,33 @@ jobject MakeNativeSchemaResult(JNIEnv* env, struct ArrowSchema* schema) {
353353
static_cast<jlong>(reinterpret_cast<uintptr_t>(schema)));
354354
}
355355

356+
JNIEXPORT void JNICALL
357+
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementCancel(
358+
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
359+
struct AdbcError error = ADBC_ERROR_INIT;
360+
auto* ptr = reinterpret_cast<struct AdbcStatement*>(static_cast<uintptr_t>(handle));
361+
try {
362+
CHECK_ADBC_ERROR(AdbcStatementCancel(ptr, &error), error);
363+
} catch (const AdbcException& e) {
364+
e.ThrowJavaException(env);
365+
}
366+
}
367+
368+
JNIEXPORT jobject JNICALL
369+
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementGetParameterSchema(
370+
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
371+
struct AdbcError error = ADBC_ERROR_INIT;
372+
auto* ptr = reinterpret_cast<struct AdbcStatement*>(static_cast<uintptr_t>(handle));
373+
struct ArrowSchema schema = {};
374+
try {
375+
CHECK_ADBC_ERROR(AdbcStatementGetParameterSchema(ptr, &schema, &error), error);
376+
return MakeNativeSchemaResult(env, &schema);
377+
} catch (const AdbcException& e) {
378+
e.ThrowJavaException(env);
379+
}
380+
return nullptr;
381+
}
382+
356383
JNIEXPORT jobject JNICALL
357384
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementExecuteQuery(
358385
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
@@ -621,6 +648,18 @@ Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementSetOptionString(
621648
}
622649
}
623650

651+
JNIEXPORT void JNICALL
652+
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_connectionCancel(
653+
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
654+
struct AdbcError error = ADBC_ERROR_INIT;
655+
auto* ptr = reinterpret_cast<struct AdbcConnection*>(static_cast<uintptr_t>(handle));
656+
try {
657+
CHECK_ADBC_ERROR(AdbcConnectionCancel(ptr, &error), error);
658+
} catch (const AdbcException& e) {
659+
e.ThrowJavaException(env);
660+
}
661+
}
662+
624663
JNIEXPORT jobject JNICALL
625664
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_connectionGetObjects(
626665
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle, jint depth, jstring catalog,

java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ public AdbcStatement createStatement() throws AdbcException {
4545
return new JniStatement(allocator, JniLoader.INSTANCE.openStatement(handle));
4646
}
4747

48+
@Override
49+
public void cancel() throws AdbcException {
50+
JniLoader.INSTANCE.connectionCancel(handle);
51+
}
52+
4853
@Override
4954
public AdbcStatement bulkIngest(String targetTableName, BulkIngestMode mode)
5055
throws AdbcException {

java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ public JniStatement(BufferAllocator allocator, NativeStatementHandle handle) {
4040
this.handle = handle;
4141
}
4242

43+
@Override
44+
public void cancel() throws AdbcException {
45+
JniLoader.INSTANCE.statementCancel(handle);
46+
}
47+
4348
@Override
4449
public void setSqlQuery(String query) throws AdbcException {
4550
JniLoader.INSTANCE.statementSetSqlQuery(handle, query);
@@ -87,6 +92,11 @@ public Schema executeSchema() throws AdbcException {
8792
return JniLoader.INSTANCE.statementExecuteSchema(handle).importSchema(allocator);
8893
}
8994

95+
@Override
96+
public Schema getParameterSchema() throws AdbcException {
97+
return JniLoader.INSTANCE.statementGetParameterSchema(handle).importSchema(allocator);
98+
}
99+
90100
@Override
91101
public void prepare() throws AdbcException {
92102
JniLoader.INSTANCE.statementPrepare(handle);
@@ -143,7 +153,7 @@ public <T> void setOption(TypedKey<T> key, T value) throws AdbcException {
143153
JniLoader.INSTANCE.statementSetOptionDouble(handle, key.getKey(), (Double) value);
144154
} else if (value instanceof Boolean) {
145155
JniLoader.INSTANCE.statementSetOptionString(
146-
handle, key.getKey(), ((Boolean) value) ? "true" : "false");
156+
handle, key.getKey(), ((Boolean) value).toString());
147157
} else if (value instanceof byte[]) {
148158
JniLoader.INSTANCE.statementSetOptionBytes(handle, key.getKey(), (byte[]) value);
149159
} else {

java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ public NativeStatementHandle openStatement(NativeConnectionHandle connection)
8181
return NativeAdbc.openStatement(connection.getConnectionHandle());
8282
}
8383

84+
public void statementCancel(NativeStatementHandle statement) throws AdbcException {
85+
NativeAdbc.statementCancel(statement.getStatementHandle());
86+
}
87+
8488
public NativeQueryResult statementExecuteQuery(NativeStatementHandle statement)
8589
throws AdbcException {
8690
return NativeAdbc.statementExecuteQuery(statement.getStatementHandle());
@@ -110,6 +114,11 @@ public NativeSchemaResult statementExecuteSchema(NativeStatementHandle statement
110114
return NativeAdbc.statementExecuteSchema(statement.getStatementHandle());
111115
}
112116

117+
public NativeSchemaResult statementGetParameterSchema(NativeStatementHandle statement)
118+
throws AdbcException {
119+
return NativeAdbc.statementGetParameterSchema(statement.getStatementHandle());
120+
}
121+
113122
public byte[] statementGetOptionBytes(NativeStatementHandle handle, String key)
114123
throws AdbcException {
115124
return NativeAdbc.statementGetOptionBytes(handle.getStatementHandle(), key);
@@ -150,6 +159,10 @@ public void statementSetOptionString(NativeStatementHandle statement, String key
150159
NativeAdbc.statementSetOptionString(statement.getStatementHandle(), key, value);
151160
}
152161

162+
public void connectionCancel(NativeConnectionHandle connection) throws AdbcException {
163+
NativeAdbc.connectionCancel(connection.getConnectionHandle());
164+
}
165+
153166
public NativeQueryResult connectionGetObjects(
154167
NativeConnectionHandle connection,
155168
int depth,

java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,6 @@ static native NativeDatabaseHandle openDatabase(int version, String[] parameters
3434

3535
static native void closeStatement(long handle) throws AdbcException;
3636

37-
static native NativeQueryResult statementExecuteQuery(long handle) throws AdbcException;
38-
39-
static native void statementSetSqlQuery(long handle, String query) throws AdbcException;
40-
4137
static native void statementBind(long handle, long values, long schema) throws AdbcException;
4238

4339
// TODO(lidavidm): we need a way to bind an ArrowReader (or some other suitable interface that
@@ -46,12 +42,21 @@ static native NativeDatabaseHandle openDatabase(int version, String[] parameters
4642
@SuppressWarnings("unused")
4743
static native void statementBindStream(long handle, long stream) throws AdbcException;
4844

45+
static native void statementCancel(long handle) throws AdbcException;
46+
4947
static native long statementExecuteUpdate(long handle) throws AdbcException;
5048

5149
static native void statementPrepare(long handle) throws AdbcException;
5250

51+
static native NativeQueryResult statementExecuteQuery(long handle) throws AdbcException;
52+
5353
static native NativeSchemaResult statementExecuteSchema(long handle) throws AdbcException;
5454

55+
static native NativeSchemaResult statementGetParameterSchema(long statementHandle)
56+
throws AdbcException;
57+
58+
static native void statementSetSqlQuery(long handle, String query) throws AdbcException;
59+
5560
static native byte[] statementGetOptionBytes(long handle, String key) throws AdbcException;
5661

5762
static native double statementGetOptionDouble(long handle, String key) throws AdbcException;
@@ -72,6 +77,8 @@ static native void statementSetOptionLong(long handle, String key, long value)
7277
static native void statementSetOptionString(long handle, String key, String value)
7378
throws AdbcException;
7479

80+
static native void connectionCancel(long handle) throws AdbcException;
81+
7582
static native NativeQueryResult connectionGetObjects(
7683
long handle,
7784
int depth,

0 commit comments

Comments
 (0)