Skip to content

Commit 288cfd5

Browse files
authored
IGNITE-27325 CdcMain clears corrupted cdc state files (#12577)
1 parent ed020d7 commit 288cfd5

3 files changed

Lines changed: 188 additions & 9 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
* @see CdcMain
5959
*/
6060
public class CdcConsumerState {
61-
6261
/** Log. */
6362
private final IgniteLogger log;
6463

@@ -161,7 +160,7 @@ public T2<WALPointer, Integer> loadWalState() {
161160
* @return Saved state.
162161
*/
163162
public Map<Integer, Long> loadCaches() {
164-
Map<Integer, Long> state = load(ft.cdcCachesState(), HashMap::new);
163+
Map<Integer, Long> state = loadOrDefaultIfCorrupted(ft.cdcCachesState(), HashMap::new, true);
165164

166165
log.info("Initial caches state loaded [cachesCnt=" + state.size() + ']');
167166

@@ -187,7 +186,7 @@ public void saveCaches(Map<Integer, Long> cachesState) throws IOException {
187186
* @return Saved state.
188187
*/
189188
public Set<T2<Integer, Byte>> loadMappingsState() {
190-
Set<T2<Integer, Byte>> state = load(ft.cdcMappingsState(), HashSet::new);
189+
Set<T2<Integer, Byte>> state = loadOrDefaultIfCorrupted(ft.cdcMappingsState(), HashSet::new, true);
191190

192191
assert state != null;
193192

@@ -207,7 +206,7 @@ public Set<T2<Integer, Byte>> loadMappingsState() {
207206
* @return Saved state.
208207
*/
209208
public Map<Integer, Long> loadTypesState() {
210-
Map<Integer, Long> state = load(ft.cdcTypesState(), HashMap::new);
209+
Map<Integer, Long> state = loadOrDefaultIfCorrupted(ft.cdcTypesState(), HashMap::new, true);
211210

212211
assert state != null;
213212

@@ -230,16 +229,36 @@ private <T> void save(T state, Path tmp, Path file) throws IOException {
230229
Files.move(tmp, file, ATOMIC_MOVE, REPLACE_EXISTING);
231230
}
232231

233-
/** Loads data from path. */
234-
private <D> D load(Path state, Supplier<D> dflt) {
232+
/**
233+
* Loads data from path.
234+
* @param state Path to load data from.
235+
* @param dflt Default value, if given file not exist or corrupted.
236+
* @param delIfCorrupted Delete given file if corrupted and return {@code dlft} value.
237+
*/
238+
private <D> D loadOrDefaultIfCorrupted(Path state, Supplier<D> dflt, boolean delIfCorrupted) {
235239
if (!Files.exists(state))
236240
return dflt.get();
237241

238242
try (ObjectInputStream ois = new ObjectInputStream(Files.newInputStream(state))) {
239-
240243
return (D)ois.readObject();
241244
}
242-
catch (IOException | ClassNotFoundException e) {
245+
catch (IOException e) {
246+
if (delIfCorrupted) {
247+
try {
248+
log.warning("State file was corrupted. Will remove the file and restore state with default [file=" + state + ']');
249+
250+
Files.delete(state);
251+
}
252+
catch (IOException ioe) {
253+
throw new RuntimeException(e);
254+
}
255+
256+
return dflt.get();
257+
}
258+
259+
throw new RuntimeException(e);
260+
}
261+
catch (ClassNotFoundException e) {
243262
throw new RuntimeException(e);
244263
}
245264
}
@@ -250,7 +269,7 @@ private <D> D load(Path state, Supplier<D> dflt) {
250269
* @return CDC mode state.
251270
*/
252271
public CdcMode loadCdcMode() {
253-
CdcMode state = load(ft.cdcModeState(), () -> CdcMode.IGNITE_NODE_ACTIVE);
272+
CdcMode state = loadOrDefaultIfCorrupted(ft.cdcModeState(), () -> CdcMode.IGNITE_NODE_ACTIVE, false);
254273

255274
log.info("CDC mode loaded [" + state + ']');
256275

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.cdc;
19+
20+
import java.nio.file.Files;
21+
import java.nio.file.Path;
22+
import java.util.Iterator;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.ThreadLocalRandom;
25+
import org.apache.ignite.Ignite;
26+
import org.apache.ignite.binary.BinaryType;
27+
import org.apache.ignite.configuration.CacheConfiguration;
28+
import org.apache.ignite.configuration.DataRegionConfiguration;
29+
import org.apache.ignite.configuration.DataStorageConfiguration;
30+
import org.apache.ignite.configuration.IgniteConfiguration;
31+
import org.apache.ignite.internal.IgniteInternalFuture;
32+
import org.apache.ignite.internal.cdc.CdcMain;
33+
import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
34+
import org.apache.ignite.internal.util.typedef.internal.CU;
35+
import org.apache.ignite.internal.util.typedef.internal.U;
36+
import org.apache.ignite.metric.MetricRegistry;
37+
import org.apache.ignite.testframework.GridTestUtils;
38+
import org.junit.Test;
39+
40+
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
41+
42+
/** Checks that {@link CdcMain} keeps running after the CDC caches state file was overwritten with random bytes. */
43+
public class CorruptedCdcConsumerStateTest extends AbstractCdcTest {
44+
/** {@inheritDoc} */
45+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
46+
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
47+
48+
cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
49+
50+
cfg.setDataStorageConfiguration(
51+
new DataStorageConfiguration()
52+
.setWalForceArchiveTimeout(100)
53+
.setDefaultDataRegionConfiguration(
54+
new DataRegionConfiguration().setCdcEnabled(true)
55+
));
56+
57+
return cfg;
58+
}
59+
60+
/** Runs CDC once, overwrites the caches state file with random bytes, then checks that a second CDC run does not fail. */
61+
@Test
62+
public void testCdcMainClearsCorruptedFiles() throws Exception {
63+
try (Ignite ign = startGrid(0)) {
64+
CountDownLatch sendCacheLatch = new CountDownLatch(1);
65+
66+
CdcMain cdc = createCdc(new TestCdcConsumer(sendCacheLatch), ign.configuration());
67+
68+
IgniteInternalFuture<?> cdcFut = runAsync(cdc);
69+
70+
// Force writing consumer state to ./state directory.
71+
try {
72+
ign.getOrCreateCache(DEFAULT_CACHE_NAME).put(0, 0);
73+
74+
U.await(sendCacheLatch);
75+
}
76+
finally {
77+
cdcFut.cancel();
78+
}
79+
80+
// Corrupt data: overwrite the caches state file with 10 random bytes.
81+
NodeFileTree ft = GridTestUtils.getFieldValue(cdc, "ft");
82+
83+
Path cdcCacheState = ft.cdcCachesState();
84+
85+
byte[] corrupted = new byte[10];
86+
ThreadLocalRandom.current().nextBytes(corrupted);
87+
88+
Files.write(cdcCacheState, corrupted);
89+
90+
sendCacheLatch = new CountDownLatch(1);
91+
92+
cdcFut = runAsync(createCdc(new TestCdcConsumer(sendCacheLatch), ign.configuration()));
93+
cdcFut.listen(sendCacheLatch::countDown); // Also release the latch when the CDC future completes, so the await below does not hang if CDC stops.
94+
95+
// Force writing consumer state to ./state directory.
96+
try {
97+
ign.getOrCreateCache(DEFAULT_CACHE_NAME).put(0, 0);
98+
99+
U.await(sendCacheLatch);
100+
101+
assertFalse(cdcFut.isDone());
102+
assertNull(cdcFut.error());
103+
}
104+
finally {
105+
cdcFut.cancel();
106+
}
107+
}
108+
}
109+
110+
/** Consumer that counts down the given latch once a cache change event for {@code DEFAULT_CACHE_NAME} arrives. */
111+
private static final class TestCdcConsumer implements CdcConsumer {
112+
/** Latch counted down when a cache change event for the default cache is observed. */
113+
private final CountDownLatch latch;
114+
115+
/** @param latch Latch to count down on the default cache change event. */
116+
TestCdcConsumer(CountDownLatch latch) {
117+
this.latch = latch;
118+
}
119+
120+
/** {@inheritDoc} */
121+
@Override public void start(MetricRegistry mreg) {
122+
// No-op.
123+
}
124+
125+
/** {@inheritDoc} */
126+
@Override public boolean onEvents(Iterator<CdcEvent> evts) {
127+
return false;
128+
}
129+
130+
/** Drains the iterator without processing the types. */
131+
@Override public void onTypes(Iterator<BinaryType> types) {
132+
types.forEachRemaining(t -> {});
133+
}
134+
135+
/** Drains the iterator without processing the mappings. */
136+
@Override public void onMappings(Iterator<TypeMapping> mappings) {
137+
mappings.forEachRemaining(t -> {});
138+
}
139+
140+
/** Drains the iterator and counts down the latch for each event whose cache id matches the default cache. */
141+
@Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvts) {
142+
cacheEvts.forEachRemaining(e -> {
143+
if (e.cacheId() == CU.cacheId(DEFAULT_CACHE_NAME))
144+
latch.countDown();
145+
});
146+
}
147+
148+
/** Drains the iterator without processing the destroyed cache ids. */
149+
@Override public void onCacheDestroy(Iterator<Integer> caches) {
150+
caches.forEachRemaining(c -> {});
151+
}
152+
153+
/** {@inheritDoc} */
154+
@Override public void stop() {
155+
// No-op.
156+
}
157+
}
158+
}

modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.ignite.cdc.CdcNonDefaultWorkDirTest;
2828
import org.apache.ignite.cdc.CdcPushMetricsExporterTest;
2929
import org.apache.ignite.cdc.CdcSelfTest;
30+
import org.apache.ignite.cdc.CorruptedCdcConsumerStateTest;
3031
import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest;
3132
import org.apache.ignite.cdc.TransformedCdcSelfTest;
3233
import org.apache.ignite.cdc.WalForCdcTest;
@@ -161,6 +162,7 @@ public static void addRealPageStoreTests(List<Class<?>> suite, Collection<Class>
161162
GridTestUtils.addTestIfNeeded(suite, TransformedCdcSelfTest.class, ignoredTests);
162163
GridTestUtils.addTestIfNeeded(suite, CdcCacheVersionTest.class, ignoredTests);
163164
GridTestUtils.addTestIfNeeded(suite, RestartWithWalForceArchiveTimeoutTest.class, ignoredTests);
165+
GridTestUtils.addTestIfNeeded(suite, CorruptedCdcConsumerStateTest.class, ignoredTests);
164166
GridTestUtils.addTestIfNeeded(suite, WalRolloverOnStopTest.class, ignoredTests);
165167
GridTestUtils.addTestIfNeeded(suite, WalForCdcTest.class, ignoredTests);
166168
GridTestUtils.addTestIfNeeded(suite, CdcCacheConfigOnRestartTest.class, ignoredTests);

0 commit comments

Comments
 (0)