Skip to content

Commit fd3cc2e

Browse files
committed
IGNITE-27678 Same partitions on different nodes can hold different updates if writeThrough is enabled
1 parent 257d6b5 commit fd3cc2e

3 files changed

Lines changed: 215 additions & 8 deletions

File tree

modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.ignite.util.GridCommandHandlerPropertiesTest;
3535
import org.apache.ignite.util.GridCommandHandlerScheduleIndexRebuildTest;
3636
import org.apache.ignite.util.GridCommandHandlerTracingConfigurationTest;
37+
import org.apache.ignite.util.IdleVerifyCheckWithWriteThroughTest;
3738
import org.apache.ignite.util.IdleVerifyDumpTest;
3839
import org.apache.ignite.util.MetricCommandTest;
3940
import org.apache.ignite.util.PerformanceStatisticsCommandTest;
@@ -77,7 +78,8 @@
7778

7879
SecurityCommandHandlerPermissionsTest.class,
7980

80-
IdleVerifyDumpTest.class
81+
IdleVerifyDumpTest.class,
82+
IdleVerifyCheckWithWriteThroughTest.class
8183
})
8284
public class IgniteControlUtilityTestSuite2 {
8385
}
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.util;
19+
20+
import java.util.List;
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
import javax.cache.configuration.Factory;
25+
import javax.cache.integration.CacheWriterException;
26+
import org.apache.ignite.Ignite;
27+
import org.apache.ignite.IgniteCache;
28+
import org.apache.ignite.IgniteException;
29+
import org.apache.ignite.cache.CacheAtomicityMode;
30+
import org.apache.ignite.cache.CacheMode;
31+
import org.apache.ignite.cache.store.CacheStore;
32+
import org.apache.ignite.cache.store.CacheStoreSession;
33+
import org.apache.ignite.cache.store.CacheStoreSessionListener;
34+
import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener;
35+
import org.apache.ignite.configuration.CacheConfiguration;
36+
import org.apache.ignite.configuration.IgniteConfiguration;
37+
import org.apache.ignite.failure.AbstractFailureHandler;
38+
import org.apache.ignite.failure.FailureContext;
39+
import org.apache.ignite.internal.IgniteEx;
40+
import org.apache.ignite.internal.IgniteInternalFuture;
41+
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
42+
import org.apache.ignite.internal.processors.cache.MapCacheStoreStrategy;
43+
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
44+
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
45+
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
46+
import org.apache.ignite.testframework.GridTestUtils;
47+
import org.apache.ignite.transactions.Transaction;
48+
import org.junit.Test;
49+
import org.junit.runners.Parameterized;
50+
51+
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
52+
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
53+
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
54+
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
55+
56+
/** */
57+
public class IdleVerifyCheckWithWriteThroughTest extends GridCommandHandlerClusterPerMethodAbstractTest {
58+
/** */
59+
private AtomicReference<Throwable> err;
60+
61+
/** Node kill trigger. */
62+
private static final CountDownLatch nodeKill = new CountDownLatch(1);
63+
64+
/** Tx message flag. */
65+
private static volatile boolean finalTxMsgPassed;
66+
67+
/** Session method flag. */
68+
private static final AtomicBoolean sessionTriggered = new AtomicBoolean();
69+
70+
/** */
71+
@Parameterized.Parameters(name = "cmdHnd={0}")
72+
public static List<String> commandHandlers() {
73+
return List.of(CLI_CMD_HND);
74+
}
75+
76+
/** {@inheritDoc} */
77+
@Override protected void beforeTest() throws Exception {
78+
super.beforeTest();
79+
80+
stopAllGrids();
81+
82+
persistenceEnable(false);
83+
84+
err = new AtomicReference<>();
85+
}
86+
87+
/** {@inheritDoc} */
88+
@Override protected boolean persistenceEnable() {
89+
return false;
90+
}
91+
92+
/** {@inheritDoc} */
93+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
94+
return super.getConfiguration(igniteInstanceName)
95+
.setCommunicationSpi(new TestRecordingCommunicationSpi())
96+
.setFailureHandler(new AbstractFailureHandler() {
97+
@Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
98+
err.compareAndSet(null, failureCtx.error());
99+
100+
return false;
101+
}
102+
});
103+
}
104+
105+
/** Test scenario:
106+
* <ul>
107+
* <li>Start 3 node [node0, node1, node2].</li>
108+
* <li>Initialize put operation into transactional cache where [node1] holds primary partition for such insertion.</li>
109+
* <li>Kill [node1] right after tx PREPARE stage is completed (it triggers tx recovery procedure.</li>
110+
* </ul>
111+
*
112+
* @see IgniteTxManager#salvageTx(IgniteInternalTx)
113+
*/
114+
@Test
115+
public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exception {
116+
// sequential start is important here
117+
startGrid(0);
118+
startGrid(1);
119+
startGrid(2);
120+
121+
injectTestSystemOut();
122+
123+
int gridToStop = 1;
124+
125+
IgniteEx instanceToStop = grid(gridToStop);
126+
127+
TestRecordingCommunicationSpi commSpi =
128+
(TestRecordingCommunicationSpi)instanceToStop.configuration().getCommunicationSpi();
129+
commSpi.record(GridDhtTxFinishRequest.class);
130+
131+
commSpi.blockMessages((node, msg) -> {
132+
boolean ret = msg instanceof GridDhtTxFinishRequest;
133+
134+
if (ret) {
135+
nodeKill.countDown();
136+
finalTxMsgPassed = true;
137+
}
138+
139+
return ret;
140+
});
141+
142+
Factory<? extends CacheStore<Object, Object>> storeFactory = new MapCacheStoreStrategy().getStoreFactory();
143+
CacheConfiguration<Integer, Object> ccfg = new CacheConfiguration<>("cache");
144+
ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
145+
ccfg.setCacheMode(CacheMode.REPLICATED);
146+
ccfg.setReadThrough(true);
147+
ccfg.setWriteThrough(true);
148+
ccfg.setCacheStoreFactory(storeFactory);
149+
ccfg.setCacheStoreSessionListenerFactories(new TestCacheStoreFactory());
150+
151+
IgniteCache<Integer, Object> cache = instanceToStop.createCache(ccfg);
152+
153+
awaitPartitionMapExchange();
154+
155+
IgniteInternalFuture<Object> stopFut = GridTestUtils.runAsync(() -> {
156+
nodeKill.await();
157+
stopGrid(gridToStop);
158+
});
159+
160+
// primary key for [node1]
161+
Integer primaryKey = primaryKey(cache);
162+
163+
//noinspection EmptyCatchBlock
164+
try (Transaction tx = instanceToStop.transactions().txStart(OPTIMISTIC, READ_COMMITTED)) {
165+
cache.put(primaryKey, new Object());
166+
tx.commit();
167+
}
168+
catch (Throwable th) {
169+
// No op
170+
}
171+
172+
stopFut.get(getTestTimeout());
173+
174+
assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
175+
176+
String out = testOut.toString();
177+
178+
assertContains(log, out, "The check procedure has failed");
179+
// Update counters are equal but size is different
180+
assertContains(log, out, "updateCntr=1, partitionState=OWNING, size=0");
181+
assertContains(log, out, "updateCntr=1, partitionState=OWNING, size=1");
182+
}
183+
184+
/** */
185+
private static class TestCacheStoreFactory implements Factory<CacheStoreSessionListener> {
186+
/** {@inheritDoc} */
187+
@Override public CacheStoreSessionListener create() {
188+
return new TestCacheJdbcStoreSessionListener();
189+
}
190+
}
191+
192+
/** */
193+
private static class TestCacheJdbcStoreSessionListener extends CacheJdbcStoreSessionListener {
194+
/** {@inheritDoc} */
195+
@Override public void start() throws IgniteException {
196+
// No op.
197+
}
198+
199+
/** {@inheritDoc} */
200+
@Override public void onSessionStart(CacheStoreSession ses) {
201+
// No op, originally connection need to be initialized here.
202+
}
203+
204+
/** {@inheritDoc} */
205+
@Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
206+
if (finalTxMsgPassed && sessionTriggered.compareAndSet(false, true))
207+
throw new CacheWriterException("Internal storage exception raised");
208+
}
209+
}
210+
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
package org.apache.ignite.internal.processors.cache.distributed.dht;
1919

20-
import java.util.ArrayList;
2120
import java.util.Collection;
2221
import java.util.Collections;
22+
import java.util.List;
2323
import java.util.Map;
2424
import java.util.UUID;
2525
import javax.cache.processor.EntryProcessor;
@@ -226,12 +226,7 @@ public GridDhtTxRemote(
226226

227227
/** {@inheritDoc} */
228228
@Override public Collection<UUID> masterNodeIds() {
229-
Collection<UUID> res = new ArrayList<>(2);
230-
231-
res.add(nearNodeId);
232-
res.add(nodeId);
233-
234-
return res;
229+
return nearNodeId != nodeId ? List.of(nearNodeId, nodeId) : List.of(nearNodeId);
235230
}
236231

237232
/** {@inheritDoc} */

0 commit comments

Comments
 (0)