Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static com.salesforce.datacloud.jdbc.logging.ElapsedLogger.logTimedValue;
import static com.salesforce.datacloud.jdbc.util.ArrowUtils.toColumnMetaData;

import com.google.protobuf.Empty;
import com.salesforce.datacloud.jdbc.core.partial.DataCloudQueryPolling;
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
import com.salesforce.datacloud.jdbc.exception.QueryExceptionHandler;
Expand All @@ -17,8 +18,9 @@
import com.salesforce.datacloud.jdbc.protocol.QueryResultArrowStream;
import com.salesforce.datacloud.jdbc.protocol.QuerySchemaAccessor;
import com.salesforce.datacloud.jdbc.protocol.RowRangeIterator;
import com.salesforce.datacloud.jdbc.protocol.async.core.AsyncStreamObserverIterator;
import com.salesforce.datacloud.jdbc.protocol.async.core.SyncIteratorAdapter;
import com.salesforce.datacloud.jdbc.protocol.grpc.QueryAccessGrpcClient;
import com.salesforce.datacloud.jdbc.protocol.grpc.util.BufferingStreamIterator;
import com.salesforce.datacloud.jdbc.util.Deadline;
import com.salesforce.datacloud.jdbc.util.JdbcURL;
import com.salesforce.datacloud.jdbc.util.ThrowingJdbcSupplier;
Expand Down Expand Up @@ -347,8 +349,9 @@ public void cancelQuery(String queryId) throws SQLException {
try {
val client = QueryAccessGrpcClient.of(queryId, getStub());
val message = "cancel queryId=" + queryId;
val iterator = new BufferingStreamIterator<CancelQueryParam, com.google.protobuf.Empty>(message, log);
Comment thread
praveen2450 marked this conversation as resolved.
client.getStub().cancelQuery(client.getCancelQueryParamBuilder().build(), iterator.getObserver());
val asyncIterator = new AsyncStreamObserverIterator<CancelQueryParam, Empty>(message, log);
client.getStub().cancelQuery(client.getCancelQueryParamBuilder().build(), asyncIterator.getObserver());
val iterator = new SyncIteratorAdapter<>(asyncIterator);
// Check hasNext to ensure that the call completes
val ignored = iterator.hasNext();
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
*/
package com.salesforce.datacloud.jdbc.protocol;

import com.salesforce.datacloud.jdbc.protocol.grpc.util.BufferingStreamIterator;
import com.salesforce.datacloud.jdbc.protocol.async.core.AsyncStreamObserverIterator;
import com.salesforce.datacloud.jdbc.protocol.async.core.SyncIteratorAdapter;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -23,8 +24,9 @@ public class AsyncQueryAccessHandle implements QueryAccessHandle {
public static AsyncQueryAccessHandle of(HyperServiceGrpc.HyperServiceStub stub, QueryParam param) {
val message = "executeQuery. mode=" + param.getTransferMode();
// Submit request to start feeding the iterator
val messages = new BufferingStreamIterator<QueryParam, ExecuteQueryResponse>(message, log);
stub.executeQuery(param, messages.getObserver());
val asyncIterator = new AsyncStreamObserverIterator<QueryParam, ExecuteQueryResponse>(message, log);
stub.executeQuery(param, asyncIterator.getObserver());
val messages = new SyncIteratorAdapter<>(asyncIterator);

// The protocol guarantees that the first message is a Query Status message with a Query Id.
val queryStatus = messages.next().getQueryInfo().getQueryStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,29 @@
*/
package com.salesforce.datacloud.jdbc.protocol;

import com.salesforce.datacloud.jdbc.protocol.async.AsyncChunkRangeIterator;
import com.salesforce.datacloud.jdbc.protocol.async.core.SyncIteratorAdapter;
import com.salesforce.datacloud.jdbc.protocol.grpc.QueryAccessGrpcClient;
import com.salesforce.datacloud.jdbc.protocol.grpc.util.BufferingStreamIterator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import salesforce.cdp.hyperdb.v1.OutputFormat;
import salesforce.cdp.hyperdb.v1.QueryResult;
import salesforce.cdp.hyperdb.v1.QueryResultParam;

/**
* See {@link ChunkRangeIterator#of(QueryAccessGrpcClient, long, long, boolean, OutputFormat)}
* Synchronous iterator over a range of chunks from a query result.
*
* <p>This extends {@link SyncIteratorAdapter} wrapping {@link AsyncChunkRangeIterator} to provide
* a blocking iterator interface for backward compatibility.</p>
*
* <p>Note: To set a timeout configure the stub in the client accordingly.</p>
* <p>Attention: This iterator might throw {@link io.grpc.StatusRuntimeException} exceptions during
* {@link ChunkRangeIterator#hasNext()} and {@link ChunkRangeIterator#next()} calls.</p>
*
* @see AsyncChunkRangeIterator
*/
@Slf4j
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class ChunkRangeIterator implements Iterator<QueryResult> {
public class ChunkRangeIterator extends SyncIteratorAdapter<QueryResult> {

/**
* Provides an Iterator over a range of chunks. It doesn't validate the range of chunks, so it is on the caller to
* ensure that the result range is valid. No network calls will be done as part of this method call, only once the
Expand All @@ -37,70 +41,11 @@ public static ChunkRangeIterator of(
long limit,
boolean omitSchema,
@NonNull OutputFormat outputFormat) {
return new ChunkRangeIterator(chunkId, chunkId + limit, omitSchema, queryClient, outputFormat, null);
}

private long chunkId;

private final long limitChunkId;

private boolean omitSchema;

private final QueryAccessGrpcClient client;

private final OutputFormat outputFormat;

private BufferingStreamIterator<QueryResultParam, QueryResult> iterator;

@Override
public boolean hasNext() {
// This is the no op case where we have a non-empty iterator
if ((iterator != null) && iterator.hasNext()) {
return true;
}
// This is the case where we have finished all chunks
if (chunkId >= limitChunkId) {
return false;
}

// Here we need to fetch a chunk
val request = client.getQueryResultParamBuilder()
.setChunkId(chunkId++)
.setOmitSchema(omitSchema)
.setOutputFormat(outputFormat)
.build();
val message = String.format(
"getQueryResult queryId=%s, chunkId=%d, limit=%d",
client.getQueryId(), chunkId, limitChunkId - chunkId);
iterator = new BufferingStreamIterator<QueryResultParam, QueryResult>(message, log);
client.getStub().getQueryResult(request, iterator.getObserver());

if (iterator.hasNext()) {
// Even if omitSchema was initially false we only need the schema for the first chunk in the result stream.
if (!omitSchema) {
omitSchema = true;
}
return true;
} else if ((chunkId == 1) && (chunkId < limitChunkId)) {
// In special cases on adaptive timeout Hyper can produce an empty first chunk
// We thus retry immediately with next chunk in this case
return hasNext();
} else {
log.error(
"Unexpected empty chunk, stopping iterator before limit. queryId={}, chunkId={}, limit{}",
client.getQueryId(),
chunkId,
limitChunkId);
return false;
}
return new ChunkRangeIterator(
AsyncChunkRangeIterator.of(queryClient, chunkId, limit, omitSchema, outputFormat));
}

@Override
public QueryResult next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

return iterator.next();
private ChunkRangeIterator(AsyncChunkRangeIterator asyncIterator) {
super(asyncIterator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,27 @@
*/
package com.salesforce.datacloud.jdbc.protocol;

import com.salesforce.datacloud.jdbc.protocol.async.AsyncQueryInfoIterator;
import com.salesforce.datacloud.jdbc.protocol.async.core.SyncIteratorAdapter;
import com.salesforce.datacloud.jdbc.protocol.grpc.QueryAccessGrpcClient;
import com.salesforce.datacloud.jdbc.protocol.grpc.util.BufferingStreamIterator;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import salesforce.cdp.hyperdb.v1.QueryInfo;
import salesforce.cdp.hyperdb.v1.QueryInfoParam;
import salesforce.cdp.hyperdb.v1.QueryStatus;

/**
* See {@link QueryInfoIterator#of(QueryAccessGrpcClient)}.
* Synchronous iterator over QueryInfo messages of a Query.
*
* <p>This extends {@link SyncIteratorAdapter} wrapping {@link AsyncQueryInfoIterator} to provide
* a blocking iterator interface for backward compatibility.</p>
*
* <p>Note: To set a timeout configure the stub in the client accordingly.</p>
* <p>Attention: This iterator might throw {@link io.grpc.StatusRuntimeException} exceptions during
* {@link QueryInfoIterator#hasNext()} and {@link QueryInfoIterator#next()} calls.</p>
*
* @see AsyncQueryInfoIterator
*/
@Slf4j
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class QueryInfoIterator implements Iterator<QueryInfo>, AutoCloseable {
public class QueryInfoIterator extends SyncIteratorAdapter<QueryInfo> {

/**
* Provides an Iterator over QueryInfo messages of a Query. It'll keep iterating until the query is finished.
Expand All @@ -39,76 +39,10 @@ public class QueryInfoIterator implements Iterator<QueryInfo>, AutoCloseable {
* @return A new QueryInfoIterator instance
*/
public static QueryInfoIterator of(@NonNull QueryAccessGrpcClient queryClient) {
return new QueryInfoIterator(queryClient, null, false);
return new QueryInfoIterator(AsyncQueryInfoIterator.of(queryClient));
}

private final QueryAccessGrpcClient client;
private BufferingStreamIterator<QueryInfoParam, QueryInfo> iterator;
private boolean isFinished;

/**
* This extends the normal hasNext() logic with graceful handling of the CANCELLED error code which indicates
* that the stream has finished and that a new stream should be started.
* @return whether there is a next element
*/
private boolean hasNextFromIterator(int retryCount) {
try {
return iterator.hasNext();
} catch (StatusRuntimeException ex) {
if (ex.getStatus().getCode() == Status.Code.CANCELLED && (retryCount < 2)) {
return false;
}
throw ex;
}
}

@Override
public boolean hasNext() {
// Failsafe if cancelled happens too many times sequentially without an info message in-between. Every
// call should have at least one info returned.
int retryCount = 0;
while (true) {
// We have an iterator that still has infos
if ((iterator != null) && (hasNextFromIterator(retryCount))) {
retryCount = 0;
return true;
} else if (isFinished) {
// We have observed a query finish and thus have all query info objects. This check is consciously after
// the
// hasNext() check on an existing iterator to allow consumption of all QueryInfo events that might still
// trigger after a query finish.
return false;
} else {
++retryCount;
// Get a new set of infos
val request =
client.getQueryInfoParamBuilder().setStreaming(true).build();
val message = String.format("getQueryInfo queryId=%s, streaming=%s", client.getQueryId(), true);
iterator = new BufferingStreamIterator<QueryInfoParam, QueryInfo>(message, log);
client.getStub().getQueryInfo(request, iterator.getObserver());
// Continue with next iteration of the loop
}
}
}

@Override
public QueryInfo next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

val result = iterator.next();
if (result.hasQueryStatus()) {
isFinished = (result.getQueryStatus().getCompletionStatus() == QueryStatus.CompletionStatus.FINISHED);
}
return result;
}

@Override
public void close() {
// Close the iterator to ensure that ongoing streaming calls are properly closed
if (iterator != null) {
iterator.close();
}
private QueryInfoIterator(AsyncQueryInfoIterator asyncIterator) {
super(asyncIterator);
}
}
Loading
Loading