Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
id 'jacoco'
id "org.sonarqube" version "7.2.2.6593"
id 'maven-publish'
id 'com.github.johnrengelman.shadow' version '8.0.0'
id 'com.gradleup.shadow' version '8.3.9'
id 'signing'
id 'io.qameta.allure' version '3.0.1' apply false
id 'io.github.gradle-nexus.publish-plugin' version '2.0.0'
Expand All @@ -25,6 +25,8 @@ project.ext {
lombokVersion = '1.18.42'
okHttpVersion = '4.12.0'
slf4jVersion = '2.0.17'
jacksonVersion = '2.19.2'
awsSdkVersion = '2.29.47'
}

java {
Expand Down Expand Up @@ -71,6 +73,11 @@ dependencies {
implementation 'org.apache.commons:commons-text:1.15.0'
implementation 'org.lz4:lz4-java:1.8.0'
implementation 'commons-validator:commons-validator:1.10.1'
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
implementation "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}"
implementation platform("software.amazon.awssdk:bom:${awsSdkVersion}")
implementation 'software.amazon.awssdk:s3'

implementation fileTree(dir: 'libs', includes: ['*.jar'])

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/firebolt/jdbc/client/FireboltClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public abstract class FireboltClient implements CacheListener {
private static final String HEADER_USER_AGENT = "User-Agent";
private static final String HEADER_PROTOCOL_VERSION = "Firebolt-Protocol-Version";
private static final String HEADER_CONTENT_ENCODING = "Content-Encoding";
private static final String HEADER_ACCEPT_ENCODING = "Accept-Encoding";

private static final Pattern plainErrorPattern = Pattern.compile("Line (\\d+), Column (\\d+): (.*)$", Pattern.MULTILINE);
private final OkHttpClient httpClient;
private String headerUserAgentValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.firebolt.jdbc.connection.settings.FireboltProperties;
import com.firebolt.jdbc.connection.settings.FireboltQueryParameterKey;
import com.firebolt.jdbc.statement.StatementInfoWrapper;
import com.firebolt.jdbc.statement.StatementType;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -64,4 +65,14 @@ protected void addTransactionSequenceIdIfNeeded(Map<String,String> params, Strin
}
}

/**
* When s3 query location is used, by default we enable the advanced mode as well
*/
protected void addS3QueryResultLocationIfNeeded(Map<String,String> params, String s3queryResultLocation, StatementType statementType) {
if (StringUtils.isNotBlank(s3queryResultLocation) && statementType == StatementType.QUERY) {
params.put(FireboltQueryParameterKey.QUERY_RESULT_UPLOAD_LOCATION.getKey(), s3queryResultLocation);
params.put(FireboltQueryParameterKey.ADVANCED_MODE.getKey(), "true");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ public Map<String, String> getQueryParams(FireboltProperties fireboltProperties,
Map<String, String> params = new HashMap<>(fireboltProperties.getAdditionalProperties());

if (statementInfoWrapper.getType() == StatementType.QUERY) {
params.put(OUTPUT_FORMAT.getKey(), TAB_SEPARATED_WITH_NAMES_AND_TYPES_FORMAT);
String outputFormat = fireboltProperties.getQueryResultLocation() == null ? TAB_SEPARATED_WITH_NAMES_AND_TYPES_FORMAT : QueryParameterProvider.S3_TAB_SEPARATED_WITH_NAMES_AND_TYPES_FORMAT;
params.put(OUTPUT_FORMAT.getKey(), outputFormat);
}

addQueryParameterIfNeeded(params, statementInfoWrapper.getPreparedStatementParameters());
Expand All @@ -34,6 +35,7 @@ public Map<String, String> getQueryParams(FireboltProperties fireboltProperties,
addQueryLabel(params, fireboltProperties, statementInfoWrapper);
addCompress(params, fireboltProperties.isCompress());
addQueryTimeoutIfNeeded(params, queryTimeout);
addS3QueryResultLocationIfNeeded(params, fireboltProperties.getQueryResultLocation(), statementInfoWrapper.getType());
}

return params;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
public interface QueryParameterProvider {

String TAB_SEPARATED_WITH_NAMES_AND_TYPES_FORMAT = "TabSeparatedWithNamesAndTypes";
String S3_TAB_SEPARATED_WITH_NAMES_AND_TYPES_FORMAT = "S3_TabSeparatedWithNamesAndTypes";

/**
* Returns a map of parameters that will be added to the url and sent to the firebolt backend
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.firebolt.jdbc.FireboltBackendType;
import com.firebolt.jdbc.client.CompressionType;
import com.firebolt.jdbc.client.FireboltClient;
import com.firebolt.jdbc.client.query.response.ResponseReader;
import com.firebolt.jdbc.client.query.response.ResponseReaderProvider;
import com.firebolt.jdbc.connection.FireboltConnection;
import com.firebolt.jdbc.connection.settings.FireboltProperties;
import com.firebolt.jdbc.connection.settings.FireboltQueryParameterKey;
Expand Down Expand Up @@ -45,7 +47,7 @@
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
import static java.util.Optional.ofNullable;
import static okhttp3.MultipartBody.*;
import static okhttp3.MultipartBody.Part;
import static okhttp3.RequestBody.create;

@CustomLog
Expand Down Expand Up @@ -120,8 +122,15 @@ static QueryIdFetcher getQueryFetcher(int infraVersion) {

}

private ResponseReaderProvider responseReaderProvider;

public StatementClientImpl(OkHttpClient httpClient, FireboltConnection connection, String customDrivers, String customClients) {
this(httpClient, connection, customDrivers, customClients, ResponseReaderProvider.getInstance());
}

public StatementClientImpl(OkHttpClient httpClient, FireboltConnection connection, String customDrivers, String customClients, ResponseReaderProvider responseReaderProvider) {
super(httpClient, connection, customDrivers, customClients);
this.responseReaderProvider = responseReaderProvider;
}

/**
Expand All @@ -136,8 +145,9 @@ public StatementClientImpl(OkHttpClient httpClient, FireboltConnection connectio
@Override
public InputStream executeSqlStatement(@NonNull StatementInfoWrapper statementInfoWrapper,
@NonNull FireboltProperties connectionProperties, int queryTimeout, boolean isServerAsync) throws SQLException {
return executeSqlStatementInternal(statementInfoWrapper, connectionProperties, queryTimeout, isServerAsync,
(label, formattedStatement, uri) -> executeSqlStatementWithRetryOnUnauthorized(label, connectionProperties, formattedStatement, uri));
ResponseReader responseReader = responseReaderProvider.getResponseReader(connectionProperties, statementInfoWrapper.getType());
StatementExecutor statementExecutor = (label, formattedStatement, uri) -> executeSqlStatementWithRetryOnUnauthorized(label, connectionProperties, formattedStatement, uri, responseReader);
return executeSqlStatementInternal(statementInfoWrapper, connectionProperties, queryTimeout, isServerAsync, statementExecutor);
}

/**
Expand Down Expand Up @@ -238,10 +248,10 @@ private InputStream executeSqlStatementInternal(@NonNull StatementInfoWrapper st
}
}

private InputStream executeSqlStatementWithRetryOnUnauthorized(String label, @NonNull FireboltProperties connectionProperties, String formattedStatement, String uri)
private InputStream executeSqlStatementWithRetryOnUnauthorized(String label, @NonNull FireboltProperties connectionProperties, String formattedStatement, String uri, ResponseReader responseReader)
throws SQLException, IOException {
return executeWithRetryOnUnauthorized(label, uri, "statement",
() -> postSqlStatement(connectionProperties, formattedStatement, uri, label));
() -> postSqlStatement(connectionProperties, formattedStatement, uri, label, responseReader));
}

private InputStream executeSqlStatementWithFilesRetryOnUnauthorized(String label, @NonNull FireboltProperties connectionProperties, String formattedStatement, String uri, Map<String, byte[]> files)
Expand All @@ -265,12 +275,12 @@ private InputStream executeWithRetryOnUnauthorized(String label, String uri, Str
}
}

private InputStream postSqlStatement(@NonNull FireboltProperties connectionProperties, String formattedStatement, String uri, String label)
private InputStream postSqlStatement(@NonNull FireboltProperties connectionProperties, String formattedStatement, String uri, String label, ResponseReader responseReader)
throws SQLException, IOException {
CompressionType compressionType = getRequestBodyCompressionType(connectionProperties);
Request post = createPostRequest(uri, label, formattedStatement, getConnection().getAccessToken().orElse(null), compressionType);
Response response = execute(post, connectionProperties.getHost(), connectionProperties.isCompress());
InputStream is = ofNullable(response.body()).map(ResponseBody::byteStream).orElse(null);
InputStream is = responseReader.read(response);
if (is == null) {
CloseableUtil.close(response);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.firebolt.jdbc.client.query.response;

import java.io.InputStream;
import okhttp3.Response;
import okhttp3.ResponseBody;

import static java.util.Optional.ofNullable;

/**
* The default response reader that will just convert the body into a byte stream
*/
class DefaultResponseReader implements ResponseReader {

@Override
public InputStream read(Response response) {
return ofNullable(response.body()).map(ResponseBody::byteStream).orElse(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.firebolt.jdbc.client.query.response;

import java.io.InputStream;
import okhttp3.Response;

/**
* This repsonse reader knows how to transform the response from the http call to firebolt into an input stream
*/
@FunctionalInterface
public interface ResponseReader {

/**
* converts the http response into an input stream
* @param response
* @return
*/
InputStream read(Response response);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.firebolt.jdbc.client.query.response;

import com.firebolt.jdbc.client.query.response.s3.AwsInfo;
import com.firebolt.jdbc.client.query.response.s3.S3DownloaderType;
import com.firebolt.jdbc.client.query.response.s3.S3ResponseReader;
import com.firebolt.jdbc.connection.settings.FireboltProperties;
import com.firebolt.jdbc.statement.StatementType;

public final class ResponseReaderProvider {
private ResponseReaderProvider() {
}

private static class InstanceHolder {
private static final ResponseReaderProvider INSTANCE = new ResponseReaderProvider();
}

public static ResponseReaderProvider getInstance() {
return InstanceHolder.INSTANCE;
}

public ResponseReader getResponseReader(FireboltProperties fireboltProperties, StatementType statementType) {

// only use the s3 reader in case the statement is a query
if (fireboltProperties.getQueryResultLocation() != null && statementType == StatementType.QUERY) {
AwsInfo awsInfo = AwsInfo.builder()
.region(fireboltProperties.getAwsRegion())
.keyId(fireboltProperties.getAwsAccessKeyId())
.keySecret(fireboltProperties.getAwsSecretAccessKey())
.sessionToken(fireboltProperties.getAwsSessionToken())
.build();
boolean isResponseCompressed = fireboltProperties.isCompress();
return new S3ResponseReader(awsInfo, isResponseCompressed, S3DownloaderType.fromValue(fireboltProperties.getFileDownloaderType()));
}

return new DefaultResponseReader();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.firebolt.jdbc.client.query.response.s3;

import lombok.Builder;
import lombok.Data;
import lombok.Getter;

@Getter
@Builder
@Data
public class AwsInfo {

private String region;
private String keyId;
private String keySecret;
private String sessionToken;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.firebolt.jdbc.client.query.response.s3;

import java.io.InputStream;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

/**
* Thin wrapper around AWS SDK v2 S3 client to fetch objects given an S3 path.
* Supports paths in the form s3://bucket/key.
*/
@Slf4j
public final class FireboltS3Client implements AutoCloseable {

private static final Pattern S3_URI = Pattern.compile("^s3://([^/]+)/(.+)$");
private static volatile FireboltS3Client INSTANCE;

private final S3Client s3;

private FireboltS3Client(AwsInfo awsInfo) {
Region region = StringUtils.isEmpty(awsInfo.getRegion()) ? new DefaultAwsRegionProviderChain().getRegion() : Region.of(awsInfo.getRegion());
log.info("Using AWS region {}", region);

S3ClientBuilder builder = S3Client.builder()
.credentialsProvider(getCredentialsProvider(awsInfo));

if (region != null) {
builder = builder.region(region);
}

this.s3 = builder.build();
}

private AwsCredentialsProvider getCredentialsProvider(AwsInfo awsInfo) {
if (StringUtils.isAnyEmpty(awsInfo.getKeyId(), awsInfo.getKeySecret())) {
return DefaultCredentialsProvider.create();
}

AwsCredentials awsCredentials = StringUtils.isEmpty(awsInfo.getSessionToken())
? AwsBasicCredentials.create(awsInfo.getKeyId(), awsInfo.getKeySecret()) : AwsSessionCredentials.create(awsInfo.getKeyId(), awsInfo.getKeySecret(), awsInfo.getSessionToken());
return StaticCredentialsProvider.create(awsCredentials);
}

/**
* Get a singleton instance. This avoids repeatedly creating SDK clients.
*/
public static FireboltS3Client getInstance(AwsInfo awsInfo) {
if (INSTANCE == null) {
synchronized (FireboltS3Client.class) {
if (INSTANCE == null) {
INSTANCE = new FireboltS3Client(awsInfo);
}
}
}
return INSTANCE;
}

/**
* Read an object from S3 and return its stream.
* The caller is responsible for closing the returned {@link InputStream}.
*
* @param s3Path path like s3://bucket/key
* @return input stream to the object contents
*/
public InputStream readObject(@NonNull String s3Path) {
Objects.requireNonNull(s3Path, "s3Path must not be null");
Matcher matcher = S3_URI.matcher(s3Path);
if (!matcher.matches()) {
throw new IllegalArgumentException("Unsupported S3 path format: " + s3Path);
}
String bucket = matcher.group(1);
String key = matcher.group(2);

GetObjectRequest request = GetObjectRequest.builder()
.bucket(bucket)
.key(key)
.build();

log.debug("Fetching S3 object. bucket={}, key={}", bucket, key);
ResponseInputStream<?> responseStream = s3.getObject(request);
return responseStream;
}

@Override
public void close() {
try {
this.s3.close();
} catch (Exception e) {
log.warn("Error closing S3 client", e);
}
}
}


Loading
Loading