Skip to content

Commit b73d260

Browse files
IGNITE-25641 Fix discrepancy with CacheStore in case of an error during tx commit phase (#12178)
1 parent 5de8f72 commit b73d260

3 files changed

Lines changed: 322 additions & 1 deletion

File tree

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,8 +512,12 @@ protected void uncommit() {
512512
try {
513513
GridCacheEntryEx entry = e.cached();
514514

515-
if (e.op() != NOOP)
515+
if (e.op() != NOOP) {
516516
entry.invalidate(xidVer);
517+
518+
if (e.context().readThrough())
519+
entry.clear(xidVer, true);
520+
}
517521
}
518522
catch (Throwable t) {
519523
U.error(log, "Failed to invalidate transaction entries while reverting a commit.", t);
Lines changed: 315 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,315 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.cache.store;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.Set;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.function.IntFunction;
25+
import javax.cache.Cache;
26+
import org.apache.ignite.Ignite;
27+
import org.apache.ignite.IgniteCache;
28+
import org.apache.ignite.IgniteException;
29+
import org.apache.ignite.cache.CacheInterceptor;
30+
import org.apache.ignite.cache.CacheInterceptorAdapter;
31+
import org.apache.ignite.configuration.CacheConfiguration;
32+
import org.apache.ignite.failure.FailureHandler;
33+
import org.apache.ignite.failure.StopNodeFailureHandler;
34+
import org.apache.ignite.internal.IgniteEx;
35+
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
36+
import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
37+
import org.apache.ignite.internal.util.typedef.G;
38+
import org.apache.ignite.transactions.Transaction;
39+
import org.jetbrains.annotations.Nullable;
40+
import org.junit.Test;
41+
import org.junit.runner.RunWith;
42+
import org.junit.runners.Parameterized;
43+
44+
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
45+
46+
/**
47+
* Tests to check scenarios with system failures during transaction commit. Internal system failures are simulated by
48+
* {@link CacheInterceptor} custom implementation throwing an exception during final commit phase.
49+
*/
50+
@RunWith(Parameterized.class)
51+
public class CacheStoreWithIgniteTxFailureTest extends GridCacheAbstractSelfTest {
52+
/** */
53+
private static final int GRID_COUNT = 3;
54+
55+
/** */
56+
private static final int KEYS_NUMBER = 50;
57+
58+
/** */
59+
private static final int FAULTY_NODE_IDX = 1;
60+
61+
/** */
62+
private static final IntFunction<Integer> KEY_UPDATE_FUNCTION = key -> key + KEYS_NUMBER * 3;
63+
64+
/**
65+
* Type of node for keys involved into transaction: primary or backup.
66+
*/
67+
private enum FaultyNodeType {
68+
/** */
69+
PRIMARY,
70+
/** */
71+
BACKUP
72+
}
73+
74+
/**
75+
* Role of faulty node in transaction management: tx coordinator or regular node.
76+
*/
77+
private enum FaultyNodeRole {
78+
/** */
79+
REGULAR,
80+
/** */
81+
TX_COORDINATOR
82+
}
83+
84+
/** */
85+
@Parameterized.Parameter
86+
public FaultyNodeType faultyNodeType;
87+
88+
/** */
89+
@Parameterized.Parameter(1)
90+
public FaultyNodeRole faultyNodeRole;
91+
92+
/** */
93+
@Parameterized.Parameter(2)
94+
public boolean withFaulireHnd;
95+
96+
/** */
97+
@Parameterized.Parameters(name = "faultyNodeType={0}, faultyNodeRole={1}, withFaulireHandler={2}")
98+
public static List<Object[]> parameters() {
99+
List<Object[]> params = new ArrayList<>();
100+
101+
params.add(new Object[] {FaultyNodeType.PRIMARY, FaultyNodeRole.REGULAR, true});
102+
params.add(new Object[] {FaultyNodeType.PRIMARY, FaultyNodeRole.REGULAR, false});
103+
params.add(new Object[] {FaultyNodeType.BACKUP, FaultyNodeRole.REGULAR, true});
104+
params.add(new Object[] {FaultyNodeType.BACKUP, FaultyNodeRole.REGULAR, false});
105+
106+
params.add(new Object[] {FaultyNodeType.PRIMARY, FaultyNodeRole.TX_COORDINATOR, false});
107+
params.add(new Object[] {FaultyNodeType.BACKUP, FaultyNodeRole.TX_COORDINATOR, false});
108+
params.add(new Object[] {FaultyNodeType.PRIMARY, FaultyNodeRole.TX_COORDINATOR, true});
109+
params.add(new Object[] {FaultyNodeType.BACKUP, FaultyNodeRole.TX_COORDINATOR, true});
110+
111+
return params;
112+
}
113+
114+
/** {@inheritDoc} */
115+
@Override protected void beforeTestsStarted() throws Exception {
116+
initStoreStrategy();
117+
}
118+
119+
/** {@inheritDoc} */
120+
@Override protected void beforeTest() throws Exception {
121+
// No-op.
122+
}
123+
124+
/** {@inheritDoc} */
125+
@Override protected void afterTest() throws Exception {
126+
stopAllGrids();
127+
128+
storeStgy.resetStore();
129+
}
130+
131+
/** {@inheritDoc} */
132+
@Override protected int gridCount() {
133+
return GRID_COUNT;
134+
}
135+
136+
/** {@inheritDoc} */
137+
@Override protected int backups() {
138+
return 2;
139+
}
140+
141+
/** {@inheritDoc} */
142+
@Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
143+
return withFaulireHnd ? new StopNodeFailureHandler() : super.getFailureHandler(igniteInstanceName);
144+
}
145+
146+
/** {@inheritDoc} */
147+
@Override protected CacheConfiguration<?, ?> cacheConfiguration(String igniteInstanceName) throws Exception {
148+
CacheConfiguration<Integer, Integer> ccfg = (CacheConfiguration<Integer, Integer>)super.cacheConfiguration(igniteInstanceName);
149+
150+
ccfg.setInterceptor(
151+
new FaultyNodeInterceptor(
152+
igniteInstanceName,
153+
getTestIgniteInstanceIndex(igniteInstanceName) == FAULTY_NODE_IDX,
154+
faultyNodeRole
155+
)
156+
);
157+
158+
return ccfg;
159+
}
160+
161+
/**
162+
*
163+
*/
164+
@Test
165+
public void testSystemExceptionAfterCacheStoreCommit() throws Exception {
166+
IgniteEx ig = startGrids(gridCount());
167+
IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
168+
169+
fillCache(cache, KEYS_NUMBER);
170+
171+
int keysType = faultyNodeType == FaultyNodeType.PRIMARY ? 0 : 1;
172+
173+
List<Integer> keysOnFaultyNode = findKeys(grid(FAULTY_NODE_IDX).localNode(), cache, 5, 0, keysType);
174+
175+
IgniteEx txCoordinator =
176+
faultyNodeRole == FaultyNodeRole.TX_COORDINATOR
177+
? grid(FAULTY_NODE_IDX)
178+
: startClientGrid(GRID_COUNT + 1);
179+
180+
if (faultyNodeType == FaultyNodeType.PRIMARY)
181+
updateKeysInTxWithExceptionCatching(txCoordinator, keysOnFaultyNode);
182+
else
183+
updateKeysInTx(txCoordinator, keysOnFaultyNode);
184+
185+
if (withFaulireHnd) {
186+
// FH doesn't fail TX coordinator node now, this behavior is wrong and should be fixed here:
187+
// TODO https://issues.apache.org/jira/browse/IGNITE-26060
188+
if (faultyNodeRole != FaultyNodeRole.TX_COORDINATOR) {
189+
waitForTopology(3);
190+
191+
assertTrue("Client node should survive test scenario",
192+
G.allGrids()
193+
.stream()
194+
.filter(ignite -> ((IgniteEx)ignite).context().clientNode())
195+
.count() == 1);
196+
}
197+
}
198+
else
199+
checkKeysOnFaultyNode(keysOnFaultyNode);
200+
201+
checkKeysOnHealthyNodes(keysOnFaultyNode);
202+
}
203+
204+
/** */
205+
private void fillCache(IgniteCache<Integer, Integer> cache, int numOfKeys) {
206+
for (int i = 0; i < numOfKeys; i++)
207+
cache.put(i, i);
208+
}
209+
210+
/** */
211+
private void checkKeysOnFaultyNode(List<Integer> keysToCheck) {
212+
IgniteCache<Object, Object> cache = grid(FAULTY_NODE_IDX).cache(DEFAULT_CACHE_NAME);
213+
214+
for (Integer key : keysToCheck)
215+
assertEquals(storeStgy.getFromStore(key), cache.get(key));
216+
}
217+
218+
/** */
219+
private void checkKeysOnHealthyNodes(List<Integer> keysToCheck) throws IgniteInterruptedCheckedException {
220+
for (int i = 0; i < gridCount(); i++) {
221+
if (i != FAULTY_NODE_IDX) {
222+
IgniteEx ig = grid(i);
223+
224+
IgniteCache<Object, Object> cache = ig.cache(DEFAULT_CACHE_NAME);
225+
226+
for (Integer key : keysToCheck) {
227+
boolean checkResult = waitForCondition(
228+
() -> {
229+
if (storeStgy.getFromStore(key) == null || cache.get(key) == null)
230+
return false;
231+
232+
return storeStgy.getFromStore(key).equals(cache.get(key));
233+
},
234+
1_000);
235+
236+
assertTrue(
237+
String.format("Key inconsistent with CacheStore found on node %d; nodeName: %s. " +
238+
"Key in store: %s, key in cache: %s.",
239+
i,
240+
ig.name(),
241+
storeStgy.getFromStore(key),
242+
cache.get(key)),
243+
checkResult);
244+
}
245+
}
246+
}
247+
}
248+
249+
/** */
250+
private void updateKeysInTx(Ignite ig, List<Integer> keys) {
251+
IgniteCache<Object, Object> cache = ig.cache(DEFAULT_CACHE_NAME);
252+
253+
try (Transaction tx = ig.transactions().txStart()) {
254+
for (Integer key : keys)
255+
cache.put(key, KEY_UPDATE_FUNCTION.apply(key));
256+
257+
tx.commit();
258+
}
259+
}
260+
261+
/** */
262+
private void updateKeysInTxWithExceptionCatching(Ignite ig, List<Integer> keys) {
263+
try {
264+
updateKeysInTx(ig, keys);
265+
}
266+
catch (Exception ignored) {
267+
// No-op.
268+
}
269+
}
270+
271+
/** */
272+
private static class FaultyNodeInterceptor extends CacheInterceptorAdapter<Integer, Integer> {
273+
/** */
274+
private final FaultyNodeRole faultyNodeRole;
275+
276+
/** */
277+
private final String instanceName;
278+
279+
/** */
280+
private final boolean faultyNode;
281+
282+
/** */
283+
private final Set<Integer> seenKeys = ConcurrentHashMap.newKeySet();
284+
285+
/**
286+
* @param instanceName Ignite node instance name.
287+
* @param faultyNodeRole Flag if node is tx coordinator.
288+
*/
289+
private FaultyNodeInterceptor(String instanceName, boolean faultyNode, FaultyNodeRole faultyNodeRole) {
290+
this.instanceName = instanceName;
291+
this.faultyNode = faultyNode;
292+
this.faultyNodeRole = faultyNodeRole;
293+
}
294+
295+
/** {@inheritDoc} */
296+
@Override public @Nullable Integer onBeforePut(Cache.Entry<Integer, Integer> entry, Integer newVal) {
297+
// It is an initial cache loading, actual test logic will be executed later.
298+
if (newVal < 2 * KEYS_NUMBER)
299+
return newVal;
300+
301+
if (faultyNode) {
302+
if (faultyNodeRole == FaultyNodeRole.TX_COORDINATOR) {
303+
// On TX coordinator node CacheInterceptor#onBeforePut is called twice for the same key
304+
// at different stages of TX handling path.
305+
if (!seenKeys.add(newVal))
306+
throw new IgniteException("IgniteException from onBeforePut on tx coordinator: " + instanceName);
307+
}
308+
else
309+
throw new IgniteException("IgniteException from onBeforePut on primary or backup: " + instanceName);
310+
}
311+
312+
return newVal;
313+
}
314+
}
315+
}

modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.ignite.cache.store.CacheStoreListenerRWThroughDisabledTransactionalCacheTest;
2525
import org.apache.ignite.cache.store.CacheStoreSessionListenerLifecycleSelfTest;
2626
import org.apache.ignite.cache.store.CacheStoreSessionListenerWriteBehindEnabledTest;
27+
import org.apache.ignite.cache.store.CacheStoreWithIgniteTxFailureTest;
2728
import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListenerSelfTest;
2829
import org.apache.ignite.internal.processors.GridCacheTxLoadFromStoreOnLockSelfTest;
2930
import org.apache.ignite.internal.processors.cache.CacheClientStoreSelfTest;
@@ -280,6 +281,7 @@ public static List<Class<?>> suite(Collection<Class> ignoredTests) {
280281
GridTestUtils.addTestIfNeeded(suite, CacheStoreListenerRWThroughDisabledAtomicCacheTest.class, ignoredTests);
281282
GridTestUtils.addTestIfNeeded(suite, CacheStoreListenerRWThroughDisabledTransactionalCacheTest.class, ignoredTests);
282283
GridTestUtils.addTestIfNeeded(suite, CacheStoreSessionListenerWriteBehindEnabledTest.class, ignoredTests);
284+
GridTestUtils.addTestIfNeeded(suite, CacheStoreWithIgniteTxFailureTest.class, ignoredTests);
283285

284286
GridTestUtils.addTestIfNeeded(suite, CacheClientStoreSelfTest.class, ignoredTests);
285287
GridTestUtils.addTestIfNeeded(suite, CacheStoreUsageMultinodeStaticStartAtomicTest.class, ignoredTests);

0 commit comments

Comments
 (0)