Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface AdbcConnection extends AutoCloseable, AdbcOptions {
* @since ADBC API revision 1.1.0
*/
default void cancel() throws AdbcException {
throw AdbcException.notImplemented("Statement does not support cancel");
throw AdbcException.notImplemented("Connection does not support cancel");
}

/** Commit the pending transaction. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,43 @@ void selectQuery() throws Exception {
assertThat(result.getReader().loadNextBatch()).isTrue();
assertThat(result.getReader().loadNextBatch()).isFalse();
}

assertThat(stmt.getParameterSchema().getFields()).isEmpty();
}
// TODO(https://github.com/apache/arrow-adbc/issues/4239): test get parameter schema

try (var stmt = conn.createStatement()) {
stmt.setSqlQuery("SELECT $1 || 'foo'");
assertSchema(stmt.getParameterSchema())
.isEqualTo(new Schema(List.of(Field.nullable("$1", Types.MinorType.VARCHAR.getType()))));
}
}

@Test
void cancelQuery() throws Exception {
// There's nothing really we can test reliably; it is wired up but we'd need a long-running
// query and a reliable way to start the cancel at the right time
Schema schema = new Schema(List.of(Field.nullable("$1", Types.MinorType.VARCHAR.getType())));
try (var stmt = conn.createStatement();
var vsr = VectorSchemaRoot.create(schema, allocator)) {
var vcv = (VarCharVector) vsr.getVector(0);
vcv.setSafe(0, "test".getBytes(StandardCharsets.UTF_8));
vcv.setSafe(1, "bar".getBytes(StandardCharsets.UTF_8));

stmt.setSqlQuery("SELECT CAST($1 AS VARCHAR) || 'foo'");
stmt.bind(vsr);

try (AdbcStatement.QueryResult result = stmt.executeQuery()) {
stmt.cancel();
//noinspection StatementWithEmptyBody
while (result.getReader().loadNextBatch()) {}
}
}
}

@Test
void cancelConnection() throws Exception {
// There's nothing really we can test reliably
conn.cancel();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,43 @@ void selectQuery() throws Exception {
assertThat(result.getReader().loadNextBatch()).isTrue();
assertThat(result.getReader().loadNextBatch()).isFalse();
}

assertThat(stmt.getParameterSchema().getFields()).isEmpty();
}
// TODO(https://github.com/apache/arrow-adbc/issues/4239): test get parameter schema

try (var stmt = conn.createStatement()) {
stmt.setSqlQuery("SELECT CONCAT(CAST(@p1 AS NVARCHAR), 'foo')");
assertSchema(stmt.getParameterSchema())
.isEqualTo(new Schema(List.of(Field.nullable("@p1", Types.MinorType.VARCHAR.getType()))));
}
}

@Test
void cancelQuery() throws Exception {
// There's nothing really we can test reliably (MSSQL driver doesn't react to cancel)
Schema schema = new Schema(List.of(Field.nullable("$1", Types.MinorType.VARCHAR.getType())));
try (var stmt = conn.createStatement();
var vsr = VectorSchemaRoot.create(schema, allocator)) {
var vcv = (VarCharVector) vsr.getVector(0);
vcv.setSafe(0, "test".getBytes(StandardCharsets.UTF_8));
vcv.setSafe(1, "bar".getBytes(StandardCharsets.UTF_8));

stmt.setSqlQuery("SELECT CAST(@p1 AS NVARCHAR) || 'foo'");
stmt.bind(vsr);

// N.B. the MSSQL driver doesn't appear to react
try (AdbcStatement.QueryResult result = stmt.executeQuery()) {
stmt.cancel();
//noinspection StatementWithEmptyBody
while (result.getReader().loadNextBatch()) {}
}
}
}

@Test
void cancelConnection() throws Exception {
// There's nothing really we can test reliably
conn.cancel();
}

@Test
Expand Down
39 changes: 39 additions & 0 deletions java/driver/jni/src/main/cpp/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,33 @@ jobject MakeNativeSchemaResult(JNIEnv* env, struct ArrowSchema* schema) {
static_cast<jlong>(reinterpret_cast<uintptr_t>(schema)));
}

JNIEXPORT void JNICALL
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementCancel(
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
struct AdbcError error = ADBC_ERROR_INIT;
auto* ptr = reinterpret_cast<struct AdbcStatement*>(static_cast<uintptr_t>(handle));
try {
CHECK_ADBC_ERROR(AdbcStatementCancel(ptr, &error), error);
} catch (const AdbcException& e) {
e.ThrowJavaException(env);
}
}

JNIEXPORT jobject JNICALL
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementGetParameterSchema(
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
struct AdbcError error = ADBC_ERROR_INIT;
auto* ptr = reinterpret_cast<struct AdbcStatement*>(static_cast<uintptr_t>(handle));
struct ArrowSchema schema = {};
try {
CHECK_ADBC_ERROR(AdbcStatementGetParameterSchema(ptr, &schema, &error), error);
return MakeNativeSchemaResult(env, &schema);
} catch (const AdbcException& e) {
e.ThrowJavaException(env);
}
return nullptr;
}

JNIEXPORT jobject JNICALL
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementExecuteQuery(
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
Expand Down Expand Up @@ -621,6 +648,18 @@ Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementSetOptionString(
}
}

JNIEXPORT void JNICALL
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_connectionCancel(
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
struct AdbcError error = ADBC_ERROR_INIT;
auto* ptr = reinterpret_cast<struct AdbcConnection*>(static_cast<uintptr_t>(handle));
try {
CHECK_ADBC_ERROR(AdbcConnectionCancel(ptr, &error), error);
} catch (const AdbcException& e) {
e.ThrowJavaException(env);
}
}

JNIEXPORT jobject JNICALL
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_connectionGetObjects(
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle, jint depth, jstring catalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public AdbcStatement createStatement() throws AdbcException {
return new JniStatement(allocator, JniLoader.INSTANCE.openStatement(handle));
}

@Override
public void cancel() throws AdbcException {
JniLoader.INSTANCE.connectionCancel(handle);
}

@Override
public AdbcStatement bulkIngest(String targetTableName, BulkIngestMode mode)
throws AdbcException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public JniStatement(BufferAllocator allocator, NativeStatementHandle handle) {
this.handle = handle;
}

@Override
public void cancel() throws AdbcException {
JniLoader.INSTANCE.statementCancel(handle);
}

@Override
public void setSqlQuery(String query) throws AdbcException {
JniLoader.INSTANCE.statementSetSqlQuery(handle, query);
Expand Down Expand Up @@ -87,6 +92,11 @@ public Schema executeSchema() throws AdbcException {
return JniLoader.INSTANCE.statementExecuteSchema(handle).importSchema(allocator);
}

@Override
public Schema getParameterSchema() throws AdbcException {
return JniLoader.INSTANCE.statementGetParameterSchema(handle).importSchema(allocator);
}

@Override
public void prepare() throws AdbcException {
JniLoader.INSTANCE.statementPrepare(handle);
Expand Down Expand Up @@ -143,7 +153,7 @@ public <T> void setOption(TypedKey<T> key, T value) throws AdbcException {
JniLoader.INSTANCE.statementSetOptionDouble(handle, key.getKey(), (Double) value);
} else if (value instanceof Boolean) {
JniLoader.INSTANCE.statementSetOptionString(
handle, key.getKey(), ((Boolean) value) ? "true" : "false");
handle, key.getKey(), ((Boolean) value).toString());
} else if (value instanceof byte[]) {
JniLoader.INSTANCE.statementSetOptionBytes(handle, key.getKey(), (byte[]) value);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public NativeStatementHandle openStatement(NativeConnectionHandle connection)
return NativeAdbc.openStatement(connection.getConnectionHandle());
}

public void statementCancel(NativeStatementHandle statement) throws AdbcException {
NativeAdbc.statementCancel(statement.getStatementHandle());
}

public NativeQueryResult statementExecuteQuery(NativeStatementHandle statement)
throws AdbcException {
return NativeAdbc.statementExecuteQuery(statement.getStatementHandle());
Expand Down Expand Up @@ -110,6 +114,11 @@ public NativeSchemaResult statementExecuteSchema(NativeStatementHandle statement
return NativeAdbc.statementExecuteSchema(statement.getStatementHandle());
}

public NativeSchemaResult statementGetParameterSchema(NativeStatementHandle statement)
throws AdbcException {
return NativeAdbc.statementGetParameterSchema(statement.getStatementHandle());
}

public byte[] statementGetOptionBytes(NativeStatementHandle handle, String key)
throws AdbcException {
return NativeAdbc.statementGetOptionBytes(handle.getStatementHandle(), key);
Expand Down Expand Up @@ -150,6 +159,10 @@ public void statementSetOptionString(NativeStatementHandle statement, String key
NativeAdbc.statementSetOptionString(statement.getStatementHandle(), key, value);
}

public void connectionCancel(NativeConnectionHandle connection) throws AdbcException {
NativeAdbc.connectionCancel(connection.getConnectionHandle());
}

public NativeQueryResult connectionGetObjects(
NativeConnectionHandle connection,
int depth,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ static native NativeDatabaseHandle openDatabase(int version, String[] parameters

static native void closeStatement(long handle) throws AdbcException;

static native NativeQueryResult statementExecuteQuery(long handle) throws AdbcException;

static native void statementSetSqlQuery(long handle, String query) throws AdbcException;

static native void statementBind(long handle, long values, long schema) throws AdbcException;

// TODO(lidavidm): we need a way to bind an ArrowReader (or some other suitable interface that
Expand All @@ -46,12 +42,21 @@ static native NativeDatabaseHandle openDatabase(int version, String[] parameters
@SuppressWarnings("unused")
static native void statementBindStream(long handle, long stream) throws AdbcException;

static native void statementCancel(long handle) throws AdbcException;

static native long statementExecuteUpdate(long handle) throws AdbcException;

static native void statementPrepare(long handle) throws AdbcException;

static native NativeQueryResult statementExecuteQuery(long handle) throws AdbcException;

static native NativeSchemaResult statementExecuteSchema(long handle) throws AdbcException;

static native NativeSchemaResult statementGetParameterSchema(long statementHandle)
throws AdbcException;

static native void statementSetSqlQuery(long handle, String query) throws AdbcException;

static native byte[] statementGetOptionBytes(long handle, String key) throws AdbcException;

static native double statementGetOptionDouble(long handle, String key) throws AdbcException;
Expand All @@ -72,6 +77,8 @@ static native void statementSetOptionLong(long handle, String key, long value)
static native void statementSetOptionString(long handle, String key, String value)
throws AdbcException;

static native void connectionCancel(long handle) throws AdbcException;

static native NativeQueryResult connectionGetObjects(
long handle,
int depth,
Expand Down
Loading