Skip to content

Commit 52141a5

Browse files
committed
feat(java/driver/jni): wire up ingest into temporary/namespace
Closes #4240.
1 parent c8060fd commit 52141a5

5 files changed

Lines changed: 287 additions & 0 deletions

File tree

java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,18 @@ default AdbcStatement bulkIngest(String targetTableName, BulkIngestMode mode)
6363
"Connection does not support bulkIngest(String, BulkIngestMode)");
6464
}
6565

66+
/**
67+
* Create a new statement to bulk insert a {@link VectorSchemaRoot} into a table.
68+
*
69+
* <p>Bind data to the statement, then call {@link AdbcStatement#executeUpdate()}. See {@link
70+
* BulkIngestMode} for description of behavior around creating tables.
71+
*/
72+
default AdbcStatement bulkIngest(
73+
String targetTableName, BulkIngestMode mode, IngestOption... options) throws AdbcException {
74+
throw AdbcException.notImplemented(
75+
"Connection does not support bulkIngest(String, BulkIngestMode, IngestOption...)");
76+
}
77+
6678
/**
6779
* Create a result set from a serialized PartitionDescriptor.
6880
*
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.adbc.core;
18+
19+
import java.util.Objects;
20+
import org.checkerframework.checker.nullness.qual.Nullable;
21+
22+
public interface IngestOption {
23+
TemporaryIngestOption TEMPORARY = new TemporaryIngestOption(true);
24+
TemporaryIngestOption NOT_TEMPORARY = new TemporaryIngestOption(false);
25+
26+
static IngestOption targetCatalog(String catalog) {
27+
return new TargetNamespaceIngestOption(catalog, null);
28+
}
29+
30+
static IngestOption targetDbSchema(String dbSchema) {
31+
return new TargetNamespaceIngestOption(null, dbSchema);
32+
}
33+
34+
static IngestOption targetNamespace(@Nullable String catalog, @Nullable String dbSchema) {
35+
return new TargetNamespaceIngestOption(catalog, dbSchema);
36+
}
37+
38+
class TemporaryIngestOption implements IngestOption {
39+
boolean temporary;
40+
41+
TemporaryIngestOption(boolean temporary) {
42+
this.temporary = temporary;
43+
}
44+
45+
public boolean isTemporary() {
46+
return temporary;
47+
}
48+
49+
@Override
50+
public boolean equals(Object o) {
51+
if (o == null || getClass() != o.getClass()) return false;
52+
TemporaryIngestOption that = (TemporaryIngestOption) o;
53+
return temporary == that.temporary;
54+
}
55+
56+
@Override
57+
public int hashCode() {
58+
return Objects.hashCode(temporary);
59+
}
60+
}
61+
62+
class TargetNamespaceIngestOption implements IngestOption {
63+
private final @Nullable String targetCatalog;
64+
private final @Nullable String targetDbSchema;
65+
66+
public TargetNamespaceIngestOption(
67+
@Nullable String targetCatalog, @Nullable String targetDbSchema) {
68+
this.targetCatalog = targetCatalog;
69+
this.targetDbSchema = targetDbSchema;
70+
}
71+
72+
public @Nullable String getTargetCatalog() {
73+
return targetCatalog;
74+
}
75+
76+
public @Nullable String getTargetDbSchema() {
77+
return targetDbSchema;
78+
}
79+
80+
@Override
81+
public boolean equals(Object o) {
82+
if (o == null || getClass() != o.getClass()) return false;
83+
TargetNamespaceIngestOption that = (TargetNamespaceIngestOption) o;
84+
return Objects.equals(targetCatalog, that.targetCatalog)
85+
&& Objects.equals(targetDbSchema, that.targetDbSchema);
86+
}
87+
88+
@Override
89+
public int hashCode() {
90+
return Objects.hash(targetCatalog, targetDbSchema);
91+
}
92+
}
93+
}

java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.arrow.adbc.core.AdbcStatement;
3737
import org.apache.arrow.adbc.core.AdbcStatusCode;
3838
import org.apache.arrow.adbc.core.BulkIngestMode;
39+
import org.apache.arrow.adbc.core.IngestOption;
3940
import org.apache.arrow.adbc.core.TypedKey;
4041
import org.apache.arrow.adbc.driver.testsuite.ArrowToJava;
4142
import org.apache.arrow.memory.BufferAllocator;
@@ -272,6 +273,81 @@ void bulkIngest() throws Exception {
272273
}
273274
}
274275

276+
@Test
277+
void bulkIngestTarget() throws Exception {
278+
runSetup(
279+
"DROP TABLE IF EXISTS secondary.foobar",
280+
"DROP SCHEMA IF EXISTS secondary",
281+
"CREATE SCHEMA secondary");
282+
283+
final Schema schema =
284+
new Schema(
285+
List.of(
286+
Field.nullable("ndx", Types.MinorType.INT.getType()),
287+
Field.nullable("value", Types.MinorType.VARCHAR.getType())));
288+
try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator)) {
289+
IntVector iv = (IntVector) vsr.getVector(0);
290+
VarCharVector vv = (VarCharVector) vsr.getVector(1);
291+
292+
try (AdbcStatement stmt =
293+
conn.bulkIngest(
294+
"foobar",
295+
BulkIngestMode.REPLACE,
296+
IngestOption.NOT_TEMPORARY,
297+
IngestOption.targetNamespace(null, "secondary"))) {
298+
iv.setSafe(0, 1);
299+
iv.setSafe(1, 2);
300+
vv.setNull(0);
301+
vv.setSafe(1, "foobar".getBytes(StandardCharsets.UTF_8));
302+
vsr.setRowCount(2);
303+
304+
stmt.bind(vsr);
305+
assertThat(stmt.executeUpdate().getAffectedRows()).isEqualTo(2);
306+
}
307+
}
308+
309+
try (AdbcStatement stmt = conn.createStatement()) {
310+
stmt.setSqlQuery("SELECT value FROM secondary.foobar ORDER BY ndx");
311+
try (var result = stmt.executeQuery()) {
312+
var values = ArrowToJava.toStrings(result.getReader(), "value");
313+
assertThat(values).containsExactly(null, "foobar");
314+
}
315+
}
316+
}
317+
318+
@Test
319+
void bulkIngestTemporary() throws Exception {
320+
final Schema schema =
321+
new Schema(
322+
List.of(
323+
Field.nullable("ndx", Types.MinorType.INT.getType()),
324+
Field.nullable("value", Types.MinorType.VARCHAR.getType())));
325+
try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator)) {
326+
IntVector iv = (IntVector) vsr.getVector(0);
327+
VarCharVector vv = (VarCharVector) vsr.getVector(1);
328+
329+
try (AdbcStatement stmt =
330+
conn.bulkIngest("foobar", BulkIngestMode.CREATE, IngestOption.TEMPORARY)) {
331+
iv.setSafe(0, 1);
332+
iv.setSafe(1, 2);
333+
vv.setNull(0);
334+
vv.setSafe(1, "foobar".getBytes(StandardCharsets.UTF_8));
335+
vsr.setRowCount(2);
336+
337+
stmt.bind(vsr);
338+
assertThat(stmt.executeUpdate().getAffectedRows()).isEqualTo(2);
339+
}
340+
}
341+
342+
try (AdbcStatement stmt = conn.createStatement()) {
343+
stmt.setSqlQuery("SELECT value FROM foobar ORDER BY ndx");
344+
try (var result = stmt.executeQuery()) {
345+
var values = ArrowToJava.toStrings(result.getReader(), "value");
346+
assertThat(values).containsExactly(null, "foobar");
347+
}
348+
}
349+
}
350+
275351
@Test
276352
void currentCatalogSchema() throws Exception {
277353
runSetup(

java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.arrow.adbc.core.AdbcStatement;
3838
import org.apache.arrow.adbc.core.AdbcStatusCode;
3939
import org.apache.arrow.adbc.core.BulkIngestMode;
40+
import org.apache.arrow.adbc.core.IngestOption;
4041
import org.apache.arrow.adbc.core.TypedKey;
4142
import org.apache.arrow.adbc.driver.testsuite.ArrowToJava;
4243
import org.apache.arrow.memory.BufferAllocator;
@@ -274,6 +275,81 @@ void bulkIngest() throws Exception {
274275
}
275276
}
276277

278+
@Test
279+
void bulkIngestTarget() throws Exception {
280+
runSetup(
281+
"DROP TABLE IF EXISTS secondary.foobar",
282+
"DROP SCHEMA IF EXISTS secondary",
283+
"CREATE SCHEMA secondary");
284+
285+
final Schema schema =
286+
new Schema(
287+
List.of(
288+
Field.nullable("ndx", Types.MinorType.INT.getType()),
289+
Field.nullable("value", Types.MinorType.VARCHAR.getType())));
290+
try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator)) {
291+
IntVector iv = (IntVector) vsr.getVector(0);
292+
VarCharVector vv = (VarCharVector) vsr.getVector(1);
293+
294+
try (AdbcStatement stmt =
295+
conn.bulkIngest(
296+
"foobar",
297+
BulkIngestMode.REPLACE,
298+
IngestOption.NOT_TEMPORARY,
299+
IngestOption.targetNamespace("master", "secondary"))) {
300+
iv.setSafe(0, 1);
301+
iv.setSafe(1, 2);
302+
vv.setNull(0);
303+
vv.setSafe(1, "foobar".getBytes(StandardCharsets.UTF_8));
304+
vsr.setRowCount(2);
305+
306+
stmt.bind(vsr);
307+
assertThat(stmt.executeUpdate().getAffectedRows()).isEqualTo(2);
308+
}
309+
}
310+
311+
try (AdbcStatement stmt = conn.createStatement()) {
312+
stmt.setSqlQuery("SELECT value FROM secondary.foobar ORDER BY ndx");
313+
try (var result = stmt.executeQuery()) {
314+
var values = ArrowToJava.toStrings(result.getReader(), "value");
315+
assertThat(values).containsExactly(null, "foobar");
316+
}
317+
}
318+
}
319+
320+
@Test
321+
void bulkIngestTemporary() throws Exception {
322+
final Schema schema =
323+
new Schema(
324+
List.of(
325+
Field.nullable("ndx", Types.MinorType.INT.getType()),
326+
Field.nullable("value", Types.MinorType.VARCHAR.getType())));
327+
try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator)) {
328+
IntVector iv = (IntVector) vsr.getVector(0);
329+
VarCharVector vv = (VarCharVector) vsr.getVector(1);
330+
331+
try (AdbcStatement stmt =
332+
conn.bulkIngest("foobar", BulkIngestMode.CREATE, IngestOption.TEMPORARY)) {
333+
iv.setSafe(0, 1);
334+
iv.setSafe(1, 2);
335+
vv.setNull(0);
336+
vv.setSafe(1, "foobar".getBytes(StandardCharsets.UTF_8));
337+
vsr.setRowCount(2);
338+
339+
stmt.bind(vsr);
340+
assertThat(stmt.executeUpdate().getAffectedRows()).isEqualTo(2);
341+
}
342+
}
343+
344+
try (AdbcStatement stmt = conn.createStatement()) {
345+
stmt.setSqlQuery("SELECT value FROM #foobar ORDER BY ndx");
346+
try (var result = stmt.executeQuery()) {
347+
var values = ArrowToJava.toStrings(result.getReader(), "value");
348+
assertThat(values).containsExactly(null, "foobar");
349+
}
350+
}
351+
}
352+
277353
@Test
278354
void currentCatalogSchema() throws Exception {
279355
runSetup(

java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.arrow.adbc.core.AdbcException;
2222
import org.apache.arrow.adbc.core.AdbcStatement;
2323
import org.apache.arrow.adbc.core.BulkIngestMode;
24+
import org.apache.arrow.adbc.core.IngestOption;
2425
import org.apache.arrow.adbc.core.IsolationLevel;
2526
import org.apache.arrow.adbc.core.TypedKey;
2627
import org.apache.arrow.adbc.driver.jni.impl.JniLoader;
@@ -53,6 +54,17 @@ public void cancel() throws AdbcException {
5354
@Override
5455
public AdbcStatement bulkIngest(String targetTableName, BulkIngestMode mode)
5556
throws AdbcException {
57+
return bulkIngestImpl(targetTableName, mode);
58+
}
59+
60+
@Override
61+
public AdbcStatement bulkIngest(
62+
String targetTableName, BulkIngestMode mode, IngestOption... options) throws AdbcException {
63+
return bulkIngestImpl(targetTableName, mode, options);
64+
}
65+
66+
AdbcStatement bulkIngestImpl(String targetTableName, BulkIngestMode mode, IngestOption... options)
67+
throws AdbcException {
5668
NativeStatementHandle stmtHandle = JniLoader.INSTANCE.openStatement(handle);
5769
try {
5870
String modeValue;
@@ -77,6 +89,24 @@ public AdbcStatement bulkIngest(String targetTableName, BulkIngestMode mode)
7789
stmtHandle, "adbc.ingest.target_table", targetTableName);
7890
JniLoader.INSTANCE.statementSetOptionString(stmtHandle, "adbc.ingest.mode", modeValue);
7991

92+
for (var option : options) {
93+
if (option instanceof IngestOption.TemporaryIngestOption) {
94+
var o = (IngestOption.TemporaryIngestOption) option;
95+
JniLoader.INSTANCE.statementSetOptionString(
96+
stmtHandle, "adbc.ingest.temporary", Boolean.toString(o.isTemporary()));
97+
} else if (option instanceof IngestOption.TargetNamespaceIngestOption) {
98+
var o = (IngestOption.TargetNamespaceIngestOption) option;
99+
if (o.getTargetCatalog() != null) {
100+
JniLoader.INSTANCE.statementSetOptionString(
101+
stmtHandle, "adbc.ingest.target_catalog", o.getTargetCatalog());
102+
}
103+
if (o.getTargetDbSchema() != null) {
104+
JniLoader.INSTANCE.statementSetOptionString(
105+
stmtHandle, "adbc.ingest.target_db_schema", o.getTargetDbSchema());
106+
}
107+
}
108+
}
109+
80110
return new JniStatement(allocator, stmtHandle);
81111
} catch (Exception e) {
82112
stmtHandle.close();

0 commit comments

Comments
 (0)