Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4e91edf
IGNITE-27947 Implement error handling for unset transaction in ClientSql
tmgodinho Mar 6, 2026
74b3741
IGNITE-27947 Add TX_Rollback implementation based on firstReqId
tmgodinho Mar 20, 2026
e7df787
IGNITE-27947 Allow PayloadOutputChannel to support multiple onSent ca…
tmgodinho Mar 20, 2026
46e0cd1
IGNITE-27947 Implement early rollback for Direct Transactions on the …
tmgodinho Mar 20, 2026
49c0fc2
IGNITE-27947 Add tests for Tx Blocked on conflicts in ItThinClientTra…
tmgodinho Mar 20, 2026
cdf9608
IGNITE-27947 Update CLientLazyTransaction error handling logic.
tmgodinho Mar 30, 2026
f649f76
IGNITE-27947 Apply code-review
tmgodinho Mar 30, 2026
6c2bf2f
IGNITE-27947 Apply code-reviews
tmgodinho Mar 30, 2026
52b8fe9
IGNITE-27947 Reconciled ClientSql and ClientTable transaction error h…
tmgodinho Mar 30, 2026
dfa9bed
IGNITE-27947 Add TX_ROLLBACK_USING_FIRST_REQUEST protocol feature
tmgodinho Apr 1, 2026
0264715
Merge remote-tracking branch 'origin/main' into ignite-27947
tmgodinho Apr 2, 2026
0dbb31c
IGNITE-27947 Apply code review
tmgodinho Apr 17, 2026
7bae687
Merge remote-tracking branch 'origin/main' into ignite-27947
tmgodinho Apr 17, 2026
d288b28
Merge remote-tracking branch 'origin/main' into ignite-27947
tmgodinho Apr 20, 2026
dc1f122
IGNITE-27947 Fix Merge remote-tracking branch 'origin/main' into igni…
tmgodinho Apr 20, 2026
49a9b25
IGNITE-27947 Harden ClientLazyTransaction#rollbackAsync against race …
tmgodinho Apr 21, 2026
8c64dcd
IGNITE-27947 Apply code-reviews
tmgodinho Apr 21, 2026
9915257
IGNITE-27947 Remove id from DirectTxUtils#tryHandleErrorOnFirstRequest
tmgodinho Apr 22, 2026
5b227b6
IGNITE-27947 Improve comment on ClientTransaction id field.
tmgodinho Apr 22, 2026
79b0f05
Merge remote-tracking branch 'origin/main' into ignite-27947
tmgodinho Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientStreamerBatchSendRequest;
import org.apache.ignite.client.handler.requests.table.ClientStreamerWithReceiverBatchSendRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableCommon;
import org.apache.ignite.client.handler.requests.table.ClientTableGetQualifiedRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTablePartitionPrimaryReplicasGetRequest;
Expand Down Expand Up @@ -189,6 +190,7 @@
import org.apache.ignite.security.exception.InvalidCredentialsException;
import org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException;
import org.apache.ignite.sql.SqlBatchException;
import org.apache.ignite.table.IgniteTables;
import org.apache.ignite.tx.RetriableTransactionException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
Expand Down Expand Up @@ -291,6 +293,25 @@ public class ClientInboundMessageHandler

private final HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;

/**
* Tracks mappings between the first {@code requestId} and the {@code resourceId}
* holding the Tx object for directly mapped transactions. The process is not localized.
*
* <p><b>Mappings are created:</b>
* <ul>
* <li>When a direct transaction is created in {@link ClientTableCommon#readTx(ClientMessageUnpacker, HybridTimestampTracker,
* ClientResourceRegistry, TxManager, IgniteTables, NotificationSender, long[], long, Map)}.
* </li>
* </ul>
*
* <p><b>Mappings are removed:</b>
* <ul>
* <li>During a rollback request.</li>
* <li>After the first request response is sent to the client.</li>
Comment thread
ptupitsyn marked this conversation as resolved.
Outdated
* </ul>
*/
private final Map<Long, Long> firstReqToTxResMap = new ConcurrentHashMap<>();

