Skip to content

Commit 35f8ea2

Browse files
committed
feat: Provide a low level Async Interface
(THIS IS STILL A DRAFT, but completes the tests) The core protocol logic is now written using Java8 CompletionStage / Future style async programming. Frameworks like JavaRx or Vert.X provide interoperability shims and thus this change unlocks the adoption of our protocol flow in such services. For the JDBC driver which is using a synchronous interface there is a `SyncIteratorAdapert` that allows the high level logic to operate like before.
1 parent 6b2c170 commit 35f8ea2

21 files changed

Lines changed: 1325 additions & 750 deletions

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudConnection.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import static com.salesforce.datacloud.jdbc.logging.ElapsedLogger.logTimedValue;
1010
import static com.salesforce.datacloud.jdbc.util.ArrowUtils.toColumnMetaData;
1111

12+
import com.google.protobuf.Empty;
1213
import com.salesforce.datacloud.jdbc.core.partial.DataCloudQueryPolling;
1314
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
1415
import com.salesforce.datacloud.jdbc.exception.QueryExceptionHandler;
@@ -17,8 +18,9 @@
1718
import com.salesforce.datacloud.jdbc.protocol.QueryResultArrowStream;
1819
import com.salesforce.datacloud.jdbc.protocol.QuerySchemaAccessor;
1920
import com.salesforce.datacloud.jdbc.protocol.RowRangeIterator;
21+
import com.salesforce.datacloud.jdbc.protocol.async.util.AsyncStreamObserverIterator;
22+
import com.salesforce.datacloud.jdbc.protocol.async.util.SyncIteratorAdapter;
2023
import com.salesforce.datacloud.jdbc.protocol.grpc.QueryAccessGrpcClient;
21-
import com.salesforce.datacloud.jdbc.protocol.grpc.util.BufferingStreamIterator;
2224
import com.salesforce.datacloud.jdbc.util.Deadline;
2325
import com.salesforce.datacloud.jdbc.util.JdbcURL;
2426
import com.salesforce.datacloud.jdbc.util.ThrowingJdbcSupplier;
@@ -347,8 +349,9 @@ public void cancelQuery(String queryId) throws SQLException {
347349
try {
348350
val client = QueryAccessGrpcClient.of(queryId, getStub());
349351
val message = "cancel queryId=" + queryId;
350-
val iterator = new BufferingStreamIterator<CancelQueryParam, com.google.protobuf.Empty>(message, log);
351-
client.getStub().cancelQuery(client.getCancelQueryParamBuilder().build(), iterator.getObserver());
352+
val asyncIterator = new AsyncStreamObserverIterator<CancelQueryParam, Empty>(message, log);
353+
client.getStub().cancelQuery(client.getCancelQueryParamBuilder().build(), asyncIterator.getObserver());
354+
val iterator = new SyncIteratorAdapter<>(asyncIterator);
352355
// Check hasNext to ensure that the call completes
353356
val ignored = iterator.hasNext();
354357
} catch (Exception ex) {

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/protocol/AsyncQueryAccessHandle.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
*/
55
package com.salesforce.datacloud.jdbc.protocol;
66

7-
import com.salesforce.datacloud.jdbc.protocol.grpc.util.BufferingStreamIterator;
7+
import com.salesforce.datacloud.jdbc.protocol.async.util.AsyncStreamObserverIterator;
8+
import com.salesforce.datacloud.jdbc.protocol.async.util.SyncIteratorAdapter;
89
import lombok.AllArgsConstructor;
910
import lombok.Getter;
1011
import lombok.extern.slf4j.Slf4j;
@@ -23,8 +24,9 @@ public class AsyncQueryAccessHandle implements QueryAccessHandle {
2324
public static AsyncQueryAccessHandle of(HyperServiceGrpc.HyperServiceStub stub, QueryParam param) {
2425
val message = "executeQuery. mode=" + param.getTransferMode();
2526
// Submit request to start feeding the iterator
26-
val messages = new BufferingStreamIterator<QueryParam, ExecuteQueryResponse>(message, log);
27-
stub.executeQuery(param, messages.getObserver());
27+
val asyncIterator = new AsyncStreamObserverIterator<QueryParam, ExecuteQueryResponse>(message, log);
28+
stub.executeQuery(param, asyncIterator.getObserver());
29+
val messages = new SyncIteratorAdapter<>(asyncIterator);
2830

2931
// The protocol guarantees that the first message is a Query Status message with a Query Id.
3032
val queryStatus = messages.next().getQueryInfo().getQueryStatus();

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/protocol/ChunkRangeIterator.java

Lines changed: 18 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,29 @@
44
*/
55
package com.salesforce.datacloud.jdbc.protocol;
66

7+
import com.salesforce.datacloud.jdbc.protocol.async.AsyncChunkRangeIterator;
8+
import com.salesforce.datacloud.jdbc.protocol.async.util.SyncIteratorAdapter;
79
import com.salesforce.datacloud.jdbc.protocol.grpc.QueryAccessGrpcClient;
8-
import com.salesforce.datacloud.jdbc.protocol.grpc.util.BufferingStreamIterator;
9-
import java.util.Iterator;
10-
import java.util.NoSuchElementException;
11-
import lombok.AccessLevel;
12-
import lombok.AllArgsConstructor;
1310
import lombok.NonNull;
1411
import lombok.extern.slf4j.Slf4j;
15-
import lombok.val;
1612
import salesforce.cdp.hyperdb.v1.OutputFormat;
1713
import salesforce.cdp.hyperdb.v1.QueryResult;
18-
import salesforce.cdp.hyperdb.v1.QueryResultParam;
1914

2015
/**
21-
* See {@link ChunkRangeIterator#of(QueryAccessGrpcClient, long, long, boolean, OutputFormat)}
16+
* Synchronous iterator over a range of chunks from a query result.
17+
*
18+
* <p>This extends {@link SyncIteratorAdapter} wrapping {@link AsyncChunkRangeIterator} to provide
19+
* a blocking iterator interface for backward compatibility.</p>
20+
*
21+
* <p>Note: To set a timeout configure the stub in the client accordingly.</p>
22+
* <p>Attention: This iterator might throw {@link io.grpc.StatusRuntimeException} exceptions during
23+
* {@link ChunkRangeIterator#hasNext()} and {@link ChunkRangeIterator#next()} calls.</p>
24+
*
25+
* @see AsyncChunkRangeIterator
2226
*/
2327
@Slf4j
24-
@AllArgsConstructor(access = AccessLevel.PRIVATE)
25-
public class ChunkRangeIterator implements Iterator<QueryResult> {
28+
public class ChunkRangeIterator extends SyncIteratorAdapter<QueryResult> {
29+
2630
/**
2731
* Provides an Iterator over a range of chunks. It doesn't validate the range of chunks, so it is on the caller to
2832
* ensure that the result range is valid. No network calls will be done as part of this method call, only once the
@@ -37,70 +41,11 @@ public static ChunkRangeIterator of(
3741
long limit,
3842
boolean omitSchema,
3943
@NonNull OutputFormat outputFormat) {
40-
return new ChunkRangeIterator(chunkId, chunkId + limit, omitSchema, queryClient, outputFormat, null);
41-
}
42-
43-
private long chunkId;
44-
45-
private final long limitChunkId;
46-
47-
private boolean omitSchema;
48-
49-
private final QueryAccessGrpcClient client;
50-
51-
private final OutputFormat outputFormat;
52-
53-
private BufferingStreamIterator<QueryResultParam, QueryResult> iterator;
54-
55-
@Override
56-
public boolean hasNext() {
57-
// This is the no op case where we have a non-empty iterator
58-
if ((iterator != null) && iterator.hasNext()) {
59-
return true;
60-
}
61-
// This is the case where we have finished all chunks
62-
if (chunkId >= limitChunkId) {
63-
return false;
64-
}
65-
66-
// Here we need to fetch a chunk
67-
val request = client.getQueryResultParamBuilder()
68-
.setChunkId(chunkId++)
69-
.setOmitSchema(omitSchema)
70-
.setOutputFormat(outputFormat)
71-
.build();
72-
val message = String.format(
73-
"getQueryResult queryId=%s, chunkId=%d, limit=%d",
74-
client.getQueryId(), chunkId, limitChunkId - chunkId);
75-
iterator = new BufferingStreamIterator<QueryResultParam, QueryResult>(message, log);
76-
client.getStub().getQueryResult(request, iterator.getObserver());
77-
78-
if (iterator.hasNext()) {
79-
// Even if omitSchema was initially false we only need the schema for the first chunk in the result stream.
80-
if (!omitSchema) {
81-
omitSchema = true;
82-
}
83-
return true;
84-
} else if ((chunkId == 1) && (chunkId < limitChunkId)) {
85-
// In special cases on adaptive timeout Hyper can produce an empty first chunk
86-
// We thus retry immediately with next chunk in this case
87-
return hasNext();
88-
} else {
89-
log.error(
90-
"Unexpected empty chunk, stopping iterator before limit. queryId={}, chunkId={}, limit{}",
91-
client.getQueryId(),
92-
chunkId,
93-
limitChunkId);
94-
return false;
95-
}
44+
return new ChunkRangeIterator(
45+
AsyncChunkRangeIterator.of(queryClient, chunkId, limit, omitSchema, outputFormat));
9646
}
9747

98-
@Override
99-
public QueryResult next() {
100-
if (!hasNext()) {
101-
throw new NoSuchElementException();
102-
}
103-
104-
return iterator.next();
48+
private ChunkRangeIterator(AsyncChunkRangeIterator asyncIterator) {
49+
super(asyncIterator);
10550
}
10651
}

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/protocol/QueryInfoIterator.java

Lines changed: 16 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,27 @@
44
*/
55
package com.salesforce.datacloud.jdbc.protocol;
66

7+
import com.salesforce.datacloud.jdbc.protocol.async.AsyncQueryInfoIterator;
8+
import com.salesforce.datacloud.jdbc.protocol.async.util.SyncIteratorAdapter;
79
import com.salesforce.datacloud.jdbc.protocol.grpc.QueryAccessGrpcClient;
8-
import com.salesforce.datacloud.jdbc.protocol.grpc.util.BufferingStreamIterator;
9-
import io.grpc.Status;
10-
import io.grpc.StatusRuntimeException;
11-
import java.util.Iterator;
12-
import java.util.NoSuchElementException;
13-
import lombok.AccessLevel;
14-
import lombok.AllArgsConstructor;
1510
import lombok.NonNull;
1611
import lombok.extern.slf4j.Slf4j;
17-
import lombok.val;
1812
import salesforce.cdp.hyperdb.v1.QueryInfo;
19-
import salesforce.cdp.hyperdb.v1.QueryInfoParam;
20-
import salesforce.cdp.hyperdb.v1.QueryStatus;
2113

2214
/**
23-
* See {@link QueryInfoIterator#of(QueryAccessGrpcClient)}.
15+
* Synchronous iterator over QueryInfo messages of a Query.
16+
*
17+
* <p>This extends {@link SyncIteratorAdapter} wrapping {@link AsyncQueryInfoIterator} to provide
18+
* a blocking iterator interface for backward compatibility.</p>
19+
*
20+
* <p>Note: To set a timeout configure the stub in the client accordingly.</p>
21+
* <p>Attention: This iterator might throw {@link io.grpc.StatusRuntimeException} exceptions during
22+
* {@link QueryInfoIterator#hasNext()} and {@link QueryInfoIterator#next()} calls.</p>
23+
*
24+
* @see AsyncQueryInfoIterator
2425
*/
2526
@Slf4j
26-
@AllArgsConstructor(access = AccessLevel.PRIVATE)
27-
public class QueryInfoIterator implements Iterator<QueryInfo>, AutoCloseable {
27+
public class QueryInfoIterator extends SyncIteratorAdapter<QueryInfo> {
2828

2929
/**
3030
* Provides an Iterator over QueryInfo messages of a Query. It'll keep iterating until the query is finished.
@@ -39,76 +39,10 @@ public class QueryInfoIterator implements Iterator<QueryInfo>, AutoCloseable {
3939
* @return A new QueryInfoIterator instance
4040
*/
4141
public static QueryInfoIterator of(@NonNull QueryAccessGrpcClient queryClient) {
42-
return new QueryInfoIterator(queryClient, null, false);
42+
return new QueryInfoIterator(AsyncQueryInfoIterator.of(queryClient));
4343
}
4444

45-
private final QueryAccessGrpcClient client;
46-
private BufferingStreamIterator<QueryInfoParam, QueryInfo> iterator;
47-
private boolean isFinished;
48-
49-
/**
50-
* This extends the normal hasNext() logic with graceful handling of the CANCELLED error code which indicates
51-
* that the stream has finished and that a new stream should be started.
52-
* @return whether there is a next element
53-
*/
54-
private boolean hasNextFromIterator(int retryCount) {
55-
try {
56-
return iterator.hasNext();
57-
} catch (StatusRuntimeException ex) {
58-
if (ex.getStatus().getCode() == Status.Code.CANCELLED && (retryCount < 2)) {
59-
return false;
60-
}
61-
throw ex;
62-
}
63-
}
64-
65-
@Override
66-
public boolean hasNext() {
67-
// Failsafe if cancelled happens too many times sequentially without an info message in-between. Every
68-
// call should have at least one info returned.
69-
int retryCount = 0;
70-
while (true) {
71-
// We have an iterator that still has infos
72-
if ((iterator != null) && (hasNextFromIterator(retryCount))) {
73-
retryCount = 0;
74-
return true;
75-
} else if (isFinished) {
76-
// We have observed a query finish and thus have all query info objects. This check is consciously after
77-
// the
78-
// hasNext() check on an existing iterator to allow consumption of all QueryInfo events that might still
79-
// trigger after a query finish.
80-
return false;
81-
} else {
82-
++retryCount;
83-
// Get a new set of infos
84-
val request =
85-
client.getQueryInfoParamBuilder().setStreaming(true).build();
86-
val message = String.format("getQueryInfo queryId=%s, streaming=%s", client.getQueryId(), true);
87-
iterator = new BufferingStreamIterator<QueryInfoParam, QueryInfo>(message, log);
88-
client.getStub().getQueryInfo(request, iterator.getObserver());
89-
// Continue with next iteration of the loop
90-
}
91-
}
92-
}
93-
94-
@Override
95-
public QueryInfo next() {
96-
if (!hasNext()) {
97-
throw new NoSuchElementException();
98-
}
99-
100-
val result = iterator.next();
101-
if (result.hasQueryStatus()) {
102-
isFinished = (result.getQueryStatus().getCompletionStatus() == QueryStatus.CompletionStatus.FINISHED);
103-
}
104-
return result;
105-
}
106-
107-
@Override
108-
public void close() {
109-
// Close the iterator to ensure that ongoing streaming calls are properly closed
110-
if (iterator != null) {
111-
iterator.close();
112-
}
45+
private QueryInfoIterator(AsyncQueryInfoIterator asyncIterator) {
46+
super(asyncIterator);
11347
}
11448
}

0 commit comments

Comments
 (0)