Skip to content

Commit f7d0baf

Browse files
authored
IGNITE-24963 Introduce fair wound wait deadlock prevention algorithm (#7799)
1 parent b033fc9 commit f7d0baf

File tree

67 files changed

+3434
-2511
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+3434
-2511
lines changed

modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ default CompletableFuture<Transaction> beginAsync() {
131131
* closure will be retried automatically within the transaction timeout, so it must be pure function. If the transaction timeout
132132
* expires before the closure completes successfully and the transaction has been committed, the transaction is rolled back instead.
133133
* <br>
134-
* The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException},
134+
* The closure is retried only in cases of "expected" exceptions, like {@code LockException},
135135
* exceptions related to the primary replica change, etc.
136136
*
137137
* @param clo The closure.
@@ -174,7 +174,7 @@ default void runInTransaction(Consumer<Transaction> clo) throws TransactionExcep
174174
* closure will be retried automatically within the transaction timeout, so it must be pure function. If the transaction timeout
175175
* expires before the closure completes successfully and the transaction has been committed, the transaction is rolled back instead.
176176
* <br>
177-
* The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException},
177+
* The closure is retried only in cases of "expected" exceptions, like {@code LockException},
178178
* exceptions related to the primary replica change, etc.
179179
*
180180
* @param options Transaction options.
@@ -223,7 +223,7 @@ default void runInTransaction(Consumer<Transaction> clo, @Nullable TransactionOp
223223
* closure will be retried automatically within the transaction timeout, so it must be pure function. If the transaction timeout
224224
* expires before the closure completes successfully and the transaction has been committed, the transaction is rolled back instead.
225225
* <br>
226-
* The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException},
226+
* The closure is retried only in cases of "expected" exceptions, like {@code LockException},
227227
* exceptions related to the primary replica change, etc.
228228
*
229229
* @param clo Closure.
@@ -268,7 +268,7 @@ default <T> T runInTransaction(Function<Transaction, T> clo) throws TransactionE
268268
* closure will be retried automatically within the transaction timeout, so it must be pure function. If the transaction timeout
269269
* expires before the closure completes successfully and the transaction has been committed, the transaction is rolled back instead.
270270
* <br>
271-
* The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException},
271+
* The closure is retried only in cases of "expected" exceptions, like {@code LockException},
272272
* exceptions related to the primary replica change, etc.
273273
*
274274
* @param clo The closure.
@@ -304,7 +304,7 @@ default <T> T runInTransaction(Function<Transaction, T> clo, @Nullable Transacti
304304
* closure will be retried automatically within the transaction timeout, so it must be pure function. If the transaction timeout
305305
* expires before the closure completes successfully and the transaction has been committed, the transaction is rolled back instead.
306306
* <br>
307-
* The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException},
307+
* The closure is retried only in cases of "expected" exceptions, like {@code LockException},
308308
* exceptions related to the primary replica change, etc.
309309
*
310310
* @param clo The closure.
@@ -333,7 +333,7 @@ default <T> CompletableFuture<T> runInTransactionAsync(Function<Transaction, Com
333333
* closure will be retried automatically within the transaction timeout, so it must be pure function. If the transaction timeout
334334
* expires before the closure completes successfully and the transaction has been committed, the transaction is rolled back instead.
335335
* <br>
336-
* The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException},
336+
* The closure is retried only in cases of "expected" exceptions, like {@code LockException},
337337
* exceptions related to the primary replica change, etc.
338338
*
339339
*

modules/api/src/main/java/org/apache/ignite/tx/RunInTransactionInternalImpl.java

Lines changed: 8 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.Set;
3131
import java.util.concurrent.CompletableFuture;
3232
import java.util.concurrent.TimeUnit;
33-
import java.util.concurrent.TimeoutException;
3433
import java.util.function.Function;
3534
import org.jetbrains.annotations.Nullable;
3635

