Skip to content

Commit d377032

Browse files
authored
IGNITE-28675 Fix flaky TxDeadlockCauseTest #testCause, #testCauseSeveralNodes (#13136)
1 parent affc5ae commit d377032

3 files changed

Lines changed: 46 additions & 56 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java

Lines changed: 18 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.ignite.internal.util.future.GridFutureAdapter;
4040
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
4141
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
42+
import org.apache.ignite.internal.util.typedef.F;
4243
import org.apache.ignite.internal.util.typedef.T2;
4344
import org.apache.ignite.internal.util.typedef.internal.S;
4445
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -55,7 +56,7 @@ public class TxDeadlockDetection {
5556
public static final int DFLT_TX_DEADLOCK_DETECTION_TIMEOUT = 60000;
5657

5758
/** Deadlock detection maximum iterations. */
58-
private static int deadLockTimeout =
59+
private static final int DEAD_LOCK_TIMEOUT =
5960
getInteger(IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT, DFLT_TX_DEADLOCK_DETECTION_TIMEOUT);
6061

6162
/** Sequence. */
@@ -80,7 +81,7 @@ public TxDeadlockDetection(GridCacheSharedContext<?, ?> cctx) {
8081
*
8182
* @param tx Target tx.
8283
* @param keys Keys.
83-
* @return {@link TxDeadlock} if found, otherwise - {@code null}.
84+
* @return {@link TxDeadlockFuture} future.
8485
*/
8586
TxDeadlockFuture detectDeadlock(IgniteInternalTx tx, Set<IgniteTxKey> keys) {
8687
GridCacheVersion txId = tx.nearXidVersion();
@@ -101,7 +102,7 @@ TxDeadlockFuture detectDeadlock(IgniteInternalTx tx, Set<IgniteTxKey> keys) {
101102
* @param wfg Wait-for-graph.
102103
* @param txId Tx ID - start vertex for cycle search in graph.
103104
*/
104-
static List<GridCacheVersion> findCycle(Map<GridCacheVersion, Set<GridCacheVersion>> wfg, GridCacheVersion txId) {
105+
static @Nullable List<GridCacheVersion> findCycle(Map<GridCacheVersion, Set<GridCacheVersion>> wfg, GridCacheVersion txId) {
105106
if (wfg == null || wfg.isEmpty())
106107
return null;
107108

@@ -181,7 +182,7 @@ static class TxDeadlockFuture extends GridFutureAdapter<TxDeadlock> {
181182

182183
/** Pending keys. */
183184
@GridToStringInclude
184-
private Map<UUID, Set<IgniteTxKey>> pendingKeys = new HashMap<>();
185+
private final Map<UUID, Set<IgniteTxKey>> pendingKeys = new HashMap<>();
185186

186187
/** Nodes queue. */
187188
@GridToStringInclude
@@ -233,7 +234,7 @@ private TxDeadlockFuture(GridCacheSharedContext cctx,
233234
this.topVer = topVer;
234235
this.keys = keys;
235236

236-
if (deadLockTimeout > 0) {
237+
if (DEAD_LOCK_TIMEOUT > 0) {
237238
timeoutObj = new DeadlockTimeoutObject();
238239

239240
cctx.time().addTimeoutObject(timeoutObj);
@@ -322,7 +323,7 @@ private void detect(TxLocksResponse res) {
322323
* Maps tx keys on nodes. Key can be mapped on some node if this node is primary for given key or
323324
* node is near for transaction that holds or requests lock for key.
324325
*
325-
* Key will not be be mapped to node if both key and node are already handled.
326+
* Key will not be mapped to node if both key and node are already handled.
326327
*
327328
* @param txKeys Tx keys.
328329
* @param txLocks Tx locks.
@@ -348,10 +349,7 @@ private void mapTxKeys(@Nullable Set<IgniteTxKey> txKeys, Map<IgniteTxKey, List<
348349
// Process this node earlier than other in order to optimize amount of requests.
349350
preferredNodes.add(nodeId);
350351

351-
Set<IgniteTxKey> mappedKeys = pendingKeys.get(nodeId);
352-
353-
if (mappedKeys == null)
354-
pendingKeys.put(nodeId, mappedKeys = new HashSet<>());
352+
Set<IgniteTxKey> mappedKeys = pendingKeys.computeIfAbsent(nodeId, k -> new HashSet<>());
355353

356354
mappedKeys.add(txKey);
357355
}
@@ -363,10 +361,7 @@ private void mapTxKeys(@Nullable Set<IgniteTxKey> txKeys, Map<IgniteTxKey, List<
363361
else
364362
nodesQueue.addLast(nearNodeId);
365363

366-
Set<IgniteTxKey> mappedKeys = pendingKeys.get(nearNodeId);
367-
368-
if (mappedKeys == null)
369-
pendingKeys.put(nearNodeId, mappedKeys = new HashSet<>());
364+
Set<IgniteTxKey> mappedKeys = pendingKeys.computeIfAbsent(nearNodeId, k -> new HashSet<>());
370365

371366
mappedKeys.add(txKey);
372367
}
@@ -387,10 +382,7 @@ private void mapTxKeys(@Nullable Set<IgniteTxKey> txKeys, Map<IgniteTxKey, List<
387382

388383
nodesQueue.addLast(nodeId);
389384

390-
Set<IgniteTxKey> mappedKeys = pendingKeys.get(nodeId);
391-
392-
if (mappedKeys == null)
393-
pendingKeys.put(nodeId, mappedKeys = new HashSet<>());
385+
Set<IgniteTxKey> mappedKeys = pendingKeys.computeIfAbsent(nodeId, k -> new HashSet<>());
394386

395387
mappedKeys.add(txKey);
396388
}
@@ -417,15 +409,15 @@ private UUID primary(IgniteTxKey txKey) {
417409
private void merge(TxLocksResponse res) {
418410
Map<IgniteTxKey, List<TxLock>> txLocks = res.txLocks();
419411

420-
if (txLocks == null || txLocks.isEmpty())
412+
if (F.isEmpty(txLocks))
421413
return;
422414

423415
for (Map.Entry<IgniteTxKey, List<TxLock>> e : txLocks.entrySet()) {
424416
IgniteTxKey txKey = e.getKey();
425417

426418
List<TxLock> lockList = e.getValue();
427419

428-
if (lockList != null && !lockList.isEmpty()) {
420+
if (!F.isEmpty(lockList)) {
429421
for (TxLock lock : lockList) {
430422
if (lock.owner() || lock.candiate()) {
431423
if (txs.get(lock.txId()) == null)
@@ -435,18 +427,12 @@ private void merge(TxLocksResponse res) {
435427
if (lock.owner()) {
436428
GridCacheVersion txId = lock.txId();
437429

438-
Set<IgniteTxKey> keys = txLockedKeys.get(txId);
439-
440-
if (keys == null)
441-
txLockedKeys.put(txId, keys = new HashSet<>());
430+
Set<IgniteTxKey> keys = txLockedKeys.computeIfAbsent(txId, k -> new HashSet<>());
442431

443432
keys.add(txKey);
444433
}
445434
else if (lock.candiate()) {
446-
Set<GridCacheVersion> txs = txRequestedKeys.get(txKey);
447-
448-
if (txs == null)
449-
txRequestedKeys.put(txKey, txs = new HashSet<>());
435+
Set<GridCacheVersion> txs = txRequestedKeys.computeIfAbsent(txKey, k -> new HashSet<>());
450436

451437
txs.add(lock.txId());
452438
}
@@ -473,10 +459,7 @@ private void updateWaitForGraph(Map<IgniteTxKey, List<TxLock>> txLocks) {
473459
txOwner = lock.txId();
474460

475461
if (keys.contains(e.getKey()) && !txId.equals(lock.txId())) {
476-
Set<GridCacheVersion> waitingTxs = wfg.get(txId);
477-
478-
if (waitingTxs == null)
479-
wfg.put(txId, waitingTxs = new HashSet<>());
462+
Set<GridCacheVersion> waitingTxs = wfg.computeIfAbsent(txId, k -> new HashSet<>());
480463

481464
waitingTxs.add(lock.txId());
482465
}
@@ -487,10 +470,7 @@ private void updateWaitForGraph(Map<IgniteTxKey, List<TxLock>> txLocks) {
487470
if (lock.candiate() || lock.owner()) {
488471
GridCacheVersion txId0 = lock.txId();
489472

490-
Set<GridCacheVersion> waitForTxs = wfg.get(txId0);
491-
492-
if (waitForTxs == null)
493-
wfg.put(txId0, waitForTxs = new HashSet<>());
473+
Set<GridCacheVersion> waitForTxs = wfg.computeIfAbsent(txId0, k -> new HashSet<>());
494474

495475
waitForTxs.add(txOwner);
496476
}
@@ -562,7 +542,7 @@ private class DeadlockTimeoutObject extends GridTimeoutObjectAdapter {
562542
* Default constructor.
563543
*/
564544
DeadlockTimeoutObject() {
565-
super(deadLockTimeout);
545+
super(DEAD_LOCK_TIMEOUT);
566546
}
567547

568548
/** {@inheritDoc} */
@@ -571,7 +551,7 @@ private class DeadlockTimeoutObject extends GridTimeoutObjectAdapter {
571551

572552
IgniteLogger log = cctx.kernalContext().log(this.getClass());
573553

574-
U.warn(log, "Deadlock detection was timed out [timeout=" + deadLockTimeout + ", fut=" + this + ']');
554+
U.warn(log, "Deadlock detection was timed out [timeout=" + DEAD_LOCK_TIMEOUT + ", fut=" + this + ']');
575555

576556
onDone();
577557
}

modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockCauseTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.concurrent.atomic.AtomicReference;
2929
import org.apache.ignite.Ignite;
3030
import org.apache.ignite.IgniteCache;
31-
import org.apache.ignite.IgniteCheckedException;
3231
import org.apache.ignite.cache.CacheAtomicityMode;
3332
import org.apache.ignite.configuration.CacheConfiguration;
3433
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -165,6 +164,9 @@ private void checkCauseObject(
165164
final TransactionIsolation isolation,
166165
final boolean oneOp
167166
) throws Exception {
167+
if (nodes > 1)
168+
awaitPartitionMapExchange();
169+
168170
final Ignite ignite = grid(new Random().nextInt(nodes));
169171

170172
final IgniteCache<Integer, Account> cache = ignite.cache(DEFAULT_CACHE_NAME);
@@ -183,7 +185,7 @@ private void checkCauseObject(
183185
final CyclicBarrier barrier = new CyclicBarrier(2);
184186

185187
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new CAX() {
186-
@Override public void applyx() throws IgniteCheckedException {
188+
@Override public void applyx() {
187189
try (Transaction tx = ignite.transactions().txStart(TransactionConcurrency.PESSIMISTIC, isolation,
188190
timeout, keys.size())) {
189191

@@ -204,7 +206,9 @@ private void checkCauseObject(
204206
tx.commit();
205207
}
206208
catch (Exception e) {
207-
ex.compareAndSet(null, e);
209+
// TransactionDeadlockException raised at least for one transaction involved in the deadlock
210+
if (X.hasCause(e, TransactionDeadlockException.class))
211+
ex.compareAndSet(null, e);
208212
}
209213
}
210214
}, 2, "tx");
@@ -268,7 +272,7 @@ static class Account implements Serializable {
268272
/**
269273
* Change balance by specified amount.
270274
*
271-
* @param amount Amount to add to balance (may be negative).
275+
* @param amount Amount to add to balance (maybe negative).
272276
*/
273277
void update(double amount) {
274278
balance += amount;

modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.ThreadLocalRandom;
2222
import java.util.concurrent.atomic.AtomicBoolean;
2323
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.function.Consumer;
2425
import org.apache.ignite.Ignite;
2526
import org.apache.ignite.IgniteCache;
2627
import org.apache.ignite.IgniteCheckedException;
@@ -32,14 +33,13 @@
3233
import org.apache.ignite.internal.IgniteKernal;
3334
import org.apache.ignite.internal.util.typedef.internal.U;
3435
import org.apache.ignite.testframework.GridTestUtils;
36+
import org.apache.ignite.testframework.ListeningTestLogger;
3537
import org.apache.ignite.testframework.junits.WithSystemProperty;
3638
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
3739
import org.apache.ignite.transactions.Transaction;
3840
import org.apache.ignite.transactions.TransactionConcurrency;
3941
import org.junit.Test;
4042

41-
import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT;
42-
import static org.apache.ignite.IgniteSystemProperties.getInteger;
4343
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
4444
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
4545
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -54,6 +54,18 @@ public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest {
5454
/** Cache. */
5555
private static final String CACHE = "cache";
5656

57+
/** Log listener. */
58+
private final ListeningTestLogger listeningLog = new ListeningTestLogger(log);
59+
60+
/** Deadlock timeout, it`s unexpected during these tests. */
61+
private static final AtomicBoolean DEAD_LOCK_FLAG = new AtomicBoolean();
62+
63+
/** */
64+
private static final Consumer<String> DEAD_LOCK_LSNR = s -> {
65+
if (s.contains("Deadlock detection was timed out"))
66+
DEAD_LOCK_FLAG.set(true);
67+
};
68+
5769
/** {@inheritDoc} */
5870
@SuppressWarnings("unchecked")
5971
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -68,6 +80,11 @@ public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest {
6880

6981
cfg.setCacheConfiguration(ccfg);
7082

83+
assertFalse(DEAD_LOCK_FLAG.get());
84+
85+
listeningLog.registerListener(DEAD_LOCK_LSNR);
86+
cfg.setGridLogger(listeningLog);
87+
7188
return cfg;
7289
}
7390

@@ -83,19 +100,8 @@ public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest {
83100
super.afterTest();
84101

85102
stopAllGrids();
86-
}
87103

88-
/** {@inheritDoc} */
89-
@Override protected void beforeTestsStarted() throws Exception {
90-
super.beforeTestsStarted();
91-
92-
GridTestUtils.setFieldValue(TxDeadlockDetection.class, "deadLockTimeout", (int)(getTestTimeout() * 2));
93-
}
94-
95-
/** {@inheritDoc} */
96-
@Override protected void afterTestsStopped() throws Exception {
97-
GridTestUtils.setFieldValue(TxDeadlockDetection.class, "deadLockTimeout",
98-
getInteger(IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT, 60000));
104+
assertFalse(DEAD_LOCK_FLAG.get());
99105
}
100106

101107
/** {@inheritDoc} */

0 commit comments

Comments
 (0)