/**
* Constructor.
*
Expand Down Expand Up @@ -899,70 +920,72 @@ private CompletableFuture<ResponseWriter> processOperation(
return ClientTableGetRequest.process(in, igniteTables);

case ClientOp.TUPLE_UPSERT:
return ClientTupleUpsertRequest.process(
in, igniteTables, resources, txManager, clockService, notificationSender(requestId), tsTracker);
return ClientTupleUpsertRequest.process(in, igniteTables, resources, txManager, clockService, notificationSender(requestId),
tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_GET:
return ClientTupleGetRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker);
return ClientTupleGetRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker, requestId,
firstReqToTxResMap);

case ClientOp.TUPLE_UPSERT_ALL:
return ClientTupleUpsertAllRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_GET_ALL:
return ClientTupleGetAllRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker,
clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));
requestId, firstReqToTxResMap, clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));

case ClientOp.TUPLE_GET_AND_UPSERT:
return ClientTupleGetAndUpsertRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_INSERT:
return ClientTupleInsertRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_INSERT_ALL:
return ClientTupleInsertAllRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_REPLACE:
return ClientTupleReplaceRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_REPLACE_EXACT:
return ClientTupleReplaceExactRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_GET_AND_REPLACE:
return ClientTupleGetAndReplaceRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_DELETE:
return ClientTupleDeleteRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_DELETE_ALL:
return ClientTupleDeleteAllRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_DELETE_EXACT:
return ClientTupleDeleteExactRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_DELETE_ALL_EXACT:
return ClientTupleDeleteAllExactRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_GET_AND_DELETE:
return ClientTupleGetAndDeleteRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_CONTAINS_KEY:
return ClientTupleContainsKeyRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker);
return ClientTupleContainsKeyRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker, requestId,
firstReqToTxResMap);

case ClientOp.TUPLE_CONTAINS_ALL_KEYS:
return ClientTupleContainsAllKeysRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker,
clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));
requestId, firstReqToTxResMap, clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));

case ClientOp.JDBC_CONNECT:
return ClientJdbcConnectRequest.execute(in, jdbcQueryEventHandler, resolveCurrentUsername());
Expand Down Expand Up @@ -1011,7 +1034,7 @@ private CompletableFuture<ResponseWriter> processOperation(
clientContext.hasFeature(TX_PIGGYBACK), clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES), tsTracker);

case ClientOp.TX_ROLLBACK:
return ClientTransactionRollbackRequest.process(in, resources, metrics, igniteTables,
return ClientTransactionRollbackRequest.process(in, resources, metrics, igniteTables, firstReqToTxResMap,
clientContext.hasFeature(TX_PIGGYBACK), clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES));

case ClientOp.COMPUTE_EXECUTE:
Expand Down Expand Up @@ -1066,6 +1089,7 @@ private CompletableFuture<ResponseWriter> processOperation(
igniteTables,
clockService,
notificationSender(requestId),
firstReqToTxResMap,
resolveCurrentUsername(),
clientContext.hasFeature(SQL_MULTISTATEMENT_SUPPORT),
clientContext.hasFeature(SQL_PARTITION_AWARENESS_TABLE_NAME),
Expand Down Expand Up @@ -1097,12 +1121,12 @@ partitionOperationsExecutor, in, queryProcessor, requestId, cancelHandles, tsTra

case ClientOp.SQL_QUERY_META:
return ClientSqlQueryMetadataRequest.process(
partitionOperationsExecutor, in, queryProcessor, resources, tsTracker
partitionOperationsExecutor, in, queryProcessor, resources, tsTracker, requestId, firstReqToTxResMap
);

case ClientOp.SQL_EXEC_BATCH:
return ClientSqlExecuteBatchRequest.process(
in, queryProcessor, resources, requestId, cancelHandles, tsTracker,
in, queryProcessor, resources, requestId, cancelHandles, tsTracker, firstReqToTxResMap,
resolveCurrentUsername()
);

Expand Down Expand Up @@ -1171,6 +1195,7 @@ private void processOperationInternal(
if (err != null) {
writeError(requestId, opCode, (Throwable) err, ctx, false);
metrics.requestsFailedIncrement();
firstReqToTxResMap.remove(requestId);
return;
}

Expand Down Expand Up @@ -1200,6 +1225,8 @@ private void processOperationInternal(
writeError(requestId, opCode, e, ctx, false);
metrics.requestsFailedIncrement();
}

firstReqToTxResMap.remove(requestId);
Comment thread
ptupitsyn marked this conversation as resolved.
Outdated
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class ClientSqlExecuteBatchRequest {
* @param cancelHandleMap Registry of handlers. Request must register itself in this registry before switching to another
* thread.
* @param username Authenticated user name.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future representing result of operation.
*/
public static CompletableFuture<ResponseWriter> process(
Expand All @@ -56,6 +57,7 @@ public static CompletableFuture<ResponseWriter> process(
long requestId,
Map<Long, CancelHandle> cancelHandleMap,
HybridTimestampTracker tsTracker,
Map<Long, Long> reqToTxMap,
String username
) {
CancelHandle cancelHandle = CancelHandle.create();
Expand All @@ -68,7 +70,9 @@ public static CompletableFuture<ResponseWriter> process(
null,
null,
null,
null
null,
requestId,
reqToTxMap
);

ClientSqlProperties props = new ClientSqlProperties(in, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class ClientSqlExecuteRequest {
* transaction.
* @param notificationSender Notification sender is required to send acknowledge for underlying write operation within a remote
* transaction.
* @param reqToTxMap Tracker for first request of direct transactions.
* @param username Authenticated user name or {@code null} for unknown user.
* @return Future representing result of operation.
*/
Expand All @@ -97,6 +98,7 @@ public static CompletableFuture<ResponseWriter> process(
IgniteTables tables,
ClockService clockService,
NotificationSender notificationSender,
Map<Long, Long> reqToTxMap,
@Nullable String username,
boolean sqlMultistatementsSupported,
boolean sqlPartitionAwarenessQualifiedNameSupported,
Expand All @@ -118,7 +120,9 @@ public static CompletableFuture<ResponseWriter> process(
txManager,
tables,
notificationSender,
resIdHolder
resIdHolder,
requestId,
reqToTxMap
);

ClientSqlProperties props = new ClientSqlProperties(in, sqlMultistatementsSupported);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.ignite.client.handler.ClientResourceRegistry;
Expand All @@ -42,16 +43,20 @@ public class ClientSqlQueryMetadataRequest {
* @param in Unpacker.
* @param processor SQL API.
* @param resources Resources.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future representing result of operation.
*/
public static CompletableFuture<ResponseWriter> process(
Executor operationExecutor,
ClientMessageUnpacker in,
QueryProcessor processor,
ClientResourceRegistry resources,
HybridTimestampTracker tsTracker
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap
) {
CompletableFuture<InternalTransaction> txFut = readTx(in, tsTracker, resources, null, null, null, null);
CompletableFuture<InternalTransaction> txFut = readTx(in, tsTracker, resources, null, null, null, null, requestId, reqToTxMap);

String schema = in.unpackString();
String query = in.unpackString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.BitSet;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResource;
Expand Down Expand Up @@ -421,6 +422,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
* @param txManager Tx manager.
* @param notificationSender Notification sender.
* @param resourceIdHolder Resource id holder.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Transaction, if present, or null.
*/
public static CompletableFuture<@Nullable InternalTransaction> readTx(
Expand All @@ -430,7 +433,9 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
@Nullable TxManager txManager,
@Nullable IgniteTables tables,
@Nullable NotificationSender notificationSender,
long[] resourceIdHolder
long[] resourceIdHolder,
long requestId,
Map<Long, Long> reqToTxMap
) {
return readTx(
in,
Expand All @@ -440,6 +445,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
tables,
notificationSender,
resourceIdHolder,
requestId,
reqToTxMap,
EnumSet.noneOf(RequestOptions.class)
);
}
Expand All @@ -453,6 +460,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
* @param txManager Tx manager.
* @param notificationSender Notification sender.
* @param resourceIdHolder Resource id holder.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @param options Request options. Defines how a request is processed.
* @return Transaction, if present, or null.
*/
Expand All @@ -464,6 +473,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
@Nullable IgniteTables tables,
@Nullable NotificationSender notificationSender,
long[] resourceIdHolder,
long requestId,
Map<Long, Long> reqToTxMap,
EnumSet<RequestOptions> options
) {
if (in.tryUnpackNil()) {
Expand Down Expand Up @@ -510,6 +521,9 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
// Attach resource id only on first direct request.
resourceIdHolder[0] = resources.put(new ClientResource(tx, tx::rollbackAsync));

// Record the mapping between first request and resourceId.
reqToTxMap.put(requestId, resourceIdHolder[0]);

return completedFuture(tx);
} else if (id == TX_ID_DIRECT) {
assert txManager != null : "Transaction manager must be specified to process directly mapped requests.";
Expand Down Expand Up @@ -589,9 +603,11 @@ static CompletableFuture<InternalTransaction> readOrStartImplicitTx(
IgniteTables tables,
EnumSet<RequestOptions> options,
@Nullable NotificationSender notificationSender,
long[] resourceIdHolder
long[] resourceIdHolder,
long requestId,
Map<Long, Long> reqToTxMap
) {
return readTx(in, readTs, resources, txManager, tables, notificationSender, resourceIdHolder, options)
return readTx(in, readTs, resources, txManager, tables, notificationSender, resourceIdHolder, requestId, reqToTxMap, options)
.thenApply(tx -> {
if (tx == null) {
// Implicit transactions do not use an observation timestamp because RW never depends on it,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;

import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.client.handler.ResponseWriter;
Expand All @@ -46,6 +47,8 @@ public class ClientTupleContainsAllKeysRequest {
* @param txManager Transaction manager.
* @param clockService Clock service.
* @param tsTracker Tracker.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @param supportsOptions {@code True} if supports tx options.
* @return Future.
*/
Expand All @@ -56,11 +59,13 @@ public static CompletableFuture<ResponseWriter> process(
TxManager txManager,
ClockService clockService,
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap,
boolean supportsOptions
) {
EnumSet<RequestOptions> options = supportsOptions ? of(KEY_ONLY, HAS_OPTIONS) : of(KEY_ONLY);

return ClientTuplesRequestBase.readAsync(in, tables, resources, txManager, null, tsTracker, options)
return ClientTuplesRequestBase.readAsync(in, tables, resources, txManager, null, tsTracker, options, requestId, reqToTxMap)
.thenCompose(req -> req.table().recordView().containsAllAsync(req.tx(), req.tuples())
.thenApply(containsAll -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
Expand Down
Loading