@@ -61,11 +60,14 @@ static <T> T runInTransactionInternal(
6160
T ret;
6261

6362
while (true) {
63+
// TODO IGNITE-28448 Use tx restart counter to avoid starvation.
6464
tx = igniteTransactions.begin(txOptions);
6565

6666
try {
6767
ret = clo.apply(tx);
6868

69+
tx.commit(); // Commit is retriable.
70+
6971
break;
7072
} catch (Exception ex) {
7173
addSuppressedToList(suppressed, ex);
@@ -98,19 +100,6 @@ static <T> T runInTransactionInternal(
98100
}
99101
}
100102

101-
try {
102-
tx.commit();
103-
} catch (Exception e) {
104-
try {
105-
// Try to rollback tx in case if it's not finished. Retry is not needed here due to the durable finish.
106-
tx.rollback();
107-
} catch (Exception re) {
108-
e.addSuppressed(re);
109-
}
110-
111-
throw e;
112-
}
113-
114103
return ret;
115104
}
116105

@@ -158,6 +147,7 @@ static <T> CompletableFuture<T> runInTransactionAsyncInternal(
158147
.thenCompose(tx -> {
159148
try {
160149
return clo.apply(tx)
150+
.thenCompose(res -> tx.commitAsync().thenApply(ignored -> res))
161151
.handle((res, e) -> {
162152
if (e != null) {
163153
return handleClosureException(
@@ -173,30 +163,11 @@ static <T> CompletableFuture<T> runInTransactionAsyncInternal(
173163
} else {
174164
return completedFuture(res);
175165
}
176-
})
177-
.thenCompose(identity())
178-
.thenApply(res -> new TxWithVal<>(tx, res));
166+
}).thenCompose(identity());
179167
} catch (Exception e) {
180-
return handleClosureException(igniteTransactions, tx, clo, txOptions, startTimestamp, initialTimeout, sup, e)
181-
.thenApply(res -> new TxWithVal<>(tx, res));
168+
return handleClosureException(igniteTransactions, tx, clo, txOptions, startTimestamp, initialTimeout, sup, e);
182169
}
183-
})
184-
// Transaction commit with rollback on failure, without retries.
185-
// Transaction rollback on closure failure is implemented in closure retry logic.
186-
.thenCompose(txWithVal ->
187-
txWithVal.tx.commitAsync()
188-
.handle((ignored, e) -> {
189-
if (e == null) {
190-
return completedFuture(null);
191-
} else {
192-
return txWithVal.tx.rollbackAsync()
193-
// Rethrow commit exception.
194-
.handle((ign, re) -> sneakyThrow(e));
195-
}
196-
})
197-
.thenCompose(fut -> fut)
198-
.thenApply(ignored -> txWithVal.val)
199-
);
170+
});
200171
}
201172

