Skip to content

Commit d40158d

Browse files
authored
IGNITE-28750 Fix AssertionError in a local CQ during TX rollback caused by node failure (#13216)
1 parent f6f439e commit d40158d

5 files changed

Lines changed: 266 additions & 3 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -767,6 +767,10 @@ private String taskName() {
767767
@Override public boolean isPrimaryOnly() {
768768
return locOnly && !skipPrimaryCheck;
769769
}
770+
771+
@Override public boolean isLocalOnly() {
772+
return locOnly;
773+
}
770774
};
771775

772776
CacheContinuousQueryManager mgr = manager(ctx);

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,4 +131,7 @@ public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
131131
* @return {@code True} if this listener should be called on events on primary partitions only.
132132
*/
133133
public boolean isPrimaryOnly();
134+
135+
/** @return {@code true} if this listener belongs to a local continuous query. */
136+
public boolean isLocalOnly();
134137
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,15 +248,23 @@ private void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
248248
* @param part Partition number.
249249
* @param cntr Update counter.
250250
* @param topVer Topology version.
251+
* @param primary Primary partition flag.
251252
* @return Context.
252253
*/
253-
@Nullable public CounterSkipContext skipUpdateCounter(@Nullable CounterSkipContext skipCtx,
254+
@Nullable public CounterSkipContext skipUpdateCounter(
255+
@Nullable CounterSkipContext skipCtx,
254256
int part,
255257
long cntr,
256258
AffinityTopologyVersion topVer,
257-
boolean primary) {
258-
for (CacheContinuousQueryListener lsnr : lsnrs.values())
259+
boolean primary
260+
) {
261+
for (CacheContinuousQueryListener<?, ?> lsnr : lsnrs.values()) {
262+
// Local CQs notify listeners directly and do not use skipped-counter mechanism
263+
if (lsnr.isLocalOnly())
264+
continue;
265+
259266
skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, topVer, primary);
267+
}
260268

261269
return skipCtx;
262270
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.cache.query.continuous;
19+
20+
import java.util.Map;
21+
import java.util.TreeMap;
22+
import java.util.concurrent.ThreadLocalRandom;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.function.Function;
26+
import java.util.stream.Collectors;
27+
import java.util.stream.IntStream;
28+
import javax.cache.Cache;
29+
import javax.cache.event.CacheEntryListenerException;
30+
import javax.cache.event.CacheEntryUpdatedListener;
31+
import org.apache.ignite.IgniteCache;
32+
import org.apache.ignite.cache.CacheAtomicityMode;
33+
import org.apache.ignite.cache.CacheMode;
34+
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
35+
import org.apache.ignite.cache.query.ContinuousQuery;
36+
import org.apache.ignite.cache.query.QueryCursor;
37+
import org.apache.ignite.cluster.ClusterState;
38+
import org.apache.ignite.configuration.CacheConfiguration;
39+
import org.apache.ignite.configuration.DataRegionConfiguration;
40+
import org.apache.ignite.configuration.DataStorageConfiguration;
41+
import org.apache.ignite.configuration.IgniteConfiguration;
42+
import org.apache.ignite.failure.StopNodeFailureHandler;
43+
import org.apache.ignite.internal.IgniteEx;
44+
import org.apache.ignite.internal.IgniteInternalFuture;
45+
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
46+
import org.apache.ignite.testframework.GridTestUtils;
47+
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
48+
import org.apache.ignite.transactions.Transaction;
49+
import org.apache.ignite.transactions.TransactionConcurrency;
50+
import org.apache.ignite.transactions.TransactionIsolation;
51+
import org.junit.Test;
52+
import org.junit.runner.RunWith;
53+
import org.junit.runners.Parameterized;
54+
55+
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
56+
import static org.apache.ignite.cache.CacheMode.REPLICATED;
57+
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
58+
59+
/**
60+
* Tests interaction between local continuous queries created with {@link ContinuousQuery#setLocal(boolean)} set to
61+
* {@code true} and transaction rollback counter cleanup.
62+
*/
63+
@RunWith(Parameterized.class)
64+
public class LocalContinuousQueryWithNodeFailureTest extends GridCommonAbstractTest {
65+
/** */
66+
private static final int NODE_CNT = 3;
67+
68+
/** */
69+
private static final int TX_THREADS = 10;
70+
71+
/** */
72+
private static final int KEYS_PER_TX = 10;
73+
74+
/** */
75+
@Parameterized.Parameter
76+
public CacheMode cacheMode;
77+
78+
/** */
79+
@Parameterized.Parameters(name = "cacheMode={0}")
80+
public static Object[] params() {
81+
return new Object[] {REPLICATED, PARTITIONED};
82+
}
83+
84+
/** {@inheritDoc} */
85+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
86+
CacheConfiguration<?, ?> cacheCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME)
87+
.setCacheMode(cacheMode)
88+
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
89+
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
90+
91+
if (cacheMode == PARTITIONED)
92+
cacheCfg.setBackups(NODE_CNT - 1);
93+
94+
return super.getConfiguration(igniteInstanceName)
95+
.setDataStorageConfiguration(new DataStorageConfiguration()
96+
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
97+
.setPersistenceEnabled(true)))
98+
.setFailureHandler(new StopNodeFailureHandler())
99+
.setCacheConfiguration(cacheCfg);
100+
}
101+
102+
/** {@inheritDoc} */
103+
@Override protected void beforeTest() throws Exception {
104+
super.beforeTest();
105+
106+
cleanPersistenceDir();
107+
}
108+
109+
/** {@inheritDoc} */
110+
@Override protected void afterTest() throws Exception {
111+
stopAllGrids();
112+
113+
cleanPersistenceDir();
114+
115+
super.afterTest();
116+
}
117+
118+
/**
119+
* Checks that local and distributed continuous queries behave correctly when transaction rollback closes
120+
* partition update counter gaps after a node failure.
121+
*/
122+
@Test
123+
public void testTransactionalCache() throws Exception {
124+
startGrids(NODE_CNT).cluster().state(ClusterState.ACTIVE);
125+
126+
awaitPartitionMapExchange();
127+
128+
AtomicBoolean stopTxLoad = new AtomicBoolean();
129+
AtomicBoolean nodeFailed = new AtomicBoolean();
130+
131+
AtomicInteger updatesDistrBeforeFail = new AtomicInteger();
132+
AtomicInteger updatesLocBeforeFail = new AtomicInteger();
133+
134+
AtomicInteger updatesDistrAfterFail = new AtomicInteger();
135+
AtomicInteger updatesLocAfterFail = new AtomicInteger();
136+
137+
IgniteCache<Object, Object> cache1 = grid(1).cache(DEFAULT_CACHE_NAME);
138+
IgniteCache<Object, Object> cache0 = grid(0).cache(DEFAULT_CACHE_NAME);
139+
140+
IgniteInternalFuture<?> txLoadFut = launchTxLoad(grid(1), cache1, stopTxLoad);
141+
142+
try (
143+
QueryCursor<Cache.Entry<Object, Object>> locCur = cache0.query(
144+
buildContinuousQuery(true, nodeFailed, updatesLocBeforeFail, updatesLocAfterFail));
145+
146+
QueryCursor<Cache.Entry<Object, Object>> distrCur = cache0.query(
147+
buildContinuousQuery(false, nodeFailed, updatesDistrBeforeFail, updatesDistrAfterFail))
148+
) {
149+
assertTrue(
150+
String.format("Failed to receive expected updates before node failure [locUpdates=%s, distrUpdates=%s]",
151+
updatesLocBeforeFail.get(), updatesDistrBeforeFail.get()),
152+
waitForCondition(() -> updatesLocBeforeFail.get() > 0 && updatesDistrBeforeFail.get() > 0,
153+
getTestTimeout() / 2)
154+
);
155+
156+
failNode(NODE_CNT - 1);
157+
158+
waitForTopology(NODE_CNT - 1);
159+
160+
nodeFailed.set(true);
161+
162+
assertTrue(
163+
String.format("Failed to receive expected updates after node failure [locUpdates=%s, distrUpdates=%s]",
164+
updatesLocAfterFail.get(), updatesDistrAfterFail.get()),
165+
waitForCondition(() -> updatesLocAfterFail.get() > 0 && updatesDistrAfterFail.get() > 0,
166+
getTestTimeout() / 2)
167+
);
168+
169+
for (int i : IntStream.range(0, NODE_CNT - 1).toArray()) {
170+
IgniteEx grid = grid(i);
171+
172+
assertFalse("Grid " + i + " is stopping", grid.context().isStopping());
173+
assertNull("Failure context is not null for grid " + i, grid.context().failure().failureContext());
174+
}
175+
}
176+
finally {
177+
stopTxLoad.set(true);
178+
179+
txLoadFut.get();
180+
}
181+
}
182+
183+
/** */
184+
private IgniteInternalFuture<?> launchTxLoad(
185+
IgniteEx grid,
186+
IgniteCache<Object, Object> cache,
187+
AtomicBoolean stopTxLoad
188+
) {
189+
return GridTestUtils.runMultiThreadedAsync(() -> {
190+
ThreadLocalRandom rnd = ThreadLocalRandom.current();
191+
192+
while (!stopTxLoad.get()) {
193+
try (
194+
Transaction tx = grid.transactions().txStart(
195+
TransactionConcurrency.PESSIMISTIC,
196+
TransactionIsolation.REPEATABLE_READ)
197+
) {
198+
Map<Integer, Object> vals = rnd.ints()
199+
.limit(KEYS_PER_TX)
200+
.boxed()
201+
.collect(Collectors.toMap(
202+
Function.identity(),
203+
Function.identity(),
204+
(a, b) -> a,
205+
TreeMap::new
206+
));
207+
208+
cache.putAll(vals);
209+
210+
tx.commit();
211+
}
212+
catch (Exception ignore) {
213+
// No-op.
214+
}
215+
}
216+
}, TX_THREADS, "test-tx");
217+
}
218+
219+
/** */
220+
private ContinuousQuery<Object, Object> buildContinuousQuery(
221+
boolean locOnly,
222+
AtomicBoolean nodeFailed,
223+
AtomicInteger cntrBeforeFail,
224+
AtomicInteger cntrAfterFail
225+
) {
226+
return new ContinuousQuery<>()
227+
.setLocal(locOnly)
228+
.setLocalListener(new CacheEntryUpdatedListener<>() {
229+
@Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
230+
for (Object ignored : iterable) {
231+
if (!nodeFailed.get())
232+
cntrBeforeFail.incrementAndGet();
233+
else
234+
cntrAfterFail.incrementAndGet();
235+
}
236+
237+
doSleep(1);
238+
}
239+
});
240+
}
241+
242+
/** */
243+
private void failNode(int lastNodeIdx) {
244+
((TcpDiscoverySpi)grid(lastNodeIdx).configuration().getDiscoverySpi()).simulateNodeFailure();
245+
}
246+
}

modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicOneNodeTest;
4141
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicSelfTest;
4242
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
43+
import org.apache.ignite.internal.processors.cache.query.continuous.LocalContinuousQueryWithNodeFailureTest;
4344
import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsQueryTest;
4445
import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsSystemViewTablesTest;
4546
import org.apache.ignite.internal.processors.query.DropTableAfterCteSqlQueryTest;
@@ -86,6 +87,7 @@
8687
QueryEntityAliasesTest.class,
8788
CacheContinuousQueryEntriesExpireTest.class,
8889
DropTableAfterCteSqlQueryTest.class,
90+
LocalContinuousQueryWithNodeFailureTest.class,
8991
})
9092
public class IgniteCacheQuerySelfTestSuite6 {
9193
}

0 commit comments

Comments
 (0)