From 640fe1771d91f7855c95837acf2ed1f84eaaf86c Mon Sep 17 00:00:00 2001 From: goprean Date: Tue, 23 Dec 2025 17:23:14 +0200 Subject: [PATCH] feat: read the query results from S3 FIR-51568 --- build.gradle | 9 +- .../firebolt/jdbc/client/FireboltClient.java | 2 + .../query/AbstractQueryParameterProvider.java | 11 ++ ...FireboltCloudV2QueryParameterProvider.java | 4 +- .../client/query/QueryParameterProvider.java | 1 + .../client/query/StatementClientImpl.java | 24 ++- .../query/response/DefaultResponseReader.java | 18 ++ .../client/query/response/ResponseReader.java | 19 +++ .../response/ResponseReaderProvider.java | 37 ++++ .../client/query/response/s3/AwsInfo.java | 16 ++ .../query/response/s3/FireboltS3Client.java | 109 ++++++++++++ .../query/response/s3/FireboltS3Response.java | 57 +++++++ .../s3/HeaderSkippingInputStream.java | 88 ++++++++++ .../response/s3/OneAheadS3FileDownloader.java | 160 ++++++++++++++++++ .../response/s3/ParallelS3FileDownloader.java | 135 +++++++++++++++ .../query/response/s3/S3DownloaderType.java | 32 ++++ .../query/response/s3/S3FileDownloader.java | 16 ++ .../response/s3/S3FileDownloaderProvider.java | 16 ++ .../query/response/s3/S3ResponseReader.java | 97 +++++++++++ .../s3/SequentialS3FileDownloader.java | 48 ++++++ .../settings/FireboltProperties.java | 14 ++ .../settings/FireboltQueryParameterKey.java | 5 +- .../settings/FireboltSessionProperty.java | 11 +- .../service/FireboltStatementService.java | 18 +- ...boltCloudV2QueryParameterProviderTest.java | 21 +++ .../settings/FireboltPropertiesTest.java | 2 + 26 files changed, 951 insertions(+), 19 deletions(-) create mode 100644 src/main/java/com/firebolt/jdbc/client/query/response/DefaultResponseReader.java create mode 100644 src/main/java/com/firebolt/jdbc/client/query/response/ResponseReader.java create mode 100644 src/main/java/com/firebolt/jdbc/client/query/response/ResponseReaderProvider.java create mode 100644 src/main/java/com/firebolt/jdbc/client/query/response/s3/AwsInfo.java create mode 100644 src/main/java/com/firebolt/jdbc/client/query/response/s3/FireboltS3Client.java create mode 100644 src/main/java/com/firebolt/jdbc/client/query/response/s3/FireboltS3Response.java create mode 100644 src/main/java/com/firebolt/jdbc/client/query/response/s3/HeaderSkippingInputStream.java create mode 100644 src/main/java/com/firebolt/jdbc/client/query/response/s3/OneAheadS3FileDownloader.java create mode 100644 src/main/java/com/firebolt/jdbc/client/query/response/s3/ParallelS3FileDownloader.java create mode 100644 src/main/java/com/firebolt/jdbc/client/query/response/s3/S3DownloaderType.java create mode 100644 src/main/java/com/firebolt/jdbc/client/query/response/s3/S3FileDownloader.java create mode 100644 src/main/java/com/firebolt/jdbc/client/query/response/s3/S3FileDownloaderProvider.java create mode 100644 src/main/java/com/firebolt/jdbc/client/query/response/s3/S3ResponseReader.java create mode 100644 src/main/java/com/firebolt/jdbc/client/query/response/s3/SequentialS3FileDownloader.java diff --git a/build.gradle b/build.gradle index 53e282acc0..e179ff4c63 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,7 @@ plugins { id 'jacoco' id "org.sonarqube" version "7.2.2.6593" id 'maven-publish' - id 'com.github.johnrengelman.shadow' version '8.0.0' + id 'com.gradleup.shadow' version '8.3.9' id 'signing' id 'io.qameta.allure' version '3.0.1' apply false id 'io.github.gradle-nexus.publish-plugin' version '2.0.0' @@ -25,6 +25,8 @@ project.ext { lombokVersion = '1.18.42' okHttpVersion = '4.12.0' slf4jVersion = '2.0.17' + jacksonVersion = '2.19.2' + awsSdkVersion = '2.29.47' } java { @@ -71,6 +73,11 @@ dependencies { implementation 'org.apache.commons:commons-text:1.15.0' implementation 'org.lz4:lz4-java:1.8.0' implementation 'commons-validator:commons-validator:1.10.1' + implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" + implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}" + implementation "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}" + implementation platform("software.amazon.awssdk:bom:${awsSdkVersion}") + implementation 'software.amazon.awssdk:s3' implementation fileTree(dir: 'libs', includes: ['*.jar']) diff --git a/src/main/java/com/firebolt/jdbc/client/FireboltClient.java b/src/main/java/com/firebolt/jdbc/client/FireboltClient.java index 062284fd04..c9efe091c4 100644 --- a/src/main/java/com/firebolt/jdbc/client/FireboltClient.java +++ b/src/main/java/com/firebolt/jdbc/client/FireboltClient.java @@ -57,6 +57,8 @@ public abstract class FireboltClient implements CacheListener { private static final String HEADER_USER_AGENT = "User-Agent"; private static final String HEADER_PROTOCOL_VERSION = "Firebolt-Protocol-Version"; private static final String HEADER_CONTENT_ENCODING = "Content-Encoding"; + private static final String HEADER_ACCEPT_ENCODING = "Accept-Encoding"; + private static final Pattern plainErrorPattern = Pattern.compile("Line (\\d+), Column (\\d+): (.*)$", Pattern.MULTILINE); private final OkHttpClient httpClient; private String headerUserAgentValue; diff --git a/src/main/java/com/firebolt/jdbc/client/query/AbstractQueryParameterProvider.java b/src/main/java/com/firebolt/jdbc/client/query/AbstractQueryParameterProvider.java index 5f05de4840..64053354d4 100644 --- a/src/main/java/com/firebolt/jdbc/client/query/AbstractQueryParameterProvider.java +++ b/src/main/java/com/firebolt/jdbc/client/query/AbstractQueryParameterProvider.java @@ -3,6 +3,7 @@ import com.firebolt.jdbc.connection.settings.FireboltProperties; import com.firebolt.jdbc.connection.settings.FireboltQueryParameterKey; import com.firebolt.jdbc.statement.StatementInfoWrapper; +import com.firebolt.jdbc.statement.StatementType; import java.util.Map; import org.apache.commons.lang3.StringUtils; @@ -64,4 +65,14 @@ protected void addTransactionSequenceIdIfNeeded(Map params, Strin } } + /** + * When s3 query location is used, by default we enable the advanced mode as well + */ + protected void addS3QueryResultLocationIfNeeded(Map params, String s3queryResultLocation, StatementType statementType) { + if (StringUtils.isNotBlank(s3queryResultLocation) && statementType == StatementType.QUERY) { + params.put(FireboltQueryParameterKey.QUERY_RESULT_UPLOAD_LOCATION.getKey(), s3queryResultLocation); + params.put(FireboltQueryParameterKey.ADVANCED_MODE.getKey(), "true"); + } + } + } diff --git a/src/main/java/com/firebolt/jdbc/client/query/FireboltCloudV2QueryParameterProvider.java b/src/main/java/com/firebolt/jdbc/client/query/FireboltCloudV2QueryParameterProvider.java index 85896ca811..9e819c53c0 100644 --- a/src/main/java/com/firebolt/jdbc/client/query/FireboltCloudV2QueryParameterProvider.java +++ b/src/main/java/com/firebolt/jdbc/client/query/FireboltCloudV2QueryParameterProvider.java @@ -18,7 +18,8 @@ public Map getQueryParams(FireboltProperties fireboltProperties, Map params = new HashMap<>(fireboltProperties.getAdditionalProperties()); if (statementInfoWrapper.getType() == StatementType.QUERY) { - params.put(OUTPUT_FORMAT.getKey(), TAB_SEPARATED_WITH_NAMES_AND_TYPES_FORMAT); + String outputFormat = fireboltProperties.getQueryResultLocation() == null ? TAB_SEPARATED_WITH_NAMES_AND_TYPES_FORMAT : QueryParameterProvider.S3_TAB_SEPARATED_WITH_NAMES_AND_TYPES_FORMAT; + params.put(OUTPUT_FORMAT.getKey(), outputFormat); } addQueryParameterIfNeeded(params, statementInfoWrapper.getPreparedStatementParameters()); @@ -34,6 +35,7 @@ public Map getQueryParams(FireboltProperties fireboltProperties, addQueryLabel(params, fireboltProperties, statementInfoWrapper); addCompress(params, fireboltProperties.isCompress()); addQueryTimeoutIfNeeded(params, queryTimeout); + addS3QueryResultLocationIfNeeded(params, fireboltProperties.getQueryResultLocation(), statementInfoWrapper.getType()); } return params; diff --git a/src/main/java/com/firebolt/jdbc/client/query/QueryParameterProvider.java b/src/main/java/com/firebolt/jdbc/client/query/QueryParameterProvider.java index 31badb88a3..3db117dc24 100644 --- a/src/main/java/com/firebolt/jdbc/client/query/QueryParameterProvider.java +++ b/src/main/java/com/firebolt/jdbc/client/query/QueryParameterProvider.java @@ -11,6 +11,7 @@ public interface QueryParameterProvider { String TAB_SEPARATED_WITH_NAMES_AND_TYPES_FORMAT = "TabSeparatedWithNamesAndTypes"; + String S3_TAB_SEPARATED_WITH_NAMES_AND_TYPES_FORMAT = "S3_TabSeparatedWithNamesAndTypes"; /** * Returns a map of parameters that will be added to the url and sent to the firebolt backend diff --git a/src/main/java/com/firebolt/jdbc/client/query/StatementClientImpl.java b/src/main/java/com/firebolt/jdbc/client/query/StatementClientImpl.java index b6b3e58c0e..16bb185365 100644 --- a/src/main/java/com/firebolt/jdbc/client/query/StatementClientImpl.java +++ b/src/main/java/com/firebolt/jdbc/client/query/StatementClientImpl.java @@ -3,6 +3,8 @@ import com.firebolt.jdbc.FireboltBackendType; import com.firebolt.jdbc.client.CompressionType; import com.firebolt.jdbc.client.FireboltClient; +import com.firebolt.jdbc.client.query.response.ResponseReader; +import com.firebolt.jdbc.client.query.response.ResponseReaderProvider; import com.firebolt.jdbc.connection.FireboltConnection; import com.firebolt.jdbc.connection.settings.FireboltProperties; import com.firebolt.jdbc.connection.settings.FireboltQueryParameterKey; @@ -45,7 +47,7 @@ import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED; import static java.util.Optional.ofNullable; -import static okhttp3.MultipartBody.*; +import static okhttp3.MultipartBody.Part; import static okhttp3.RequestBody.create; @CustomLog @@ -120,8 +122,15 @@ static QueryIdFetcher getQueryFetcher(int infraVersion) { } + private ResponseReaderProvider responseReaderProvider; + public StatementClientImpl(OkHttpClient httpClient, FireboltConnection connection, String customDrivers, String customClients) { + this(httpClient, connection, customDrivers, customClients, ResponseReaderProvider.getInstance()); + } + + public StatementClientImpl(OkHttpClient httpClient, FireboltConnection connection, String customDrivers, String customClients, ResponseReaderProvider responseReaderProvider) { super(httpClient, connection, customDrivers, customClients); + this.responseReaderProvider = responseReaderProvider; } /** @@ -136,8 +145,9 @@ public StatementClientImpl(OkHttpClient httpClient, FireboltConnection connectio @Override public InputStream executeSqlStatement(@NonNull StatementInfoWrapper statementInfoWrapper, @NonNull FireboltProperties connectionProperties, int queryTimeout, boolean isServerAsync) throws SQLException { - return executeSqlStatementInternal(statementInfoWrapper, connectionProperties, queryTimeout, isServerAsync, - (label, formattedStatement, uri) -> executeSqlStatementWithRetryOnUnauthorized(label, connectionProperties, formattedStatement, uri)); + ResponseReader responseReader = responseReaderProvider.getResponseReader(connectionProperties, statementInfoWrapper.getType()); + StatementExecutor statementExecutor = (label, formattedStatement, uri) -> executeSqlStatementWithRetryOnUnauthorized(label, connectionProperties, formattedStatement, uri, responseReader); + return executeSqlStatementInternal(statementInfoWrapper, connectionProperties, queryTimeout, isServerAsync, statementExecutor); } /** @@ -238,10 +248,10 @@ private InputStream executeSqlStatementInternal(@NonNull StatementInfoWrapper st } } - private InputStream executeSqlStatementWithRetryOnUnauthorized(String label, @NonNull FireboltProperties connectionProperties, String formattedStatement, String uri) + private InputStream executeSqlStatementWithRetryOnUnauthorized(String label, @NonNull FireboltProperties connectionProperties, String formattedStatement, String uri, ResponseReader responseReader) throws SQLException, IOException { return executeWithRetryOnUnauthorized(label, uri, "statement", - () -> postSqlStatement(connectionProperties, formattedStatement, uri, label)); + () -> postSqlStatement(connectionProperties, formattedStatement, uri, label, responseReader)); } private InputStream executeSqlStatementWithFilesRetryOnUnauthorized(String label, @NonNull FireboltProperties connectionProperties, String formattedStatement, String uri, Map files) @@ -265,12 +275,12 @@ private InputStream executeWithRetryOnUnauthorized(String label, String uri, Str } } - private InputStream postSqlStatement(@NonNull FireboltProperties connectionProperties, String formattedStatement, String uri, String label) + private InputStream postSqlStatement(@NonNull FireboltProperties connectionProperties, String formattedStatement, String uri, String label, ResponseReader responseReader) throws SQLException, IOException { CompressionType compressionType = getRequestBodyCompressionType(connectionProperties); Request post = createPostRequest(uri, label, formattedStatement, getConnection().getAccessToken().orElse(null), compressionType); Response response = execute(post, connectionProperties.getHost(), connectionProperties.isCompress()); - InputStream is = ofNullable(response.body()).map(ResponseBody::byteStream).orElse(null); + InputStream is = responseReader.read(response); if (is == null) { CloseableUtil.close(response); } diff --git a/src/main/java/com/firebolt/jdbc/client/query/response/DefaultResponseReader.java b/src/main/java/com/firebolt/jdbc/client/query/response/DefaultResponseReader.java new file mode 100644 index 0000000000..c92a7fe0b2 --- /dev/null +++ b/src/main/java/com/firebolt/jdbc/client/query/response/DefaultResponseReader.java @@ -0,0 +1,18 @@ +package com.firebolt.jdbc.client.query.response; + +import java.io.InputStream; +import okhttp3.Response; +import okhttp3.ResponseBody; + +import static java.util.Optional.ofNullable; + +/** + * The default response reader that will just convert the body into a byte stream + */ +class DefaultResponseReader implements ResponseReader { + + @Override + public InputStream read(Response response) { + return ofNullable(response.body()).map(ResponseBody::byteStream).orElse(null); + } +} diff --git a/src/main/java/com/firebolt/jdbc/client/query/response/ResponseReader.java b/src/main/java/com/firebolt/jdbc/client/query/response/ResponseReader.java new file mode 100644 index 0000000000..aae1bec8db --- /dev/null +++ b/src/main/java/com/firebolt/jdbc/client/query/response/ResponseReader.java @@ -0,0 +1,19 @@ +package com.firebolt.jdbc.client.query.response; + +import java.io.InputStream; +import okhttp3.Response; + +/** + * This repsonse reader knows how to transform the response from the http call to firebolt into an input stream + */ +@FunctionalInterface +public interface ResponseReader { + + /** + * converts the http response into an input stream + * @param response + * @return + */ + InputStream read(Response response); + +} diff --git a/src/main/java/com/firebolt/jdbc/client/query/response/ResponseReaderProvider.java b/src/main/java/com/firebolt/jdbc/client/query/response/ResponseReaderProvider.java new file mode 100644 index 0000000000..c1ba770bc5 --- /dev/null +++ b/src/main/java/com/firebolt/jdbc/client/query/response/ResponseReaderProvider.java @@ -0,0 +1,37 @@ +package com.firebolt.jdbc.client.query.response; + +import com.firebolt.jdbc.client.query.response.s3.AwsInfo; +import com.firebolt.jdbc.client.query.response.s3.S3DownloaderType; +import com.firebolt.jdbc.client.query.response.s3.S3ResponseReader; +import com.firebolt.jdbc.connection.settings.FireboltProperties; +import com.firebolt.jdbc.statement.StatementType; + +public final class ResponseReaderProvider { + private ResponseReaderProvider() { + } + + private static class InstanceHolder { + private static final ResponseReaderProvider INSTANCE = new ResponseReaderProvider(); + } + + public static ResponseReaderProvider getInstance() { + return InstanceHolder.INSTANCE; + } + + public ResponseReader getResponseReader(FireboltProperties fireboltProperties, StatementType statementType) { + + // only use the s3 reader in case the statement is a query + if (fireboltProperties.getQueryResultLocation() != null && statementType == StatementType.QUERY) { + AwsInfo awsInfo = AwsInfo.builder() + .region(fireboltProperties.getAwsRegion()) + .keyId(fireboltProperties.getAwsAccessKeyId()) + .keySecret(fireboltProperties.getAwsSecretAccessKey()) + .sessionToken(fireboltProperties.getAwsSessionToken()) + .build(); + boolean isResponseCompressed = fireboltProperties.isCompress(); + return new S3ResponseReader(awsInfo, isResponseCompressed, S3DownloaderType.fromValue(fireboltProperties.getFileDownloaderType())); + } + + return new DefaultResponseReader(); + } +} diff --git a/src/main/java/com/firebolt/jdbc/client/query/response/s3/AwsInfo.java b/src/main/java/com/firebolt/jdbc/client/query/response/s3/AwsInfo.java new file mode 100644 index 0000000000..20b6b9f509 --- /dev/null +++ b/src/main/java/com/firebolt/jdbc/client/query/response/s3/AwsInfo.java @@ -0,0 +1,16 @@ +package com.firebolt.jdbc.client.query.response.s3; + +import lombok.Builder; +import lombok.Data; +import lombok.Getter; + +@Getter +@Builder +@Data +public class AwsInfo { + + private String region; + private String keyId; + private String keySecret; + private String sessionToken; +} diff --git a/src/main/java/com/firebolt/jdbc/client/query/response/s3/FireboltS3Client.java b/src/main/java/com/firebolt/jdbc/client/query/response/s3/FireboltS3Client.java new file mode 100644 index 0000000000..462dbd599a --- /dev/null +++ b/src/main/java/com/firebolt/jdbc/client/query/response/s3/FireboltS3Client.java @@ -0,0 +1,109 @@ +package com.firebolt.jdbc.client.query.response.s3; + +import java.io.InputStream; +import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; + +/** + * Thin wrapper around AWS SDK v2 S3 client to fetch objects given an S3 path. + * Supports paths in the form s3://bucket/key. + */ +@Slf4j +public final class FireboltS3Client implements AutoCloseable { + + private static final Pattern S3_URI = Pattern.compile("^s3://([^/]+)/(.+)$"); + private static volatile FireboltS3Client INSTANCE; + + private final S3Client s3; + + private FireboltS3Client(AwsInfo awsInfo) { + Region region = StringUtils.isEmpty(awsInfo.getRegion()) ? new DefaultAwsRegionProviderChain().getRegion() : Region.of(awsInfo.getRegion()); + log.info("Using AWS region {}", region); + + S3ClientBuilder builder = S3Client.builder() + .credentialsProvider(getCredentialsProvider(awsInfo)); + + if (region != null) { + builder = builder.region(region); + } + + this.s3 = builder.build(); + } + + private AwsCredentialsProvider getCredentialsProvider(AwsInfo awsInfo) { + if (StringUtils.isAnyEmpty(awsInfo.getKeyId(), awsInfo.getKeySecret())) { + return DefaultCredentialsProvider.create(); + } + + AwsCredentials awsCredentials = StringUtils.isEmpty(awsInfo.getSessionToken()) + ? AwsBasicCredentials.create(awsInfo.getKeyId(), awsInfo.getKeySecret()) : AwsSessionCredentials.create(awsInfo.getKeyId(), awsInfo.getKeySecret(), awsInfo.getSessionToken()); + return StaticCredentialsProvider.create(awsCredentials); + } + + /** + * Get a singleton instance. This avoids repeatedly creating SDK clients. + */ + public static FireboltS3Client getInstance(AwsInfo awsInfo) { + if (INSTANCE == null) { + synchronized (FireboltS3Client.class) { + if (INSTANCE == null) { + INSTANCE = new FireboltS3Client(awsInfo); + } + } + } + return INSTANCE; + } + + /** + * Read an object from S3 and return its stream. + * The caller is responsible for closing the returned {@link InputStream}. + * + * @param s3Path path like s3://bucket/key + * @return input stream to the object contents + */ + public InputStream readObject(@NonNull String s3Path) { + Objects.requireNonNull(s3Path, "s3Path must not be null"); + Matcher matcher = S3_URI.matcher(s3Path); + if (!matcher.matches()) { + throw new IllegalArgumentException("Unsupported S3 path format: " + s3Path); + } + String bucket = matcher.group(1); + String key = matcher.group(2); + + GetObjectRequest request = GetObjectRequest.builder() + .bucket(bucket) + .key(key) + .build(); + + log.debug("Fetching S3 object. bucket={}, key={}", bucket, key); + ResponseInputStream responseStream = s3.getObject(request); + return responseStream; + } + + @Override + public void close() { + try { + this.s3.close(); + } catch (Exception e) { + log.warn("Error closing S3 client", e); + } + } +} + + diff --git a/src/main/java/com/firebolt/jdbc/client/query/response/s3/FireboltS3Response.java b/src/main/java/com/firebolt/jdbc/client/query/response/s3/FireboltS3Response.java new file mode 100644 index 0000000000..3c814e1150 --- /dev/null +++ b/src/main/java/com/firebolt/jdbc/client/query/response/s3/FireboltS3Response.java @@ -0,0 +1,57 @@ +package com.firebolt.jdbc.client.query.response.s3; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Comparator; +import java.util.List; +import lombok.Getter; +import lombok.NoArgsConstructor; + +/** + * This would be the response from firebolt that represents the information of the s3 bucket(s) where the actual results are stored + */ +@Getter +@NoArgsConstructor +@JsonIgnoreProperties(ignoreUnknown=true) +public class FireboltS3Response { + + @JsonProperty("total_bytes") + private long totalBytes; + + @JsonProperty("total_files") + private int totalFiles; + + @JsonProperty("total_rows") + private long totalRows; + + @JsonProperty("chunks") + private List chunks; + + /** + * The information about a particular data file in s3 + */ + @Getter + @NoArgsConstructor + @JsonIgnoreProperties(ignoreUnknown = true) + public static class S3DataChunks { + + public static final Comparator CHUNK_ID_COMPARATOR = Comparator.comparing(s3DataChunks -> s3DataChunks.getChunkId()); + + @JsonProperty("bytes") + private long bytes; + + @JsonProperty("chunk_id") + private int chunkId; + + @JsonProperty("compressed") + private boolean compressed; + + @JsonProperty("rows") + private long rows; + + @JsonProperty("url") + private String url; + + } + +} diff --git a/src/main/java/com/firebolt/jdbc/client/query/response/s3/HeaderSkippingInputStream.java b/src/main/java/com/firebolt/jdbc/client/query/response/s3/HeaderSkippingInputStream.java new file mode 100644 index 0000000000..07f839e686 --- /dev/null +++ b/src/main/java/com/firebolt/jdbc/client/query/response/s3/HeaderSkippingInputStream.java @@ -0,0 +1,88 @@ +package com.firebolt.jdbc.client.query.response.s3; + +import java.io.IOException; +import java.io.InputStream; +import java.io.PushbackInputStream; + +/** + * InputStream that discards the first N newline-terminated lines from the underlying stream, + * handling both LF and CRLF by detecting '\n' as the line terminator. + * Skipping is performed lazily on first read and uses a PushbackInputStream to preserve + * any bytes read beyond the final skipped newline. + */ +public final class HeaderSkippingInputStream extends InputStream { + private static final int DEFAULT_PUSHBACK_BUFFER = 8192; + private final PushbackInputStream in; + private final int linesToSkip; + private boolean skipped = false; + + public HeaderSkippingInputStream(InputStream delegate, int linesToSkip) { + this(delegate, linesToSkip, DEFAULT_PUSHBACK_BUFFER); + } + + public HeaderSkippingInputStream(InputStream delegate, int linesToSkip, int pushbackBufferSize) { + this.in = new PushbackInputStream(delegate, pushbackBufferSize); + this.linesToSkip = linesToSkip; + } + + private void ensureSkipped() throws IOException { + if (skipped) { + return; + } + int remaining = linesToSkip; + byte[] buf = new byte[DEFAULT_PUSHBACK_BUFFER]; + while (remaining > 0) { + int n = in.read(buf); + if (n == -1) { + break; + } + int lastNewlinePos = -1; + for (int i = 0; i < n && remaining > 0; i++) { + if (buf[i] == (byte) '\n') { + remaining--; + lastNewlinePos = i; + } + } + if (remaining <= 0) { + int remainderStart = lastNewlinePos + 1; + int remainderLen = n - remainderStart; + if (remainderLen > 0) { + in.unread(buf, remainderStart, remainderLen); + } + break; + } + } + skipped = true; + } + + @Override + public int read() throws IOException { + ensureSkipped(); + return in.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + ensureSkipped(); + return in.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + ensureSkipped(); + return in.skip(n); + } + + @Override + public int available() throws IOException { + ensureSkipped(); + return in.available(); + } + + @Override + public void close() throws IOException { + in.close(); + } +} + + diff --git a/src/main/java/com/firebolt/jdbc/client/query/response/s3/OneAheadS3FileDownloader.java b/src/main/java/com/firebolt/jdbc/client/query/response/s3/OneAheadS3FileDownloader.java new file mode 100644 index 0000000000..a45d4793e8 --- /dev/null +++ b/src/main/java/com/firebolt/jdbc/client/query/response/s3/OneAheadS3FileDownloader.java @@ -0,0 +1,160 @@ +package com.firebolt.jdbc.client.query.response.s3; + +import com.firebolt.jdbc.resultset.compress.LZ4InputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Enumeration; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * Downloader that prefetches exactly one chunk ahead. + * While chunk N is being read, chunk N+1 is downloaded in the background to a temp file. + * Subsequent chunks (index > 0) will have the first two header lines skipped. + */ +final class OneAheadS3FileDownloader implements S3FileDownloader { + + @Override + public InputStream openCombinedInputStream(List chunks, AwsInfo awsInfo) { + if (chunks == null || chunks.isEmpty()) { + return null; + } + + final int total = chunks.size(); + final ExecutorService executor = Executors.newSingleThreadExecutor(); + + Enumeration enumeration = new Enumeration<>() { + private int index = 0; + private Future nextFuture = null; // future for chunk at (index) + + @Override + public boolean hasMoreElements() { + return index < total; + } + + @Override + public InputStream nextElement() { + int current = index++; + try { + // If current is 0, we haven't prefetched anything yet. + // Schedule prefetch of chunk 1 (if exists), then return network stream for chunk 0. + if (current == 0) { + if (total > 1) { + nextFuture = executor.submit(downloadToTempFile(chunks.get(1).getUrl(), awsInfo)); + } else { + // No prefetch needed; we can shut down immediately as there will be no tasks. + executor.shutdown(); + } + InputStream in = FireboltS3Client.getInstance(awsInfo).readObject(chunks.get(0).getUrl()); + if (chunks.get(0).isCompressed()) { + in = new LZ4InputStream(in); + } + return in; + } + + // For current > 0, the future should correspond to the current chunk file. + if (nextFuture == null) { + // This should not happen unless there was only one chunk total + // In that case, return a direct stream (defensive) + FireboltS3Response.S3DataChunks thisChunk = chunks.get(current); + InputStream fallback = FireboltS3Client.getInstance(awsInfo).readObject(thisChunk.getUrl()); + if (thisChunk.isCompressed()) { + fallback = new LZ4InputStream(fallback); + } + InputStream in = new HeaderSkippingInputStream(fallback, 2); + // Shut down if we reached the end + if (current == total - 1) { + executor.shutdown(); + } else { + nextFuture = executor.submit(downloadToTempFile(chunks.get(current + 1).getUrl(), awsInfo)); + } + return in; + } + + Path filePath = nextFuture.get(); + File file = filePath.toFile(); + + // Schedule prefetch for the following chunk before returning the current one + if (current < total - 1) { + nextFuture = executor.submit(downloadToTempFile(chunks.get(current + 1).getUrl(), awsInfo)); + } else { + // No further prefetch; we can shut down the executor + executor.shutdown(); + nextFuture = null; + } + + InputStream in = new FileInputStream(file); + if (chunks.get(current).isCompressed()) { + in = new LZ4InputStream(in); + } + // Skip headers for all chunks except the first + in = new HeaderSkippingInputStream(in, 2); + return new DeletingFileInputStream(in, file.toPath()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for S3 chunk prefetch", e); + } catch (ExecutionException e) { + throw new RuntimeException("Failed to prefetch S3 chunk", e.getCause()); + } catch (IOException e) { + throw new RuntimeException("Failed to open prefetched S3 chunk", e); + } + } + }; + + return new SequenceInputStream(enumeration); + } + + private Callable downloadToTempFile(String s3Url, AwsInfo awsInfo) { + return () -> { + Path tempFile = Files.createTempFile("firebolt-s3-chunk-", ".bin"); + try (InputStream in = FireboltS3Client.getInstance(awsInfo).readObject(s3Url)) { + Files.copy(in, tempFile, java.nio.file.StandardCopyOption.REPLACE_EXISTING); + return tempFile; + } catch (IOException e) { + try { + Files.deleteIfExists(tempFile); + } catch (IOException ignore) { } + throw e; + } + }; + } + + private static final class DeletingFileInputStream extends FilterInputStream { + private final Path filePath; + DeletingFileInputStream(InputStream delegate, Path filePath) { + super(delegate); + this.filePath = filePath; + } + @Override + public void close() throws IOException { + IOException first = null; + try { + super.close(); + } catch (IOException e) { + first = e; + } + try { + Files.deleteIfExists(filePath); + } catch (IOException e) { + if (first == null) { + first = e; + } + } + if (first != null) { + throw first; + } + } + } +} + + diff --git a/src/main/java/com/firebolt/jdbc/client/query/response/s3/ParallelS3FileDownloader.java b/src/main/java/com/firebolt/jdbc/client/query/response/s3/ParallelS3FileDownloader.java new file mode 100644 index 0000000000..9e3ef2b8af --- /dev/null +++ b/src/main/java/com/firebolt/jdbc/client/query/response/s3/ParallelS3FileDownloader.java @@ -0,0 +1,135 @@ +package com.firebolt.jdbc.client.query.response.s3; + +import com.firebolt.jdbc.resultset.compress.LZ4InputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +final class ParallelS3FileDownloader implements S3FileDownloader { + + @Override + public InputStream openCombinedInputStream(List chunks, AwsInfo awsInfo) { + if (chunks == null || chunks.isEmpty()) { + return null; + } + + ExecutorService executor = Executors.newFixedThreadPool(determinePoolSize(chunks.size())); + List> futureFiles = new ArrayList<>(chunks.size()); + for (FireboltS3Response.S3DataChunks chunk : chunks) { + futureFiles.add(executor.submit(downloadToTempFile(chunk.getUrl(), awsInfo))); + } + executor.shutdown(); + + Iterator> futureIterator = futureFiles.iterator(); + AtomicInteger chunkIndex = new AtomicInteger(0); + Iterator chunkIter = chunks.iterator(); + Enumeration streamEnumeration = new Enumeration<>() { + @Override + public boolean hasMoreElements() { + return futureIterator.hasNext(); + } + + @Override + public InputStream nextElement() { + FireboltS3Response.S3DataChunks chunk = chunkIter.next(); + Future future = futureIterator.next(); + Path filePath; + int currentIndex = chunkIndex.getAndIncrement(); + try { + filePath = future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for S3 chunk download", e); + } catch (ExecutionException e) { + throw new RuntimeException("Failed to download S3 chunk", e.getCause()); + } + File file = filePath.toFile(); + try { + InputStream in = new FileInputStream(file); + // Decompress if compressed (including first chunk) + if (chunk.isCompressed()) { + in = new LZ4InputStream(in); + } + // For all chunks except the first (index 0), skip the first two header lines + if (currentIndex > 0) { + in = new HeaderSkippingInputStream(in, 2); + } + return new DeletingFileInputStream(in, file.toPath()); + } catch (IOException e) { + try { + Files.deleteIfExists(file.toPath()); + } catch (IOException ignore) { } + throw new RuntimeException("Failed to open downloaded S3 chunk for reading", e); + } + } + }; + + return new SequenceInputStream(streamEnumeration); + } + + private int determinePoolSize(int numTasks) { + int cores = Runtime.getRuntime().availableProcessors(); + int recommended = Math.min(Math.max(4, cores * 2), 16); + return Math.min(recommended, Math.max(1, numTasks)); + } + + private Callable downloadToTempFile(String s3Url, AwsInfo awsInfo) { + return () -> { + Path tempFile = Files.createTempFile("firebolt-s3-chunk-", ".bin"); + try (InputStream in = FireboltS3Client.getInstance(awsInfo).readObject(s3Url)) { + Files.copy(in, tempFile, java.nio.file.StandardCopyOption.REPLACE_EXISTING); + return tempFile; + } catch (IOException e) { + try { + Files.deleteIfExists(tempFile); + } catch (IOException ignore) { } + throw e; + } + }; + } + + private static final class DeletingFileInputStream extends FilterInputStream { + private final Path filePath; + DeletingFileInputStream(InputStream delegate, Path filePath) { + super(delegate); + this.filePath = filePath; + } + @Override + public void close() throws IOException { + IOException first = null; + try { + super.close(); + } catch (IOException e) { + first = e; + } + try { + Files.deleteIfExists(filePath); + } catch (IOException e) { + if (first == null) { + first = e; + } + } + if (first != null) { + throw first; + } + } + } + +} + + diff --git a/src/main/java/com/firebolt/jdbc/client/query/response/s3/S3DownloaderType.java b/src/main/java/com/firebolt/jdbc/client/query/response/s3/S3DownloaderType.java new file mode 100644 index 0000000000..e5f71c9981 --- /dev/null +++ b/src/main/java/com/firebolt/jdbc/client/query/response/s3/S3DownloaderType.java @@ -0,0 +1,32 @@ +package com.firebolt.jdbc.client.query.response.s3; + +import java.util.Arrays; + +public enum S3DownloaderType { + /** + * Download files from s3 one chunk after another as they get read by the input stream + */ + SEQUENTIAL("sequential"), + + /** + * Download all the chunks to local temp files in parallel and then stream them from local disk when asked for it + */ + PARALLEL("parallel"), + + /** + * Download the current chunk and one ahead + */ + ONE_AHEAD("one_ahead"); + + private String type; + + S3DownloaderType(String type) { + this.type = type; + } + + public static final S3DownloaderType fromValue(String value) { + return Arrays.stream(S3DownloaderType.values()) + .filter(val -> val.type.equals(value.toLowerCase())) + .findFirst().orElseThrow(() -> new IllegalArgumentException("Cannot find s3 downloader type for value " + value)); + } +} diff --git a/src/main/java/com/firebolt/jdbc/client/query/response/s3/S3FileDownloader.java b/src/main/java/com/firebolt/jdbc/client/query/response/s3/S3FileDownloader.java new file mode 100644 index 0000000000..5ed8275b49 --- /dev/null +++ b/src/main/java/com/firebolt/jdbc/client/query/response/s3/S3FileDownloader.java @@ -0,0 +1,16 @@ +package com.firebolt.jdbc.client.query.response.s3; + +import java.io.InputStream; +import java.util.List; + +/** + * Abstraction that downloads a list of S3 object paths and exposes them as a single InputStream + * that yields bytes in the same order as the provided paths. + */ +public interface S3FileDownloader { + + InputStream openCombinedInputStream(List chunks, AwsInfo awsInfo); + +} + + diff --git a/src/main/java/com/firebolt/jdbc/client/query/response/s3/S3FileDownloaderProvider.java b/src/main/java/com/firebolt/jdbc/client/query/response/s3/S3FileDownloaderProvider.java new file mode 100644 index 0000000000..f57eab1a78 --- /dev/null +++ b/src/main/java/com/firebolt/jdbc/client/query/response/s3/S3FileDownloaderProvider.java @@ -0,0 +1,16 @@ +package com.firebolt.jdbc.client.query.response.s3; + +public class S3FileDownloaderProvider { + + public S3FileDownloader get(S3DownloaderType type) { + switch(type) { + case SEQUENTIAL: + return new SequentialS3FileDownloader(); + case PARALLEL: + return new ParallelS3FileDownloader(); + case ONE_AHEAD: + default: + return new OneAheadS3FileDownloader(); + } + } +} diff --git a/src/main/java/com/firebolt/jdbc/client/query/response/s3/S3ResponseReader.java b/src/main/java/com/firebolt/jdbc/client/query/response/s3/S3ResponseReader.java new file mode 100644 index 0000000000..de6638e18d --- /dev/null +++ b/src/main/java/com/firebolt/jdbc/client/query/response/s3/S3ResponseReader.java @@ -0,0 +1,97 @@ +package com.firebolt.jdbc.client.query.response.s3; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.firebolt.jdbc.client.query.response.ResponseReader; +import com.firebolt.jdbc.resultset.compress.LZ4InputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import okhttp3.Response; +import okhttp3.ResponseBody; + +import static java.util.Optional.ofNullable; + +/** + * Read the response from s3 location. The initial response will contain the location of the s3 bucket from where to read the actual query response + */ +@Slf4j +public class S3ResponseReader implements ResponseReader { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private AwsInfo awsInfo; + private boolean responseCompressed; + private S3DownloaderType type; + private final S3FileDownloaderProvider s3FileDownloaderProvider; + + public S3ResponseReader(AwsInfo awsInfo, boolean responseCompressed, S3DownloaderType type) { + this(awsInfo, responseCompressed, type, new S3FileDownloaderProvider()); + } + +// @VisibleForTesting + S3ResponseReader(AwsInfo awsInfo, boolean responseCompressed, S3DownloaderType type, S3FileDownloaderProvider s3FileDownloaderProvider) { + this.awsInfo = awsInfo; + this.responseCompressed = responseCompressed; + this.type = type; + this.s3FileDownloaderProvider = s3FileDownloaderProvider; + } + + /** + * Reads the response from firebolt. In case compression is used, the data will be decompressed using lz4 algo. + * + * It then parses the result to find out the location of the actual result data and make calls to get the results from that location + * @param response + * @return + */ + @Override + public InputStream read(Response response) { + Optional fireboltS3ResponseOptional = + ofNullable(response.body()) + .map(ResponseBody::byteStream) + .map(bodyStream -> decompressResponseIfNeeded(bodyStream)) + .map(bodyStream -> { + try { + return OBJECT_MAPPER.readValue(bodyStream, FireboltS3Response.class); + } catch (IOException e) { + throw new RuntimeException("Failed to parse Firebolt S3 response", e); + } + }); + + if (fireboltS3ResponseOptional.isEmpty()) { + log.warn("Empty response received when it should receive the location for the s3 file"); + return null; + } + + FireboltS3Response fireboltS3Response = fireboltS3ResponseOptional.get(); + + if (fireboltS3Response.getChunks() == null || fireboltS3Response.getChunks().isEmpty()) { + log.warn("S3 response did not contain any data files"); + return null; + } + + // order by chunk_id and extract valid chunks (non-null url) + List chunks = fireboltS3Response.getChunks().stream() + .sorted(FireboltS3Response.S3DataChunks.CHUNK_ID_COMPARATOR) + .filter(c -> c.getUrl() != null && !c.getUrl().isEmpty()) + .collect(Collectors.toList()); + + if (chunks.isEmpty()) { + log.warn("S3 response did not contain any valid data file paths"); + return null; + } + + return s3FileDownloaderProvider.get(type).openCombinedInputStream(chunks, awsInfo); + } + + private InputStream decompressResponseIfNeeded(InputStream inputStream) { + if (!responseCompressed) { + return inputStream; + } + + log.debug("Decompressing the response using lz4 algorithm"); + return new LZ4InputStream(inputStream); + } +} diff --git a/src/main/java/com/firebolt/jdbc/client/query/response/s3/SequentialS3FileDownloader.java b/src/main/java/com/firebolt/jdbc/client/query/response/s3/SequentialS3FileDownloader.java new file mode 100644 index 0000000000..178298edbd --- /dev/null +++ b/src/main/java/com/firebolt/jdbc/client/query/response/s3/SequentialS3FileDownloader.java @@ -0,0 +1,48 @@ +package com.firebolt.jdbc.client.query.response.s3; + +import com.firebolt.jdbc.resultset.compress.LZ4InputStream; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.List; + +final class SequentialS3FileDownloader implements S3FileDownloader { + + @Override + public InputStream openCombinedInputStream(List chunks, AwsInfo awsInfo) { + if (chunks == null || chunks.isEmpty()) { + return null; + } + + Iterator it = chunks.iterator(); + Enumeration enumeration = new Enumeration<>() { + private int index = 0; + @Override + public boolean hasMoreElements() { + return it.hasNext(); + } + @Override + public InputStream nextElement() { + FireboltS3Response.S3DataChunks chunk = it.next(); + InputStream in = FireboltS3Client.getInstance(awsInfo).readObject(chunk.getUrl()); + if (chunk.isCompressed()) { + in = new LZ4InputStream(in); + } + + // For chunks after the first then skip headers + if (index > 0) { + in = new HeaderSkippingInputStream(in, 2); + + } + index++; + return in; + } + }; + + return new SequenceInputStream(enumeration); + } + +} + + diff --git a/src/main/java/com/firebolt/jdbc/connection/settings/FireboltProperties.java b/src/main/java/com/firebolt/jdbc/connection/settings/FireboltProperties.java index 95d5d95aea..da70fbd682 100644 --- a/src/main/java/com/firebolt/jdbc/connection/settings/FireboltProperties.java +++ b/src/main/java/com/firebolt/jdbc/connection/settings/FireboltProperties.java @@ -82,6 +82,13 @@ public class FireboltProperties { private String transactionId; private String transactionSequenceId; + private String queryResultLocation; + private String awsRegion; + private String awsAccessKeyId; + private String awsSecretAccessKey; + private String awsSessionToken; + private String fileDownloaderType; + // firebolt core url private final String url; @@ -126,6 +133,13 @@ public FireboltProperties(Properties properties) { connectionCachingEnabled = getSetting(properties, FireboltSessionProperty.CACHE_CONNECTION); preparedStatementParamStyle = getSetting(properties, FireboltSessionProperty.PREPARED_STATEMENT_PARAM_STYLE); + queryResultLocation = getSetting(properties, FireboltSessionProperty.QUERY_RESULT_LOCATION); + fileDownloaderType = getSetting(properties, FireboltSessionProperty.RESULT_FILE_DOWNLOADER_TYPE); + awsRegion = getSetting(properties, FireboltSessionProperty.AWS_REGION); + awsAccessKeyId = getSetting(properties, FireboltSessionProperty.AWS_ACCESS_KEY_ID); + awsSecretAccessKey = getSetting(properties, FireboltSessionProperty.AWS_SECRET_ACCESS_KEY); + awsSessionToken = getSetting(properties, FireboltSessionProperty.AWS_SESSION_TOKEN); + environment = getEnvironment(configuredEnvironment, properties); host = getHost(configuredEnvironment, properties); port = getPort(properties, ssl); diff --git a/src/main/java/com/firebolt/jdbc/connection/settings/FireboltQueryParameterKey.java b/src/main/java/com/firebolt/jdbc/connection/settings/FireboltQueryParameterKey.java index 5bc42668f2..e3fd73731d 100644 --- a/src/main/java/com/firebolt/jdbc/connection/settings/FireboltQueryParameterKey.java +++ b/src/main/java/com/firebolt/jdbc/connection/settings/FireboltQueryParameterKey.java @@ -18,8 +18,9 @@ public enum FireboltQueryParameterKey { QUERY_PARAMETERS("query_parameters"), MAX_EXECUTION_TIME("max_execution_time"), TRANSACTION_ID("transaction_id"), - TRANSACTION_SEQUENCE_ID("transaction_sequence_id") - ; + TRANSACTION_SEQUENCE_ID("transaction_sequence_id"), + QUERY_RESULT_UPLOAD_LOCATION("query_result_upload_location"), + ADVANCED_MODE("advanced_mode"); private final String key; } \ No newline at end of file diff --git a/src/main/java/com/firebolt/jdbc/connection/settings/FireboltSessionProperty.java b/src/main/java/com/firebolt/jdbc/connection/settings/FireboltSessionProperty.java index c6cf785c9d..376cb4c1a6 100644 --- a/src/main/java/com/firebolt/jdbc/connection/settings/FireboltSessionProperty.java +++ b/src/main/java/com/firebolt/jdbc/connection/settings/FireboltSessionProperty.java @@ -78,7 +78,6 @@ public enum FireboltSessionProperty { * validate already by the connection that was cached. */ CACHE_CONNECTION("cache_connection", true, Boolean.class, "Available only for Firebolt 2.0 connections. If true, the connection will be cached for 1 hour.", FireboltProperties::isConnectionCachingEnabled), - URL("url", null, String.class, "Firebolt Core deployment url. It needs to include protocol: http or https, host or IP address and port number. E.g: http://localhost:3473", FireboltProperties::getUrl), PREPARED_STATEMENT_PARAM_STYLE("prepared_statement_param_style", "native", String.class, @@ -87,6 +86,16 @@ public enum FireboltSessionProperty { "transaction id", FireboltProperties::getTransactionId), TRANSACTION_SEQUENCE_ID("transaction_sequence_id", null, String.class, "transaction sequence id", FireboltProperties::getTransactionSequenceId), + /** + * Location from where to read the results from s3 + */ + QUERY_RESULT_LOCATION("query_result_location", null, String.class, "The location to read the query results from. This should be define first in firebolt. ", FireboltProperties::getQueryResultLocation), + RESULT_FILE_DOWNLOADER_TYPE("result_file_downloader_type", "one_ahead", String.class, "The type of downloader to be used to download the results from location ", FireboltProperties::getFileDownloaderType), + AWS_REGION("aws_region", null, String.class, "The aws region, in case the query result location is using an s3 bucket.", FireboltProperties::getAwsRegion), + AWS_ACCESS_KEY_ID("aws_access_key_id", null, String.class, "The aws key id that has read access to the s3 location", FireboltProperties::getAwsAccessKeyId), + AWS_SECRET_ACCESS_KEY("aws_secret_access_key", null, String.class, "The aws secret that will be used in combination with aws key id", FireboltProperties::getAwsSecretAccessKey), + AWS_SESSION_TOKEN("aws_session_token", null, String.class, "The aws session token that will be used in combination with aws key id/key secret", FireboltProperties::getAwsSessionToken), + // We keep all the deprecated properties to ensure backward compatibility - but // they do not have any effect. @Deprecated diff --git a/src/main/java/com/firebolt/jdbc/service/FireboltStatementService.java b/src/main/java/com/firebolt/jdbc/service/FireboltStatementService.java index 56a196b6a9..1f45565c72 100644 --- a/src/main/java/com/firebolt/jdbc/service/FireboltStatementService.java +++ b/src/main/java/com/firebolt/jdbc/service/FireboltStatementService.java @@ -11,12 +11,6 @@ import com.firebolt.jdbc.statement.rawstatement.QueryRawStatement; import com.firebolt.jdbc.util.CloseableUtil; import com.firebolt.jdbc.util.InputStreamUtil; -import lombok.CustomLog; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; -import org.json.JSONException; -import org.json.JSONObject; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -25,6 +19,12 @@ import java.sql.SQLException; import java.util.Map; import java.util.Optional; +import lombok.CustomLog; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.json.JSONException; +import org.json.JSONObject; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Optional.ofNullable; @@ -165,10 +165,14 @@ public boolean isStatementRunning(String statementLabel) { private FireboltResultSet createResultSet(InputStream inputStream, QueryRawStatement initialQuery, FireboltProperties properties, FireboltStatement statement) throws SQLException { + + // in case the query location compression is used, then the decompression will happen upstream, so pass it as false + boolean isCompressed = StringUtils.isEmpty(properties.getQueryResultLocation()) ? properties.isCompress() : false; + return new FireboltResultSet(inputStream, ofNullable(initialQuery.getTable()).orElse(UNKNOWN_TABLE_NAME), ofNullable(initialQuery.getDatabase()).orElse(properties.getDatabase()), - properties.getBufferSize(), properties.isCompress(), + properties.getBufferSize(), isCompressed, statement, properties.isLogResultSet()); } } diff --git a/src/test/java/com/firebolt/jdbc/client/query/FireboltCloudV2QueryParameterProviderTest.java b/src/test/java/com/firebolt/jdbc/client/query/FireboltCloudV2QueryParameterProviderTest.java index 3a15bf95dd..f94b1e822d 100644 --- a/src/test/java/com/firebolt/jdbc/client/query/FireboltCloudV2QueryParameterProviderTest.java +++ b/src/test/java/com/firebolt/jdbc/client/query/FireboltCloudV2QueryParameterProviderTest.java @@ -91,4 +91,25 @@ void shouldNotAddEngineAndAccountIdCompressOrQueryTimeoutForSystemEngine() { assertEquals(FireboltCloudV2QueryParameterProvider.TAB_SEPARATED_WITH_NAMES_AND_TYPES_FORMAT, queryParams.get(FireboltQueryParameterKey.OUTPUT_FORMAT.getKey())); assertEquals("value1", queryParams.get("key1")); } + + @Test + void shouldAddS3QueryResultLocationIfNeeded() { + when(mockFireboltProperties.getQueryResultLocation()).thenReturn("location_set_in_firebolt"); + mockIsCompressInProperties(false); + mockDatabaseInProperties(null); + mockPreparedStatements(null); + mockStatementType(StatementType.QUERY); + mockStatementWrapperLabel(STATEMENT_WRAPPER_LABEL); + + Map queryParams = fireboltCloudV2QueryParameterProvider.getQueryParams(mockFireboltProperties, mockStatementInfoWrapper, NO_QUERY_TIMEOUT, IS_NOT_SERVER_ASYNC); + + assertEquals(6, queryParams.size()); + assertEquals("location_set_in_firebolt", queryParams.get(FireboltQueryParameterKey.QUERY_RESULT_UPLOAD_LOCATION.getKey())); + // advanced mode is needed for the query result location + assertEquals("true", queryParams.get(FireboltQueryParameterKey.ADVANCED_MODE.getKey())); + assertEquals(STATEMENT_WRAPPER_LABEL, queryParams.get(FireboltQueryParameterKey.QUERY_LABEL.getKey())); + assertEquals("0", queryParams.get(FireboltQueryParameterKey.COMPRESS.getKey())); + assertEquals(FireboltCloudV2QueryParameterProvider.S3_TAB_SEPARATED_WITH_NAMES_AND_TYPES_FORMAT, queryParams.get(FireboltQueryParameterKey.OUTPUT_FORMAT.getKey())); + assertEquals("value1", queryParams.get("key1")); + } } diff --git a/src/test/java/com/firebolt/jdbc/connection/settings/FireboltPropertiesTest.java b/src/test/java/com/firebolt/jdbc/connection/settings/FireboltPropertiesTest.java index 096b5ae7f2..ad1b4b5059 100644 --- a/src/test/java/com/firebolt/jdbc/connection/settings/FireboltPropertiesTest.java +++ b/src/test/java/com/firebolt/jdbc/connection/settings/FireboltPropertiesTest.java @@ -26,6 +26,7 @@ void shouldHaveDefaultPropertiesWhenOnlyTheRequiredFieldsAreSpecified() { .socketTimeoutMillis(0).connectionTimeoutMillis(60000).tcpKeepInterval(30).environment("app").tcpKeepIdle(60) .tcpKeepCount(10).connectionCachingEnabled(true).preparedStatementParamStyle("native") .compressRequestPayload(false) + .fileDownloaderType("one_ahead") .build(); Properties properties = new Properties(); @@ -64,6 +65,7 @@ void shouldHaveAllTheSpecifiedCustomProperties() { .tcpKeepInterval(30).tcpKeepIdle(60).tcpKeepCount(10).environment("app").validateOnSystemEngine(true) .mergePreparedStatementBatches(true).connectionCachingEnabled(true).preparedStatementParamStyle("fb_numeric") .compressRequestPayload(false) + .fileDownloaderType("one_ahead") .build(); assertEquals(expectedDefaultProperties, new FireboltProperties(properties)); }