Skip to content

Commit 760e101

Browse files
authored
Merge pull request #394 from youngsofun/feat/arrow
feat(jdbc): arrow format support decimal.
2 parents 76eb3e5 + 52a39c6 commit 760e101

7 files changed

Lines changed: 133 additions & 31 deletions

File tree

README.md

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,6 @@ public class LoadStreamExample {
249249
}
250250
}
251251
}
252-
253252
```
254253

255254
### method `uploadStream` and `downloadStream`
@@ -267,3 +266,34 @@ Download a single file in the stage as `InputStream`
267266
```
268267
InputStream downloadStream(String stageName, String filePathInStage) throws SQLException;
269268
```
269+
270+
### Use Arrow Result Format
271+
272+
By default, the driver fetches query results in JSON format. To enable Arrow over HTTP, add
273+
`query_result_format=arrow` to the JDBC URL:
274+
275+
```java
276+
String url = "jdbc:databend://localhost:8000/default?query_result_format=arrow";
277+
Connection conn = DriverManager.getConnection(url, "root", "");
278+
```
279+
280+
Arrow mode is intended for query result fetching. Internal control queries still use JSON when needed.
281+
282+
Requirements:
283+
284+
1. Databend server must support Arrow result pages.
285+
2. The JVM must allow Arrow to access `java.nio` internals.
286+
287+
Before starting your application, set:
288+
289+
```shell
290+
export JAVA_TOOL_OPTIONS='--add-opens=java.base/java.nio=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true'
291+
```
292+
293+
If you do not want to set `JAVA_TOOL_OPTIONS` globally, pass the same options directly to `java`:
294+
295+
```shell
296+
java --add-opens=java.base/java.nio=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true -jar your-app.jar
297+
```
298+
299+
If `query_result_format` is not specified, the driver uses JSON.

databend-jdbc/src/main/java/com/databend/jdbc/internal/query/QueryRequest.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,20 @@
22

33
import com.databend.jdbc.internal.session.SessionState;
44
import com.databend.jdbc.internal.session.PaginationOptions;
5+
import com.fasterxml.jackson.annotation.JsonInclude;
56
import com.fasterxml.jackson.annotation.JsonCreator;
67
import com.fasterxml.jackson.annotation.JsonProperty;
78
import com.fasterxml.jackson.databind.ObjectMapper;
89

10+
@JsonInclude(JsonInclude.Include.NON_NULL)
911
public class QueryRequest {
1012
private final String sql;
1113
private final String sessionId;
1214
private final PaginationOptions paginationOptions;
1315
private final SessionState session;
1416
private final StageAttachment stageAttachment;
1517
private final Integer arrowResultVersionMax;
18+
private final ArrowFeatures arrowFeatures;
1619

1720
@JsonCreator
1821
public QueryRequest(
@@ -21,13 +24,15 @@ public QueryRequest(
2124
@JsonProperty("pagination") PaginationOptions paginationOptions,
2225
@JsonProperty("session") SessionState session,
2326
@JsonProperty("stage_attachment") StageAttachment stageAttachment,
24-
@JsonProperty("arrow_result_version_max") Integer arrowResultVersionMax) {
27+
@JsonProperty("arrow_result_version_max") Integer arrowResultVersionMax,
28+
@JsonProperty("arrow_features") ArrowFeatures arrowFeatures) {
2529
this.sql = sql;
2630
this.sessionId = sessionId;
2731
this.paginationOptions = paginationOptions;
2832
this.session = session;
2933
this.stageAttachment = stageAttachment;
3034
this.arrowResultVersionMax = arrowResultVersionMax;
35+
this.arrowFeatures = arrowFeatures;
3136
}
3237

3338
public static Builder builder() {
@@ -64,6 +69,11 @@ public Integer getArrowResultVersionMax() {
6469
return arrowResultVersionMax;
6570
}
6671

72+
@JsonProperty("arrow_features")
73+
public ArrowFeatures getArrowFeatures() {
74+
return arrowFeatures;
75+
}
76+
6777
@Override
6878
public String toString() {
6979
try {
@@ -80,6 +90,7 @@ public static class Builder {
8090
private SessionState session;
8191
private StageAttachment stageAttachment;
8292
private Integer arrowResultVersionMax;
93+
private ArrowFeatures arrowFeatures;
8394

8495
public Builder setSql(String sql) {
8596
this.sql = sql;
@@ -111,8 +122,27 @@ public Builder setArrowResultVersionMax(Integer arrowResultVersionMax) {
111122
return this;
112123
}
113124

125+
public Builder setArrowFeatures(ArrowFeatures arrowFeatures) {
126+
this.arrowFeatures = arrowFeatures;
127+
return this;
128+
}
129+
114130
public QueryRequest build() {
115-
return new QueryRequest(sql, sessionId, paginationOptions, session, stageAttachment, arrowResultVersionMax);
131+
return new QueryRequest(sql, sessionId, paginationOptions, session, stageAttachment, arrowResultVersionMax, arrowFeatures);
132+
}
133+
}
134+
135+
public static class ArrowFeatures {
136+
private final Boolean decimal64;
137+
138+
@JsonCreator
139+
public ArrowFeatures(@JsonProperty("decimal64") Boolean decimal64) {
140+
this.decimal64 = decimal64;
141+
}
142+
143+
@JsonProperty("decimal64")
144+
public Boolean getDecimal64() {
145+
return decimal64;
116146
}
117147
}
118148
}

databend-jdbc/src/main/java/com/databend/jdbc/internal/query/RestQueryResultPages.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
@ThreadSafe
4242
public class RestQueryResultPages implements QueryResultPages {
43+
private static final int ARROW_FEATURE_NEGOTIATION_VERSION = 3;
4344
public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
4445
public static final MediaType MEDIA_TYPE_ARROW = MediaType.parse("application/vnd.apache.arrow.stream");
4546
public static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);
@@ -104,7 +105,8 @@ private Request buildQueryRequest(String query, QueryRequestConfig requestConfig
104105
.setStageAttachment(requestConfig.getStageAttachment())
105106
.setPaginationOptions(requestConfig.getPaginationOptions())
106107
.setSql(query)
107-
.setArrowResultVersionMax(currentFormat == QueryResultFormat.ARROW ? 2 : null)
108+
.setArrowResultVersionMax(currentFormat == QueryResultFormat.ARROW ? ARROW_FEATURE_NEGOTIATION_VERSION : null)
109+
.setArrowFeatures(currentFormat == QueryResultFormat.ARROW ? new QueryRequest.ArrowFeatures(false) : null)
108110
.build();
109111
String reqString = req.toString();
110112
if (reqString == null || reqString.isEmpty()) {

databend-jdbc/src/main/java/com/databend/jdbc/internal/session/DatabendSessionHandle.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class DatabendSessionHandle implements Consumer<SessionState> {
7070
private static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
7171
private static final Semver STREAMING_LOAD_MIN_VERSION = new Semver("1.2.781");
7272
private static final Semver HEARTBEAT_MIN_VERSION = new Semver("1.2.709");
73+
private static final int MIN_ARROW_RESULT_VERSION = 3;
7374
private static final int MAX_STAGE_UPLOAD_RETRY_ATTEMPTS = 20;
7475
private static final Duration STAGE_UPLOAD_RETRY_TIMEOUT = Duration.ofMinutes(5);
7576

@@ -424,10 +425,7 @@ private QueryRequestConfig.Builder makeRequestConfig(String queryId, String host
424425
}
425426

426427
private boolean supportsArrowTransport() {
427-
if (this.serverMaxArrowResultVersion != null) {
428-
return this.serverMaxArrowResultVersion > 0;
429-
}
430-
return this.serverVersion != null && new Capability(this.serverVersion).arrowData();
428+
return this.serverMaxArrowResultVersion != null && this.serverMaxArrowResultVersion >= MIN_ARROW_RESULT_VERSION;
431429
}
432430

433431
private void logout() throws SQLException {

databend-jdbc/src/test/java/com/databend/jdbc/TestDatabendDatabaseMetaData.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.vdurmont.semver4j.Semver;
44
import org.testng.Assert;
5-
import org.testng.SkipException;
65
import org.testng.annotations.BeforeTest;
76
import org.testng.annotations.Test;
87

@@ -261,9 +260,6 @@ public void testGetColumnTypeWithDecimal() throws Exception {
261260

262261
@Test(groups = {"IT"})
263262
public void testGetObjectWithDecimal() throws Exception {
264-
if ("arrow".equalsIgnoreCase(System.getenv("DATABEND_JDBC_TEST_QUERY_RESULT_FORMAT"))) {
265-
throw new SkipException("TODO: re-enable after Arrow Decimal64/Decimal128 compatibility is fixed");
266-
}
267263
try (Connection connection = Utils.createConnection()) {
268264
connection.createStatement().execute("insert into decimal_test values(1.2)");
269265
ResultSet rs = connection.createStatement().executeQuery("select * from decimal_test");
@@ -275,9 +271,6 @@ public void testGetObjectWithDecimal() throws Exception {
275271

276272
@Test(groups = {"IT"})
277273
public void testGetBigDecimal() throws Exception {
278-
if ("arrow".equalsIgnoreCase(System.getenv("DATABEND_JDBC_TEST_QUERY_RESULT_FORMAT"))) {
279-
throw new SkipException("TODO: re-enable after Arrow Decimal64/Decimal128 compatibility is fixed");
280-
}
281274
String bigDecimalStr = "123.456789012345";
282275
String scaleBigDecimalStr = "123.46";
283276
String columnLabel = "a";

databend-jdbc/src/test/java/com/databend/jdbc/TestTypes.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import org.locationtech.jts.io.ParseException;
66
import org.locationtech.jts.io.WKBReader;
77
import org.testng.Assert;
8-
import org.testng.SkipException;
98
import org.testng.annotations.BeforeSuite;
109
import org.testng.annotations.BeforeTest;
1110
import org.testng.annotations.DataProvider;
@@ -37,11 +36,6 @@ public Object[][] provideFlag() {
3736
@Test(groups = {"IT"})
3837
public void testGetDecimalByQueryResultFormat()
3938
throws SQLException {
40-
String queryResultFormat = currentQueryResultFormat();
41-
if ("arrow".equals(queryResultFormat)) {
42-
throw new SkipException("TODO: re-enable after Arrow Decimal64/Decimal128 compatibility is fixed");
43-
}
44-
4539
String sql = "select cast(123.456789012345 as decimal(15, 12)) as a";
4640
try (Connection connection = Utils.createConnection();
4741
Statement statement = connection.createStatement()) {
@@ -533,12 +527,4 @@ public void TestBatchInsertWithComplexDataType(boolean presigned, boolean place
533527
}
534528
}
535529

536-
private static String currentQueryResultFormat() {
537-
String queryResultFormat = System.getenv("DATABEND_JDBC_TEST_QUERY_RESULT_FORMAT");
538-
if (queryResultFormat == null || queryResultFormat.trim().isEmpty()) {
539-
return "json";
540-
}
541-
return queryResultFormat.trim().toLowerCase();
542-
}
543-
544530
}

databend-jdbc/src/test/java/com/databend/jdbc/internal/session/TestDatabendSessionHandle.java

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void testLoginEnablesArrowTransportWhenServerAdvertisesArrowResultVersion
8888
HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
8989
server.createContext("/v1/session/login", exchange -> {
9090
try {
91-
byte[] response = "{\"version\":\"1.2.100\",\"server_max_arrow_result_version\":2}".getBytes(StandardCharsets.UTF_8);
91+
byte[] response = "{\"version\":\"1.2.100\",\"server_max_arrow_result_version\":3}".getBytes(StandardCharsets.UTF_8);
9292
exchange.getResponseHeaders().add("Content-Type", "application/json");
9393
exchange.sendResponseHeaders(200, response.length);
9494
exchange.getResponseBody().write(response);
@@ -134,7 +134,70 @@ public void testLoginEnablesArrowTransportWhenServerAdvertisesArrowResultVersion
134134
pages.close();
135135

136136
Assert.assertEquals(accept.get(), "application/vnd.apache.arrow.stream");
137-
Assert.assertTrue(requestBody.get().contains("\"arrow_result_version_max\":2"));
137+
Assert.assertTrue(requestBody.get().contains("\"arrow_result_version_max\":3"));
138+
Assert.assertTrue(requestBody.get().contains("\"arrow_features\":{\"decimal64\":false}"));
139+
}
140+
finally {
141+
server.stop(0);
142+
}
143+
}
144+
145+
@Test(groups = {"UNIT"})
146+
public void testLoginFallsBackToJsonWhenServerArrowResultVersionIsTooLow() throws Exception {
147+
AtomicReference<String> accept = new AtomicReference<>();
148+
AtomicReference<String> requestBody = new AtomicReference<>();
149+
150+
HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
151+
server.createContext("/v1/session/login", exchange -> {
152+
try {
153+
byte[] response = "{\"version\":\"1.2.100\",\"server_max_arrow_result_version\":2}".getBytes(StandardCharsets.UTF_8);
154+
exchange.getResponseHeaders().add("Content-Type", "application/json");
155+
exchange.sendResponseHeaders(200, response.length);
156+
exchange.getResponseBody().write(response);
157+
}
158+
finally {
159+
exchange.close();
160+
}
161+
});
162+
server.createContext("/v1/query", exchange -> {
163+
try {
164+
accept.set(exchange.getRequestHeaders().getFirst("Accept"));
165+
requestBody.set(new String(readAllBytes(exchange), StandardCharsets.UTF_8));
166+
byte[] response = "{\"id\":\"qid\",\"node_id\":\"node\",\"session\":{\"database\":\"default\"},\"schema\":[],\"data\":[]}".getBytes(StandardCharsets.UTF_8);
167+
exchange.getResponseHeaders().add("Content-Type", "application/json");
168+
exchange.sendResponseHeaders(200, response.length);
169+
exchange.getResponseBody().write(response);
170+
}
171+
finally {
172+
exchange.close();
173+
}
174+
});
175+
server.start();
176+
177+
try {
178+
DatabendSessionHandle handle = new DatabendSessionHandle(
179+
new OkHttpClient.Builder()
180+
.addInterceptor(userAgentInterceptor(DriverInfo.USER_AGENT_VALUE))
181+
.build(),
182+
SessionHandleConfig.builder()
183+
.setBaseUri(URI.create("http://127.0.0.1:" + server.getAddress().getPort()))
184+
.setInitialSession(SessionState.createDefault())
185+
.setQueryResultFormat(QueryResultFormat.ARROW)
186+
.setQueryTimeoutSecs(30)
187+
.setConnectionTimeoutSecs(30)
188+
.setSocketTimeoutSecs(60)
189+
.setWaitTimeSecs(10)
190+
.setMaxRowsInBuffer(1000)
191+
.setMaxRowsPerPage(1000)
192+
.build(),
193+
null);
194+
handle.login();
195+
QueryResultPages pages = handle.startQuery("qid", "select 1", null, null);
196+
pages.close();
197+
198+
Assert.assertEquals(accept.get(), "application/json");
199+
Assert.assertFalse(requestBody.get().contains("\"arrow_result_version_max\""));
200+
Assert.assertFalse(requestBody.get().contains("\"arrow_features\""));
138201
}
139202
finally {
140203
server.stop(0);

0 commit comments

Comments
 (0)