Skip to content

Commit 3f0806b

Browse files
authored
IGNITE-27947 Fix rollback for client piggyback tx (#7844)
Allow rollback for piggyback transactions: piggyback client tx combines **transaction start** with **table operation** in one client request. If the table operation is stuck on a lock, the client never receives the transaction id. Instead, we use the `requestId` (generated on the client) to rollback the transaction. * Server-side: * `TX_ROLLBACK` accepts a request id of the first request ("piggyback") of a direct mapped TX. Request Id is encoded in the negative range of `resourceId`. * Track `requestId -> txId` to roll back piggyback transactions stuck on a lock * Update all the operations. RO ops have the same parameters just for consistency. * Client Side: * Allow multiple `onSent` callbacks on the payload output object. * Add info to `ClientLazyTransaction` about the first request in the TX, update via `PayloadOutputChannel` on successful request. * Implement `TX_ROLLBACK` based on `firstReqId`
1 parent 2c9fce5 commit 3f0806b

38 files changed

Lines changed: 1164 additions & 201 deletions

File tree

modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,12 @@ public enum ProtocolBitmaskFeature {
117117
/**
118118
* Client supports SQL_UPDATE_COUNTERS_2 error extension (single binary value instead of array).
119119
*/
120-
SQL_UPDATE_COUNTERS_2(18);
120+
SQL_UPDATE_COUNTERS_2(18),
121+
122+
/**
123+
* Allow rolling back direct transactions using the first request id.
124+
*/
125+
TX_ROLLBACK_USING_FIRST_REQUEST(19);
121126

122127
private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
123128
EnumSet.allOf(ProtocolBitmaskFeature.class);

modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,7 @@ public void testServerReturnsAllItsFeatures() throws IOException {
565565
expected.set(16);
566566
expected.set(17);
567567
expected.set(18);
568+
expected.set(19);
568569

569570
assertEquals(expected, supportedFeatures);
570571

modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ public class ClientHandlerModule implements IgniteComponent, PlatformComputeTran
103103
ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES,
104104
ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS_TABLE_NAME,
105105
ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD,
106-
ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2
106+
ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2,
107+
ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST
107108
));
108109

109110
/** Connection id generator.

modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java

Lines changed: 248 additions & 35 deletions
Large diffs are not rendered by default.

modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class ClientSqlExecuteBatchRequest {
4848
* @param cancelHandleMap Registry of handlers. Request must register itself in this registry before switching to another
4949
* thread.
5050
* @param username Authenticated user name.
51+
* @param reqToTxMap Tracker for first request of direct transactions.
5152
* @return Future representing result of operation.
5253
*/
5354
public static CompletableFuture<ResponseWriter> process(
@@ -58,6 +59,7 @@ public static CompletableFuture<ResponseWriter> process(
5859
long requestId,
5960
Map<Long, CancelHandle> cancelHandleMap,
6061
HybridTimestampTracker tsTracker,
62+
Map<Long, Long> reqToTxMap,
6163
String username
6264
) {
6365
CancelHandle cancelHandle = CancelHandle.create();
@@ -71,7 +73,9 @@ public static CompletableFuture<ResponseWriter> process(
7173
null,
7274
null,
7375
null,
74-
null
76+
null,
77+
requestId,
78+
reqToTxMap
7579
);
7680

7781
ClientSqlProperties props = new ClientSqlProperties(in, false);

modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public class ClientSqlExecuteRequest {
7979
* transaction.
8080
* @param notificationSender Notification sender is required to send acknowledge for underlying write operation within a remote
8181
* transaction.
82+
* @param reqToTxMap Tracker for first request of direct transactions.
8283
* @param username Authenticated user name or {@code null} for unknown user.
8384
* @return Future representing result of operation.
8485
*/
@@ -97,6 +98,7 @@ public static CompletableFuture<ResponseWriter> process(
9798
IgniteTables tables,
9899
ClockService clockService,
99100
NotificationSender notificationSender,
101+
Map<Long, Long> reqToTxMap,
100102
@Nullable String username,
101103
boolean sqlMultistatementSupported,
102104
boolean sqlPartitionAwarenessQualifiedNameSupported,
@@ -119,7 +121,9 @@ public static CompletableFuture<ResponseWriter> process(
119121
txManager,
120122
tables,
121123
notificationSender,
122-
resIdHolder
124+
resIdHolder,
125+
requestId,
126+
reqToTxMap
123127
);
124128

125129
ClientSqlProperties props = new ClientSqlProperties(in, sqlMultistatementSupported);

modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
2121

22+
import java.util.Map;
2223
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.Executor;
2425
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
@@ -43,6 +44,8 @@ public class ClientSqlQueryMetadataRequest {
4344
* @param in Unpacker.
4445
* @param processor SQL API.
4546
* @param resources Resources.
47+
* @param requestId Id of the request.
48+
* @param reqToTxMap Tracker for first request of direct transactions.
4649
* @return Future representing result of operation.
4750
*/
4851
public static CompletableFuture<ResponseWriter> process(
@@ -51,9 +54,21 @@ public static CompletableFuture<ResponseWriter> process(
5154
QueryProcessor processor,
5255
ClientResourceRegistry resources,
5356
ClientHandlerMetricSource metrics,
54-
HybridTimestampTracker tsTracker
57+
HybridTimestampTracker tsTracker,
58+
long requestId,
59+
Map<Long, Long> reqToTxMap
5560
) {
56-
CompletableFuture<InternalTransaction> txFut = readTx(in, tsTracker, resources, metrics, null, null, null, null);
61+
CompletableFuture<InternalTransaction> txFut = readTx(in,
62+
tsTracker,
63+
resources,
64+
metrics,
65+
null,
66+
null,
67+
null,
68+
null,
69+
requestId,
70+
reqToTxMap
71+
);
5772

5873
String schema = in.unpackString();
5974
String query = in.unpackString();

modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.BitSet;
3131
import java.util.Collection;
3232
import java.util.EnumSet;
33+
import java.util.Map;
3334
import java.util.UUID;
3435
import java.util.concurrent.CompletableFuture;
3536
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
@@ -422,6 +423,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
422423
* @param txManager Tx manager.
423424
* @param notificationSender Notification sender.
424425
* @param resourceIdHolder Resource id holder.
426+
* @param requestId Id of the request.
427+
* @param reqToTxMap Tracker for first request of direct transactions.
425428
* @return Transaction, if present, or null.
426429
*/
427430
public static CompletableFuture<@Nullable InternalTransaction> readTx(
@@ -432,7 +435,9 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
432435
@Nullable TxManager txManager,
433436
@Nullable IgniteTables tables,
434437
@Nullable NotificationSender notificationSender,
435-
long[] resourceIdHolder
438+
long[] resourceIdHolder,
439+
long requestId,
440+
Map<Long, Long> reqToTxMap
436441
) {
437442
return readTx(
438443
in,
@@ -443,6 +448,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
443448
tables,
444449
notificationSender,
445450
resourceIdHolder,
451+
requestId,
452+
reqToTxMap,
446453
EnumSet.noneOf(RequestOptions.class)
447454
);
448455
}
@@ -456,6 +463,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
456463
* @param txManager Tx manager.
457464
* @param notificationSender Notification sender.
458465
* @param resourceIdHolder Resource id holder.
466+
* @param requestId Id of the request.
467+
* @param reqToTxMap Tracker for first request of direct transactions.
459468
* @param options Request options. Defines how a request is processed.
460469
* @return Transaction, if present, or null.
461470
*/
@@ -468,6 +477,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
468477
@Nullable IgniteTables tables,
469478
@Nullable NotificationSender notificationSender,
470479
long[] resourceIdHolder,
480+
long requestId,
481+
Map<Long, Long> reqToTxMap,
471482
EnumSet<RequestOptions> options
472483
) {
473484
if (in.tryUnpackNil()) {
@@ -509,11 +520,18 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
509520
});
510521

511522
InternalTxOptions txOptions = builder.build();
512-
var tx = startExplicitTx(tsUpdater, txManager, HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions);
523+
var tx = new DirectTransactionWithFirstRequest(
524+
startExplicitTx(tsUpdater, txManager, HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions),
525+
reqToTxMap,
526+
requestId
527+
);
513528

514529
// Attach resource id only on first direct request.
515530
resourceIdHolder[0] = resources.put(new ClientResource(tx, tx::rollbackAsync));
516531

532+
// Record the mapping between first request and resourceId.
533+
reqToTxMap.put(requestId, resourceIdHolder[0]);
534+
517535
metrics.transactionsActiveIncrement();
518536

519537
return completedFuture(tx);
@@ -596,9 +614,23 @@ static CompletableFuture<InternalTransaction> readOrStartImplicitTx(
596614
IgniteTables tables,
597615
EnumSet<RequestOptions> options,
598616
@Nullable NotificationSender notificationSender,
599-
long[] resourceIdHolder
617+
long[] resourceIdHolder,
618+
long requestId,
619+
Map<Long, Long> reqToTxMap
600620
) {
601-
return readTx(in, readTs, resources, metrics, txManager, tables, notificationSender, resourceIdHolder, options)
621+
return readTx(
622+
in,
623+
readTs,
624+
resources,
625+
metrics,
626+
txManager,
627+
tables,
628+
notificationSender,
629+
resourceIdHolder,
630+
requestId,
631+
reqToTxMap,
632+
options
633+
)
602634
.thenApply(tx -> {
603635
if (tx == null) {
604636
// Implicit transactions do not use an observation timestamp because RW never depends on it,

modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
2424

2525
import java.util.EnumSet;
26+
import java.util.Map;
2627
import java.util.concurrent.CompletableFuture;
2728
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
2829
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -47,6 +48,8 @@ public class ClientTupleContainsAllKeysRequest {
4748
* @param txManager Transaction manager.
4849
* @param clockService Clock service.
4950
* @param tsTracker Tracker.
51+
* @param requestId Id of the request.
52+
* @param reqToTxMap Tracker for first request of direct transactions.
5053
* @param supportsOptions {@code True} if supports tx options.
5154
* @return Future.
5255
*/
@@ -58,11 +61,13 @@ public static CompletableFuture<ResponseWriter> process(
5861
TxManager txManager,
5962
ClockService clockService,
6063
HybridTimestampTracker tsTracker,
64+
long requestId,
65+
Map<Long, Long> reqToTxMap,
6166
boolean supportsOptions
6267
) {
6368
EnumSet<RequestOptions> options = supportsOptions ? of(KEY_ONLY, HAS_OPTIONS) : of(KEY_ONLY);
6469

65-
return ClientTuplesRequestBase.readAsync(in, tables, resources, metrics, txManager, null, tsTracker, options)
70+
return ClientTuplesRequestBase.readAsync(in, tables, resources, metrics, txManager, null, tsTracker, options, requestId, reqToTxMap)
6671
.thenCompose(req -> req.table().recordView().containsAllAsync(req.tx(), req.tuples())
6772
.thenApply(containsAll -> out -> {
6873
writeTxMeta(out, tsTracker, clockService, req);

modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
2222
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
2323
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.READ_ONLY;
24+
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
2425

26+
import java.util.Map;
2527
import java.util.concurrent.CompletableFuture;
2628
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
2729
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -43,6 +45,8 @@ public class ClientTupleContainsKeyRequest {
4345
* @param tables Ignite tables.
4446
* @param resources Resource registry.
4547
* @param txManager Transaction manager.
48+
* @param requestId Id of the request.
49+
* @param reqToTxMap Tracker for first request of direct transactions.
4650
* @return Future.
4751
*/
4852
public static CompletableFuture<ResponseWriter> process(
@@ -52,9 +56,11 @@ public static CompletableFuture<ResponseWriter> process(
5256
ClientHandlerMetricSource metrics,
5357
TxManager txManager,
5458
ClockService clockService,
55-
HybridTimestampTracker tsTracker
59+
HybridTimestampTracker tsTracker,
60+
long requestId,
61+
Map<Long, Long> reqToTxMap
5662
) {
57-
return ClientTupleRequestBase.readAsync(in, tables, resources, metrics, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY))
63+
return readAsync(in, tables, resources, metrics, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY), requestId, reqToTxMap)
5864
.thenCompose(req -> req.table().recordView().containsAsync(req.tx(), req.tuple())
5965
.thenApply(res -> out -> {
6066
writeTxMeta(out, tsTracker, clockService, req);

0 commit comments

Comments
 (0)