Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions java/driver/flight-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
<artifactId>caffeine</artifactId>
<version>3.2.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${dep.jackson.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.CancelFlightInfoRequest;
import org.apache.arrow.flight.CancelFlightInfoResult;
import org.apache.arrow.flight.CloseSessionRequest;
import org.apache.arrow.flight.CloseSessionResult;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.GetSessionOptionsRequest;
import org.apache.arrow.flight.GetSessionOptionsResult;
import org.apache.arrow.flight.RenewFlightEndpointRequest;
import org.apache.arrow.flight.SchemaResult;
import org.apache.arrow.flight.SetSessionOptionsRequest;
import org.apache.arrow.flight.SetSessionOptionsResult;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.sql.CancelResult;
import org.apache.arrow.flight.sql.FlightSqlClient;
Expand Down Expand Up @@ -287,6 +293,20 @@ public FlightEndpoint renewFlightEndpoint(
return client.renewFlightEndpoint(request, combine(options));
}

public SetSessionOptionsResult setSessionOptions(
SetSessionOptionsRequest request, CallOption... options) {
return client.setSessionOptions(request, combine(options));
}

public GetSessionOptionsResult getSessionOptions(
GetSessionOptionsRequest request, CallOption... options) {
return client.getSessionOptions(request, combine(options));
}

public CloseSessionResult closeSession(CloseSessionRequest request, CallOption... options) {
return client.closeSession(request, combine(options));
}

@Override
public void close() throws Exception {
AutoCloseables.close(client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,22 @@
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.core.AdbcStatusCode;
import org.apache.arrow.adbc.core.BulkIngestMode;
import org.apache.arrow.adbc.core.TypedKey;
import org.apache.arrow.adbc.sql.SqlQuirks;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.CloseSessionRequest;
import org.apache.arrow.flight.FlightCallHeaders;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightStatusCode;
import org.apache.arrow.flight.GetSessionOptionsRequest;
import org.apache.arrow.flight.HeaderCallOption;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.SessionOptionValue;
import org.apache.arrow.flight.SessionOptionValueFactory;
import org.apache.arrow.flight.SetSessionOptionsRequest;
import org.apache.arrow.flight.SetSessionOptionsResult;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.auth2.BasicAuthCredentialWriter;
import org.apache.arrow.flight.client.ClientCookieMiddleware;
Expand Down Expand Up @@ -203,8 +212,115 @@ public void setAutoCommit(boolean enableAutoCommit) throws AdbcException {
}
}

@Override
public <T> T getOption(TypedKey<T> key) throws AdbcException {
final String k = key.getKey();

if (k.equals(FlightSqlConnectionProperties.SESSION_OPTIONS)) {
if (key.getType() != String.class) {
return AdbcConnection.super.getOption(key);
}
return key.cast(FlightSqlSessionUtil.toJson(fetchSessionOptionsOrEmpty()));
}

final String prefix;
if (k.startsWith(FlightSqlConnectionProperties.SESSION_OPTION_BOOL_PREFIX)) {
prefix = FlightSqlConnectionProperties.SESSION_OPTION_BOOL_PREFIX;
} else if (k.startsWith(FlightSqlConnectionProperties.SESSION_OPTION_STRING_LIST_PREFIX)) {
prefix = FlightSqlConnectionProperties.SESSION_OPTION_STRING_LIST_PREFIX;
} else if (k.startsWith(FlightSqlConnectionProperties.SESSION_OPTION_PREFIX)) {
prefix = FlightSqlConnectionProperties.SESSION_OPTION_PREFIX;
} else {
return AdbcConnection.super.getOption(key);
}

final String name = k.substring(prefix.length());
if (name.isEmpty()) {
throw AdbcException.invalidArgument("[Flight SQL] Session option name must not be empty");
}
final Object raw =
FlightSqlSessionUtil.require(fetchSessionOptions(), name)
.acceptVisitor(FlightSqlSessionUtil.TO_JAVA);
if (raw == null) {
throw new AdbcException(
"[Flight SQL] Session option not found: " + name,
null,
AdbcStatusCode.NOT_FOUND,
null,
0);
}
final T result = FlightSqlSessionUtil.cast(key, raw, name);
return result != null ? result : AdbcConnection.super.getOption(key);
}

@Override
public <T> void setOption(TypedKey<T> key, T value) throws AdbcException {
final String k = key.getKey();

if (value == null) {
throw AdbcException.invalidArgument(
"[Flight SQL] null value not allowed for key: "
+ k
+ " — use adbc.flight.sql.session.optionerase.<name> to erase an option");
}

if (k.startsWith(FlightSqlConnectionProperties.SESSION_OPTION_ERASE_PREFIX)) {
final String name =
k.substring(FlightSqlConnectionProperties.SESSION_OPTION_ERASE_PREFIX.length());
doSetSessionOption(name, SessionOptionValueFactory.makeEmptySessionOptionValue());

} else if (k.startsWith(FlightSqlConnectionProperties.SESSION_OPTION_BOOL_PREFIX)) {
final String name =
k.substring(FlightSqlConnectionProperties.SESSION_OPTION_BOOL_PREFIX.length());
final boolean b;
if (value instanceof Boolean) {
b = (Boolean) value;
} else {
b = Boolean.parseBoolean(value.toString());
}
doSetSessionOption(name, SessionOptionValueFactory.makeSessionOptionValue(b));

} else if (k.startsWith(FlightSqlConnectionProperties.SESSION_OPTION_STRING_LIST_PREFIX)) {
final String name =
k.substring(FlightSqlConnectionProperties.SESSION_OPTION_STRING_LIST_PREFIX.length());
final String[] arr;
if (value instanceof String[]) {
arr = (String[]) value;
} else {
arr = FlightSqlSessionUtil.parseJsonArray(value.toString());
}
doSetSessionOption(name, SessionOptionValueFactory.makeSessionOptionValue(arr));

} else if (k.startsWith(FlightSqlConnectionProperties.SESSION_OPTION_PREFIX)) {
final String name = k.substring(FlightSqlConnectionProperties.SESSION_OPTION_PREFIX.length());
final SessionOptionValue sv;
if (value instanceof Long) {
sv = SessionOptionValueFactory.makeSessionOptionValue((Long) value);
} else if (value instanceof Double) {
sv = SessionOptionValueFactory.makeSessionOptionValue((Double) value);
} else {
sv = SessionOptionValueFactory.makeSessionOptionValue(value.toString());
}
doSetSessionOption(name, sv);

} else if (k.equals(FlightSqlConnectionProperties.SESSION_OPTIONS)) {
throw AdbcException.notImplemented(
"[Flight SQL] adbc.flight.sql.session.options is read-only");

} else {
AdbcConnection.super.setOption(key, value);
}
}

@Override
public void close() throws Exception {
try {
client.closeSession(new CloseSessionRequest());
} catch (FlightRuntimeException e) {
if (e.status().code() != FlightStatusCode.UNIMPLEMENTED) {
throw FlightSqlDriverUtil.fromFlightException(e);
}
}
clientCache.invalidateAll();
AutoCloseables.close(client, allocator);
}
Expand All @@ -214,6 +330,45 @@ public String toString() {
return "FlightSqlConnection{" + "client=" + client + '}';
}

private Map<String, SessionOptionValue> fetchSessionOptions() throws AdbcException {
try {
return client.getSessionOptions(new GetSessionOptionsRequest()).getSessionOptions();
} catch (FlightRuntimeException e) {
throw FlightSqlDriverUtil.fromFlightException(e);
}
}

private Map<String, SessionOptionValue> fetchSessionOptionsOrEmpty() throws AdbcException {
try {
return client.getSessionOptions(new GetSessionOptionsRequest()).getSessionOptions();
} catch (FlightRuntimeException e) {
if (e.status().code() == FlightStatusCode.UNIMPLEMENTED) {
return Collections.emptyMap();
}
throw FlightSqlDriverUtil.fromFlightException(e);
}
}

private void doSetSessionOption(String name, SessionOptionValue value) throws AdbcException {
if (name.isEmpty()) {
throw AdbcException.invalidArgument("[Flight SQL] Session option name must not be empty");
}
final SetSessionOptionsResult result;
try {
result =
client.setSessionOptions(
new SetSessionOptionsRequest(Collections.singletonMap(name, value)));
} catch (FlightRuntimeException e) {
throw FlightSqlDriverUtil.fromFlightException(e);
}
if (result.hasErrors()) {
final SetSessionOptionsResult.Error err = result.getErrors().get(name);
final String errType = (err != null) ? err.value.name() : "UNKNOWN";
throw AdbcException.invalidArgument(
"[Flight SQL] Failed to set session option '" + name + "': " + errType);
}
}

/**
* Initialize cached data to share between connections and create, test, and authenticate the
* first connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,10 @@ public interface FlightSqlConnectionProperties {
TypedKey<Boolean> WITH_COOKIE_MIDDLEWARE =
new TypedKey<>("adbc.flight.sql.rpc.with_cookie_middleware", Boolean.class);
String RPC_CALL_HEADER_PREFIX = "adbc.flight.sql.rpc.call_header.";

String SESSION_OPTIONS = "adbc.flight.sql.session.options";
String SESSION_OPTION_PREFIX = "adbc.flight.sql.session.option.";
String SESSION_OPTION_BOOL_PREFIX = "adbc.flight.sql.session.optionbool.";
String SESSION_OPTION_STRING_LIST_PREFIX = "adbc.flight.sql.session.optionstringlist.";
String SESSION_OPTION_ERASE_PREFIX = "adbc.flight.sql.session.optionerase.";
}
Loading