Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ private CompletableFuture<ResponseWriter> processOperation(
);

case ClientOp.SQL_CURSOR_NEXT_RESULT_SET:
return ClientSqlCursorNextResultRequest.process(in, resources, partitionOperationsExecutor, metrics);
return ClientSqlCursorNextResultRequest.process(partitionOperationsExecutor, in, resources, metrics, tsTracker);

case ClientOp.OPERATION_CANCEL:
return ClientOperationCancelRequest.process(in, cancelHandles);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import org.apache.ignite.internal.binarytuple.BinaryTupleContainer;
import org.apache.ignite.internal.binarytuple.BinaryTupleParser;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.sql.QueryModifier;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.sql.SqlCommon;
Expand Down Expand Up @@ -268,6 +270,7 @@ static CompletableFuture<ResponseWriter> writeResultSetAsync(
ClientResourceRegistry resources,
AsyncResultSetImpl asyncResultSet,
ClientHandlerMetricSource metrics,
HybridTimestampTracker parentTsTracker,
int pageSize,
boolean includePartitionAwarenessMeta,
boolean sqlDirectTxMappingSupported,
Expand All @@ -277,7 +280,7 @@ static CompletableFuture<ResponseWriter> writeResultSetAsync(
) {
try {
Long nextResultResourceId = sqlMultiStatementSupported && asyncResultSet.cursor().hasNextResult()
? saveNextResultResource(asyncResultSet.cursor().nextResult(), pageSize, resources, executor)
? saveNextResultResource(asyncResultSet.cursor().nextResult(), pageSize, resources, parentTsTracker, executor)
: null;

if ((asyncResultSet.hasRowSet() && asyncResultSet.hasMorePages())) {
Expand Down Expand Up @@ -317,10 +320,11 @@ private static Long saveNextResultResource(
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> nextResultFuture,
int pageSize,
ClientResourceRegistry resources,
HybridTimestampTracker parentTsTracker,
Executor executor
) throws IgniteInternalCheckedException {
ClientResource resource = new ClientResource(
new CursorWithPageSize(nextResultFuture, pageSize),
new NextCursorContext(parentTsTracker, pageSize, nextResultFuture),
() -> nextResultFuture.thenAccept(cur -> iterateThroughResultsAndCloseThem(cur, executor))
);

Expand Down Expand Up @@ -414,22 +418,33 @@ private static void packPartitionAwarenessMeta(
}
}

/** Holder of the cursor future and page size. */
static class CursorWithPageSize {
/** Holder of the context for future result set retrieval. */
static class NextCursorContext {
private final CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture;
private final int pageSize;

CursorWithPageSize(CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture, int pageSize) {
this.cursorFuture = cursorFuture;
private final HybridTimestampTracker parentTsTracker;

NextCursorContext(
HybridTimestampTracker parentTsTracker,
int pageSize,
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture
) {
this.parentTsTracker = parentTsTracker;
this.pageSize = pageSize;
this.cursorFuture = cursorFuture;
}

CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture() {
return cursorFuture;
/** Tracker of the request that initiated query processing (i.e. {@link ClientOp#SQL_EXEC}). */
HybridTimestampTracker parentTsTracker() {
return parentTsTracker;
}

int pageSize() {
return pageSize;
}

CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture() {
return cursorFuture;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import org.apache.ignite.client.handler.ClientResource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.client.handler.ResponseWriter;
import org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.CursorWithPageSize;
import org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.NextCursorContext;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.sql.api.AsyncResultSetImpl;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
Expand All @@ -37,43 +38,54 @@ public class ClientSqlCursorNextResultRequest {
/**
* Processes the request.
*
* @param operationExecutor Operation executor.
* @param in Unpacker.
* @param resources Resource bundle.
* @param metrics Client metrics.
* @param requestTsTracker TS tracker attached to current request processing.
* @return Future representing result of operation.
*/
public static CompletableFuture<ResponseWriter> process(
Executor operationExecutor,
ClientMessageUnpacker in,
ClientResourceRegistry resources,
Executor operationExecutor,
ClientHandlerMetricSource metrics
ClientHandlerMetricSource metrics,
HybridTimestampTracker requestTsTracker
Comment thread
solveme marked this conversation as resolved.
) throws IgniteInternalCheckedException {
long resourceId = in.unpackLong();
ClientResource resource = resources.remove(resourceId);
CursorWithPageSize cursorWithPageSize = resource.get(CursorWithPageSize.class);
int pageSize = cursorWithPageSize.pageSize();
NextCursorContext nextCursorContext = resource.get(NextCursorContext.class);
HybridTimestampTracker parentTsTracker = nextCursorContext.parentTsTracker();
int pageSize = nextCursorContext.pageSize();

CompletableFuture<ResponseWriter> f = cursorWithPageSize.cursorFuture()
CompletableFuture<ResponseWriter> f = nextCursorContext.cursorFuture()
.thenComposeAsync(cur -> cur.requestNextAsync(pageSize)
.thenApply(batchRes -> new AsyncResultSetImpl<SqlRow>(
cur,
batchRes,
pageSize
)
).thenCompose(asyncResultSet ->
ClientSqlCommon.writeResultSetAsync(
resources,
asyncResultSet,
metrics,
pageSize,
false,
false,
true,
false,
operationExecutor)
).thenApply(rsWriter -> rsWriter), operationExecutor);
).thenCompose(asyncResultSet -> {
// For multi-statement DML operations, this will help us keep the client's timestamp tracker up to date and
// ensure client reads are consistent with the latest updates.
requestTsTracker.update(parentTsTracker.get());

return ClientSqlCommon.writeResultSetAsync(
resources,
asyncResultSet,
metrics,
parentTsTracker,
pageSize,
false,
false,
true,
false,
operationExecutor);
}).thenApply(rsWriter -> rsWriter), operationExecutor);

f.whenCompleteAsync((r, t) -> {
if (t != null) {
cursorWithPageSize.cursorFuture().thenAccept(cur -> closeRemainingCursors(cur, false, operationExecutor));
nextCursorContext.cursorFuture().thenAccept(cur -> closeRemainingCursors(cur, false, operationExecutor));
}
}, operationExecutor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static CompletableFuture<ResponseWriter> process(
ClockService clockService,
NotificationSender notificationSender,
@Nullable String username,
boolean sqlMultistatementsSupported,
boolean sqlMultistatementSupported,
boolean sqlPartitionAwarenessQualifiedNameSupported,
Consumer<SqlQueryType> queryTypeListener
) {
Expand All @@ -121,7 +121,7 @@ public static CompletableFuture<ResponseWriter> process(
resIdHolder
);

ClientSqlProperties props = new ClientSqlProperties(in, sqlMultistatementsSupported);
ClientSqlProperties props = new ClientSqlProperties(in, sqlMultistatementSupported);
String statement = in.unpackString();
Object[] arguments = readArgsNotNull(in);

Expand All @@ -146,10 +146,11 @@ public static CompletableFuture<ResponseWriter> process(
resources,
asyncResultSet,
metrics,
timestampTracker,
props.pageSize(),
includePartitionAwarenessMeta,
sqlDirectTxMappingSupported,
sqlMultistatementsSupported,
sqlMultistatementSupported,
sqlPartitionAwarenessQualifiedNameSupported,
operationExecutor))
.thenApply(rsWriter -> out -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.jdbc.util.JdbcTestUtils.assertThrowsSqlException;
import static org.apache.ignite.jdbc.util.RowsProjectionMatcher.hasValuesInAnyOrder;
import static org.apache.ignite.jdbc.util.RowsProjectionMatcher.hasValuesOrder;
import static org.apache.ignite.jdbc.util.StatementResultCheck.isResultSet;
import static org.apache.ignite.jdbc.util.StatementResultCheck.isUpdateCounter;
import static org.apache.ignite.jdbc.util.StatementResultCheck.noMoreResults;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
Expand All @@ -35,9 +40,11 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.jdbc.JdbcStatement;
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.jdbc.util.StatementResultCheck;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -281,18 +288,26 @@ public void statementMustCloseAllDependentCursors() throws SQLException {

@Test
public void testMixedDmlQueryExecute() throws Exception {
boolean res = stmt.execute("INSERT INTO TEST_TX VALUES (6, 5, '5'); DELETE FROM TEST_TX WHERE ID=6; SELECT 1;");
assertFalse(res);
assertEquals(1, getResultSetSize());
assertStatementResults("INSERT INTO TEST_TX VALUES (6, 5, '5'); DELETE FROM TEST_TX WHERE ID=6; SELECT 1;",
isUpdateCounter(1),
isUpdateCounter(1),
isResultSet(rs -> rs.getInt(1), hasValuesOrder(1)),
noMoreResults()
);

res = stmt.execute("SELECT 1; INSERT INTO TEST_TX VALUES (7, 5, '5'); DELETE FROM TEST_TX WHERE ID=6;");
assertEquals(true, res);
assertEquals(1, getResultSetSize());
assertStatementResults("SELECT 1; INSERT INTO TEST_TX VALUES (7, 5, '5'); DELETE FROM TEST_TX WHERE ID=6;",
isResultSet(rs -> rs.getInt(1), hasValuesOrder(1)),
isUpdateCounter(1),
isUpdateCounter(0),
noMoreResults()
);

// empty results set in the middle
res = stmt.execute("SELECT * FROM TEST_TX; INSERT INTO TEST_TX VALUES (6, 6, '6'); SELECT * FROM TEST_TX;");
assertEquals(true, res);
assertEquals(11, getResultSetSize());
assertStatementResults("SELECT * FROM TEST_TX; INSERT INTO TEST_TX VALUES (6, 6, '6'); SELECT * FROM TEST_TX;",
isResultSet(rs -> rs.getInt(1), hasValuesInAnyOrder(1, 2, 3, 4, 7)),
isUpdateCounter(1),
isResultSet(rs -> rs.getInt(1), hasValuesInAnyOrder(1, 2, 3, 4, 6, 7)),
noMoreResults()
);
}

@Test
Expand Down Expand Up @@ -780,6 +795,18 @@ private int getResultSetSize() throws SQLException {
return size;
}

/** Verifies that after query execution statement returns results that satisfy provided assertions. */
private void assertStatementResults(String query, StatementResultCheck... resultCheck) throws SQLException {
List<StatementResultCheck> checks = List.of(resultCheck);

stmt.execute(query);

for (StatementResultCheck check : checks) {
check.check(stmt);
stmt.getMoreResults();
}
}

private boolean checkNoMoreResults() throws SQLException {
boolean more = stmt.getMoreResults();
int updCnt = stmt.getUpdateCount();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.jdbc.util;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
* Functional interface for extracting a value from the current row of a {@link ResultSet}.
*
* @param <T> Type of the extracted value.
*/
@FunctionalInterface
public interface RowColumnProjection<T> {
/** Extracts a value from the current row of {@code rs}. */
T extract(ResultSet rs) throws SQLException;

/** Drains result set to list by projecting each record with provided extractor. */
static <T> List<T> projectRowsColumn(ResultSet rs, RowColumnProjection<T> extractor) throws SQLException {
List<T> result = new ArrayList<>();

while (rs.next()) {
result.add(extractor.extract(rs));
}

return result;
}
}
Loading