Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4e91edf
IGNITE-27947 Implement error handling for unset transaction in ClientSql
tmgodinho Mar 6, 2026
74b3741
IGNITE-27947 Add TX_Rollback implementation based on firstReqId
tmgodinho Mar 20, 2026
e7df787
IGNITE-27947 Allow PayloadOutputChannel to support multiple onSent ca…
tmgodinho Mar 20, 2026
46e0cd1
IGNITE-27947 Implement early rollback for Direct Transactions on the …
tmgodinho Mar 20, 2026
49c0fc2
IGNITE-27947 Add tests for Tx Blocked on conflicts in ItThinClientTra…
tmgodinho Mar 20, 2026
cdf9608
IGNITE-27947 Update CLientLazyTransaction error handling logic.
tmgodinho Mar 30, 2026
f649f76
IGNITE-27947 Apply code-review
tmgodinho Mar 30, 2026
6c2bf2f
IGNITE-27947 Apply code-reviews
tmgodinho Mar 30, 2026
52b8fe9
IGNITE-27947 Reconciled ClientSql and ClientTable transaction error h…
tmgodinho Mar 30, 2026
dfa9bed
IGNITE-27947 Add TX_ROLLBACK_USING_FIRST_REQUEST protocol feature
tmgodinho Apr 1, 2026
0264715
Merge remote-tracking branch 'origin/main' into ignite-27947
tmgodinho Apr 2, 2026
0dbb31c
IGNITE-27947 Apply code review
tmgodinho Apr 17, 2026
7bae687
Merge remote-tracking branch 'origin/main' into ignite-27947
tmgodinho Apr 17, 2026
d288b28
Merge remote-tracking branch 'origin/main' into ignite-27947
tmgodinho Apr 20, 2026
dc1f122
IGNITE-27947 Fix Merge remote-tracking branch 'origin/main' into igni…
tmgodinho Apr 20, 2026
49a9b25
IGNITE-27947 Harden ClientLazyTransaction#rollbackAsync against race …
tmgodinho Apr 21, 2026
8c64dcd
IGNITE-27947 Apply code-reviews
tmgodinho Apr 21, 2026
9915257
IGNITE-27947 Remove id from DirectTxUtils#tryHandleErrorOnFirstRequest
tmgodinho Apr 22, 2026
5b227b6
IGNITE-27947 Improve comment on ClientTransaction id field.
tmgodinho Apr 22, 2026
79b0f05
Merge remote-tracking branch 'origin/main' into ignite-27947
tmgodinho Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ public enum ProtocolBitmaskFeature {
/**
* Client supports SQL_UPDATE_COUNTERS_2 error extension (single binary value instead of array).
*/
SQL_UPDATE_COUNTERS_2(18);
SQL_UPDATE_COUNTERS_2(18),

/**
* Allow rolling back direct transactions using the first request id.
*/
TX_ROLLBACK_USING_FIRST_REQUEST(19);

private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
EnumSet.allOf(ProtocolBitmaskFeature.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ public void testServerReturnsAllItsFeatures() throws IOException {
expected.set(16);
expected.set(17);
expected.set(18);
expected.set(19);

assertEquals(expected, supportedFeatures);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public class ClientHandlerModule implements IgniteComponent, PlatformComputeTran
ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES,
ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS_TABLE_NAME,
ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD,
ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2
ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2,
ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST
));

/** Connection id generator.
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ClientSqlExecuteBatchRequest {
* @param cancelHandleMap Registry of handlers. Request must register itself in this registry before switching to another
* thread.
* @param username Authenticated user name.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future representing result of operation.
*/
public static CompletableFuture<ResponseWriter> process(
Expand All @@ -58,6 +59,7 @@ public static CompletableFuture<ResponseWriter> process(
long requestId,
Map<Long, CancelHandle> cancelHandleMap,
HybridTimestampTracker tsTracker,
Map<Long, Long> reqToTxMap,
String username
) {
CancelHandle cancelHandle = CancelHandle.create();
Expand All @@ -71,7 +73,9 @@ public static CompletableFuture<ResponseWriter> process(
null,
null,
null,
null
null,
requestId,
reqToTxMap
);

ClientSqlProperties props = new ClientSqlProperties(in, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class ClientSqlExecuteRequest {
* transaction.
* @param notificationSender Notification sender is required to send acknowledge for underlying write operation within a remote
* transaction.
* @param reqToTxMap Tracker for first request of direct transactions.
* @param username Authenticated user name or {@code null} for unknown user.
* @return Future representing result of operation.
*/
Expand All @@ -97,6 +98,7 @@ public static CompletableFuture<ResponseWriter> process(
IgniteTables tables,
ClockService clockService,
NotificationSender notificationSender,
Map<Long, Long> reqToTxMap,
@Nullable String username,
boolean sqlMultistatementSupported,
boolean sqlPartitionAwarenessQualifiedNameSupported,
Expand All @@ -119,7 +121,9 @@ public static CompletableFuture<ResponseWriter> process(
txManager,
tables,
notificationSender,
resIdHolder
resIdHolder,
requestId,
reqToTxMap
);

ClientSqlProperties props = new ClientSqlProperties(in, sqlMultistatementSupported);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
Expand All @@ -43,6 +44,8 @@ public class ClientSqlQueryMetadataRequest {
* @param in Unpacker.
* @param processor SQL API.
* @param resources Resources.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future representing result of operation.
*/
public static CompletableFuture<ResponseWriter> process(
Expand All @@ -51,9 +54,21 @@ public static CompletableFuture<ResponseWriter> process(
QueryProcessor processor,
ClientResourceRegistry resources,
ClientHandlerMetricSource metrics,
HybridTimestampTracker tsTracker
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap
) {
CompletableFuture<InternalTransaction> txFut = readTx(in, tsTracker, resources, metrics, null, null, null, null);
CompletableFuture<InternalTransaction> txFut = readTx(in,
tsTracker,
resources,
metrics,
null,
null,
null,
null,
requestId,
reqToTxMap
);

String schema = in.unpackString();
String query = in.unpackString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.BitSet;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
Expand Down Expand Up @@ -422,6 +423,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
* @param txManager Tx manager.
* @param notificationSender Notification sender.
* @param resourceIdHolder Resource id holder.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Transaction, if present, or null.
*/
public static CompletableFuture<@Nullable InternalTransaction> readTx(
Expand All @@ -432,7 +435,9 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
@Nullable TxManager txManager,
@Nullable IgniteTables tables,
@Nullable NotificationSender notificationSender,
long[] resourceIdHolder
long[] resourceIdHolder,
long requestId,
Map<Long, Long> reqToTxMap
) {
return readTx(
in,
Expand All @@ -443,6 +448,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
tables,
notificationSender,
resourceIdHolder,
requestId,
reqToTxMap,
EnumSet.noneOf(RequestOptions.class)
);
}
Expand All @@ -456,6 +463,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
* @param txManager Tx manager.
* @param notificationSender Notification sender.
* @param resourceIdHolder Resource id holder.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @param options Request options. Defines how a request is processed.
* @return Transaction, if present, or null.
*/
Expand All @@ -468,6 +477,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
@Nullable IgniteTables tables,
@Nullable NotificationSender notificationSender,
long[] resourceIdHolder,
long requestId,
Map<Long, Long> reqToTxMap,
EnumSet<RequestOptions> options
) {
if (in.tryUnpackNil()) {
Expand Down Expand Up @@ -509,11 +520,18 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
});

InternalTxOptions txOptions = builder.build();
var tx = startExplicitTx(tsUpdater, txManager, HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions);
var tx = new DirectTransactionWithFirstRequest(
startExplicitTx(tsUpdater, txManager, HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions),
reqToTxMap,
requestId
);

// Attach resource id only on first direct request.
resourceIdHolder[0] = resources.put(new ClientResource(tx, tx::rollbackAsync));

// Record the mapping between first request and resourceId.
reqToTxMap.put(requestId, resourceIdHolder[0]);

metrics.transactionsActiveIncrement();

return completedFuture(tx);
Expand Down Expand Up @@ -596,9 +614,23 @@ static CompletableFuture<InternalTransaction> readOrStartImplicitTx(
IgniteTables tables,
EnumSet<RequestOptions> options,
@Nullable NotificationSender notificationSender,
long[] resourceIdHolder
long[] resourceIdHolder,
long requestId,
Map<Long, Long> reqToTxMap
) {
return readTx(in, readTs, resources, metrics, txManager, tables, notificationSender, resourceIdHolder, options)
return readTx(
in,
readTs,
resources,
metrics,
txManager,
tables,
notificationSender,
resourceIdHolder,
requestId,
reqToTxMap,
options
)
.thenApply(tx -> {
if (tx == null) {
// Implicit transactions do not use an observation timestamp because RW never depends on it,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;

import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
Expand All @@ -47,6 +48,8 @@ public class ClientTupleContainsAllKeysRequest {
* @param txManager Transaction manager.
* @param clockService Clock service.
* @param tsTracker Tracker.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @param supportsOptions {@code True} if supports tx options.
* @return Future.
*/
Expand All @@ -58,11 +61,13 @@ public static CompletableFuture<ResponseWriter> process(
TxManager txManager,
ClockService clockService,
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap,
boolean supportsOptions
) {
EnumSet<RequestOptions> options = supportsOptions ? of(KEY_ONLY, HAS_OPTIONS) : of(KEY_ONLY);

return ClientTuplesRequestBase.readAsync(in, tables, resources, metrics, txManager, null, tsTracker, options)
return ClientTuplesRequestBase.readAsync(in, tables, resources, metrics, txManager, null, tsTracker, options, requestId, reqToTxMap)
.thenCompose(req -> req.table().recordView().containsAllAsync(req.tx(), req.tuples())
.thenApply(containsAll -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.READ_ONLY;
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
Expand All @@ -43,6 +45,8 @@ public class ClientTupleContainsKeyRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Transaction manager.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
Expand All @@ -52,9 +56,11 @@ public static CompletableFuture<ResponseWriter> process(
ClientHandlerMetricSource metrics,
TxManager txManager,
ClockService clockService,
HybridTimestampTracker tsTracker
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap
) {
return ClientTupleRequestBase.readAsync(in, tables, resources, metrics, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY))
return readAsync(in, tables, resources, metrics, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY), requestId, reqToTxMap)
.thenCompose(req -> req.table().recordView().containsAsync(req.tx(), req.tuple())
.thenApply(res -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
Expand All @@ -45,6 +46,8 @@ public class ClientTupleDeleteAllExactRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Ignite transactions.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
Expand All @@ -55,13 +58,27 @@ public static CompletableFuture<ResponseWriter> process(
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
HybridTimestampTracker tsTracker
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap
) {
return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class))
return readAsync(
in,
tables,
resources,
metrics,
txManager,
notificationSender,
tsTracker,
noneOf(RequestOptions.class),
requestId,
reqToTxMap
)
.thenCompose(req -> req.table().recordView().deleteAllExactAsync(req.tx(), req.tuples())
.thenApply(skippedTuples -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
writeTuples(out, skippedTuples, req.table().schemaView());
}));
})
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuples;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
import static org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
Expand All @@ -45,6 +47,8 @@ public class ClientTupleDeleteAllRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Ignite transactions.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
Expand All @@ -55,9 +59,11 @@ public static CompletableFuture<ResponseWriter> process(
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
HybridTimestampTracker tsTracker
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap
) {
return ClientTuplesRequestBase.readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, of(KEY_ONLY))
return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, of(KEY_ONLY), requestId, reqToTxMap)
.thenCompose(req -> req.table().recordView().deleteAllAsync(req.tx(), req.tuples())
.thenApply(skippedTuples -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
Expand Down
Loading