202173
private static <T> CompletableFuture<T> handleClosureException(
@@ -311,10 +282,7 @@ private static CompletableFuture<Void> throwExceptionWithSuppressedAsync(Throwab
311282
}
312283

313284
private static boolean isRetriable(Throwable e) {
314-
return hasCause(e,
315-
TimeoutException.class,
316-
RetriableTransactionException.class
317-
);
285+
return hasCause(e, RetriableTransactionException.class);
318286
}
319287

320288
private static boolean hasCause(Throwable e, Class<?>... classes) {
@@ -347,14 +315,4 @@ private static long calcRemainingTime(long initialTimeout, long startTimestamp)
347315
private static <E extends Throwable> E sneakyThrow(Throwable e) throws E {
348316
throw (E) e;
349317
}
350-
351-
private static class TxWithVal<T> {
352-
private final Transaction tx;
353-
private final T val;
354-
355-
private TxWithVal(Transaction tx, T val) {
356-
this.tx = tx;
357-
this.val = val;
358-
}
359-
}
360318
}

modules/api/src/test/java/org/apache/ignite/tx/RunInTransactionRetryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ public void testRetries(
8888
}
8989

9090
boolean requiresEventualSuccess = closureFailureCount < Integer.MAX_VALUE
91-
// Commit failure can't be retried.
92-
&& commitFailureCount == 0
91+
&& commitFailureCount < Integer.MAX_VALUE
92+
&& (commitFailureCount == 0 || rollbackFailureCount < Integer.MAX_VALUE)
9393
// Rollbacks should be retried until success or timeout, so the rollback must succeed before closure retry.
9494
&& (closureFailureCount == 0 || rollbackFailureCount < Integer.MAX_VALUE);
9595

modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -258,13 +258,19 @@ void testAccessLockedKeyTimesOut() throws Exception {
258258
// Lock the key in tx2.
259259
Transaction tx2 = client().transactions().begin();
260260

261+
IgniteImpl server0 = unwrapIgniteImpl(server(0));
262+
boolean invertedWaitOrder = server0.txManager().lockManager().policy().invertedWaitOrder();
263+
264+
Transaction owner = invertedWaitOrder ? tx2 : tx1;
265+
Transaction waiter = invertedWaitOrder ? tx1 : tx2;
266+
261267
try {
262-
kvView.put(tx2, -100, "1");
268+
kvView.put(owner, -100, "1");
263269

264270
// Get the key in tx1 - time out.
265-
assertThrows(TimeoutException.class, () -> kvView.getAsync(tx1, -100).get(1, TimeUnit.SECONDS));
271+
assertThrows(TimeoutException.class, () -> kvView.getAsync(waiter, -100).get(1, TimeUnit.SECONDS));
266272
} finally {
267-
tx2.rollback();
273+
owner.rollback();
268274
}
269275
}
270276

@@ -1374,25 +1380,29 @@ public void testRollbackDoesNotBlockOnLockConflict(KillTestContext ctx) {
13741380

13751381
assertTrue(olderTx.txId().compareTo(youngerTx.txId()) < 0);
13761382

1377-
// Older is allowed to wait with wait-die.
1378-
CompletableFuture<?> fut = ctx.put.apply(client(), olderTxProxy, key2);
1379-
assertFalse(fut.isDone());
1380-
13811383
IgniteImpl ignite = unwrapIgniteImpl(server);
1384+
boolean invertedWaitOrder = ignite.txManager().lockManager().policy().invertedWaitOrder();
1385+
1386+
ClientLazyTransaction owner = invertedWaitOrder ? youngerTxProxy : olderTxProxy;
1387+
ClientLazyTransaction waiter = invertedWaitOrder ? olderTxProxy : youngerTxProxy;
1388+
1389+
CompletableFuture<?> fut =
1390+
invertedWaitOrder ? ctx.put.apply(client(), olderTxProxy, key2) : ctx.put.apply(client(), youngerTxProxy, key);
1391+
assertFalse(fut.isDone());
13821392

13831393
await().atMost(2, TimeUnit.SECONDS).until(() -> {
13841394
Iterator<Lock> locks = ignite.txManager().lockManager().locks(olderTx.txId());
13851395

13861396
return CollectionUtils.count(locks) == 2;
13871397
});
13881398

1389-
assertThat(olderTxProxy.rollbackAsync(), willSucceedFast());
1399+
assertThat(waiter.rollbackAsync(), willSucceedFast());
13901400

13911401
// Operation future should be failed.
13921402
assertThat(fut, willThrowWithCauseOrSuppressed(ctx.expectedErr));
13931403

13941404
// Ensure inflights cleanup.
1395-
assertThat(youngerTxProxy.rollbackAsync(), willSucceedFast());
1405+
assertThat(owner.rollbackAsync(), willSucceedFast());
13961406

13971407
assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key, key2)), willSucceedFast());
13981408
}
@@ -1480,10 +1490,18 @@ public void testRollbackDoesNotBlockOnLockConflictWithDirectMapping(KillTestCont
14801490

14811491
// Should be directly mapped
14821492
assertThat(ctx.put.apply(client(), youngerTxProxy, key3), willSucceedFast());
1493+
assertThat(ctx.put.apply(client(), olderTxProxy, key4), willSucceedFast());
1494+
1495+
IgniteImpl server0 = unwrapIgniteImpl(server(0));
1496+
boolean invertedWaitOrder = server0.txManager().lockManager().policy().invertedWaitOrder();
14831497

1484-
// Younger is not allowed to wait with wait-die.
1485-
// Next operation should invalidate the transaction.
1486-
assertThat(ctx.put.apply(client(), youngerTxProxy, key), willThrowWithCauseOrSuppressed(ctx.expectedErr));
1498+
// Force wrong order.
1499+
if (invertedWaitOrder) {
1500+
assertThat(ctx.put.apply(client(), youngerTxProxy, key), willThrowWithCauseOrSuppressed(ctx.expectedErr));
1501+
} else {
1502+
assertThat(ctx.put.apply(client(), olderTxProxy, key2), willSucceedFast()); // Will invalidate younger tx.
1503+
assertThat(youngerTxProxy.commitAsync(), willThrowWithCauseOrSuppressed(TransactionException.class));
1504+
}
14871505

14881506
olderTxProxy.commit();
14891507

@@ -1493,7 +1511,7 @@ public void testRollbackDoesNotBlockOnLockConflictWithDirectMapping(KillTestCont
14931511

14941512
@ParameterizedTest
14951513
@MethodSource("killTestContextFactory")
1496-
public void testRollbackOnLocalError(KillTestContext ctx) throws Exception {
1514+
public void testRollbackOnLocalError(KillTestContext ctx) {
14971515
ClientTable table = (ClientTable) table();
14981516
ClientSql sql = (ClientSql) client().sql();
14991517
KeyValueView<Tuple, Tuple> kvView = table().keyValueView();

modules/client/src/integrationTest/java/org/apache/ignite/internal/streamer/ItClientDataStreamerLoadTest.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,19 @@
2020
import static org.junit.jupiter.api.Assertions.assertEquals;
2121
import static org.junit.jupiter.api.Assertions.assertNotNull;
2222

23+
import java.util.ArrayList;
24+
import java.util.List;
2325
import java.util.Random;
2426
import java.util.concurrent.CompletableFuture;
2527
import java.util.concurrent.SubmissionPublisher;
2628
import java.util.concurrent.TimeUnit;
2729
import org.apache.ignite.client.IgniteClient;
2830
import org.apache.ignite.client.RetryLimitPolicy;
2931
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
32+
import org.apache.ignite.internal.TestWrappers;
33+
import org.apache.ignite.internal.app.IgniteImpl;
34+
import org.apache.ignite.internal.logger.IgniteLogger;
35+
import org.apache.ignite.internal.logger.Loggers;
3036
import org.apache.ignite.internal.util.IgniteUtils;
3137
import org.apache.ignite.table.DataStreamerItem;
3238
import org.apache.ignite.table.DataStreamerOptions;
@@ -42,6 +48,8 @@
4248
* Data streamer load test.
4349
*/
4450
public final class ItClientDataStreamerLoadTest extends ClusterPerClassIntegrationTest {
51+
private static final IgniteLogger LOG = Loggers.forClass(ItClientDataStreamerLoadTest.class);
52+
4553
private static final String TABLE_NAME = "test_table";
4654

4755
private static final int CLIENT_COUNT = 2;
@@ -90,6 +98,9 @@ public void clearTable() {
9098
@Test
9199
@Timeout(value = 20, unit = TimeUnit.MINUTES)
92100
public void testHighLoad() throws InterruptedException {
101+
IgniteImpl ignite = TestWrappers.unwrapIgniteImpl(node(0));
102+
boolean invertedWaitOrder = ignite.txManager().lockManager().policy().invertedWaitOrder();
103+
93104
Thread[] threads = new Thread[CLIENT_COUNT];
94105

95106
for (int i = 0; i < clients.length; i++) {
@@ -106,8 +117,27 @@ public void testHighLoad() throws InterruptedException {
106117

107118
RecordView<Tuple> view = clients[0].tables().table(TABLE_NAME).recordView();
108119

120+
List<Tuple> keys = new ArrayList<>(ROW_COUNT);
121+
109122
for (int i = 0; i < ROW_COUNT; i++) {
110-
Tuple res = view.get(null, tupleKey(i));
123+
Tuple key = tupleKey(i);
124+
keys.add(key);
125+
}
126+
127+
List<Tuple> values = view.getAll(null, keys);
128+
assertEquals(ROW_COUNT, values.size());
129+
130+
for (int i = 0; i < ROW_COUNT; i++) {
131+
Tuple res = values.get(i);
132+
133+
// TODO https://issues.apache.org/jira/browse/IGNITE-28365
134+
// A row might be missing in the following scenario (assuming 2 concurrent streamers):
135+
// batch 1 is concurrently mapped to partition K, streamer 0 wins the conflict
136+
// batch 2 is concurrently mapped to partition N, streamer 1 wins the conflict
137+
// Both streamers become invalidated without proper implicit retries and stop.
138+
if (res == null && !invertedWaitOrder) {
139+
continue;
140+
}
111141

112142
assertNotNull(res, "Row not found: " + i);
113143
assertEquals("foo_" + i, res.value("name"));
@@ -130,13 +160,20 @@ private static void streamData(IgniteClient client) {
130160

131161
// Insert same data over and over again.
132162
for (int j = 0; j < LOOP_COUNT; j++) {
163+
LOG.info("Loop " + j);
133164
for (int i = 0; i < ROW_COUNT; i++) {
134165
publisher.submit(DataStreamerItem.of(tuple(i, "foo_" + i)));
135166
}
136167
}
137168
}
138169

139-
streamerFut.orTimeout(10, TimeUnit.SECONDS).join();
170+
try {
171+
streamerFut.orTimeout(10, TimeUnit.SECONDS).join();
172+
LOG.info("Done streaming");
173+
} catch (Exception e) {
174+
// TODO IGNITE-28365 Don't expecting errors here with proper retries
175+
LOG.warn("Done streaming with error", e);
176+
}
140177
}
141178

142179
private static Tuple tuple(int id, String name) {

modules/core/src/main/java/org/apache/ignite/internal/lang/NodeStoppingException.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,12 @@
2020
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
2121

2222
import java.util.UUID;
23-
import org.apache.ignite.tx.RetriableReplicaRequestException;
24-
import org.apache.ignite.tx.RetriableTransactionException;
2523
import org.jetbrains.annotations.Nullable;
2624

2725
/**
2826
* This exception is used to indicate that Ignite node is stopping (already stopped) for some reason.
2927
*/
30-
public class NodeStoppingException extends IgniteInternalCheckedException implements RetriableTransactionException,
31-
RetriableReplicaRequestException {
28+
public class NodeStoppingException extends IgniteInternalCheckedException {
3229
/** Serial version UID. */
3330
private static final long serialVersionUID = 0L;
3431

0 commit comments

Comments
 (0)