diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java index 7b5b87734e3f6..e8fdbbd0990c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java @@ -58,7 +58,6 @@ * @see CdcMain */ public class CdcConsumerState { - /** Log. */ private final IgniteLogger log; @@ -161,7 +160,7 @@ public T2 loadWalState() { * @return Saved state. */ public Map loadCaches() { - Map state = load(ft.cdcCachesState(), HashMap::new); + Map state = loadOrDefaultIfCorrupted(ft.cdcCachesState(), HashMap::new, true); log.info("Initial caches state loaded [cachesCnt=" + state.size() + ']'); @@ -187,7 +186,7 @@ public void saveCaches(Map cachesState) throws IOException { * @return Saved state. */ public Set> loadMappingsState() { - Set> state = load(ft.cdcMappingsState(), HashSet::new); + Set> state = loadOrDefaultIfCorrupted(ft.cdcMappingsState(), HashSet::new, true); assert state != null; @@ -207,7 +206,7 @@ public Set> loadMappingsState() { * @return Saved state. */ public Map loadTypesState() { - Map state = load(ft.cdcTypesState(), HashMap::new); + Map state = loadOrDefaultIfCorrupted(ft.cdcTypesState(), HashMap::new, true); assert state != null; @@ -230,16 +229,36 @@ private void save(T state, Path tmp, Path file) throws IOException { Files.move(tmp, file, ATOMIC_MOVE, REPLACE_EXISTING); } - /** Loads data from path. */ - private D load(Path state, Supplier dflt) { + /** + * Loads data from path. + * @param state Path to load data from. + * @param dflt Default value, if given file not exist or corrupted. + * @param delIfCorrupted Delete given file if corrupted and return {@code dlft} value. + */ + private D loadOrDefaultIfCorrupted(Path state, Supplier dflt, boolean delIfCorrupted) { if (!Files.exists(state)) return dflt.get(); try (ObjectInputStream ois = new ObjectInputStream(Files.newInputStream(state))) { - return (D)ois.readObject(); } - catch (IOException | ClassNotFoundException e) { + catch (IOException e) { + if (delIfCorrupted) { + try { + log.warning("State file was corrupted. Will remove the file and restore state with default [file=" + state + ']'); + + Files.delete(state); + } + catch (IOException ioe) { + throw new RuntimeException(e); + } + + return dflt.get(); + } + + throw new RuntimeException(e); + } + catch (ClassNotFoundException e) { throw new RuntimeException(e); } } @@ -250,7 +269,7 @@ private D load(Path state, Supplier dflt) { * @return CDC mode state. */ public CdcMode loadCdcMode() { - CdcMode state = load(ft.cdcModeState(), () -> CdcMode.IGNITE_NODE_ACTIVE); + CdcMode state = loadOrDefaultIfCorrupted(ft.cdcModeState(), () -> CdcMode.IGNITE_NODE_ACTIVE, false); log.info("CDC mode loaded [" + state + ']'); diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CorruptedCdcConsumerStateTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CorruptedCdcConsumerStateTest.java new file mode 100644 index 0000000000000..302a3041347cb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CorruptedCdcConsumerStateTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.Ignite; +import org.apache.ignite.binary.BinaryType; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cdc.CdcMain; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.metric.MetricRegistry; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; + +import static org.apache.ignite.testframework.GridTestUtils.runAsync; + +/** */ +public class CorruptedCdcConsumerStateTest extends AbstractCdcTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setWalForceArchiveTimeout(100) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setCdcEnabled(true) + )); + + return cfg; + } + + /** */ + @Test + public void testCdcMainClearsCorruptedFiles() throws Exception { + try (Ignite ign = startGrid(0)) { + CountDownLatch sendCacheLatch = new CountDownLatch(1); + + CdcMain cdc = createCdc(new TestCdcConsumer(sendCacheLatch), ign.configuration()); + + IgniteInternalFuture cdcFut = runAsync(cdc); + + // Force writing consumer state to ./state directory. + try { + ign.getOrCreateCache(DEFAULT_CACHE_NAME).put(0, 0); + + U.await(sendCacheLatch); + } + finally { + cdcFut.cancel(); + } + + // Corrupt data. + NodeFileTree ft = GridTestUtils.getFieldValue(cdc, "ft"); + + Path cdcCacheState = ft.cdcCachesState(); + + byte[] corrupted = new byte[10]; + ThreadLocalRandom.current().nextBytes(corrupted); + + Files.write(cdcCacheState, corrupted); + + sendCacheLatch = new CountDownLatch(1); + + cdcFut = runAsync(createCdc(new TestCdcConsumer(sendCacheLatch), ign.configuration())); + cdcFut.listen(sendCacheLatch::countDown); + + // Force writing consumer state to ./state directory. + try { + ign.getOrCreateCache(DEFAULT_CACHE_NAME).put(0, 0); + + U.await(sendCacheLatch); + + assertFalse(cdcFut.isDone()); + assertNull(cdcFut.error()); + } + finally { + cdcFut.cancel(); + } + } + } + + /** */ + private static final class TestCdcConsumer implements CdcConsumer { + /** */ + private final CountDownLatch latch; + + /** */ + TestCdcConsumer(CountDownLatch latch) { + this.latch = latch; + } + + /** */ + @Override public void start(MetricRegistry mreg) { + // No-op. + } + + /** */ + @Override public boolean onEvents(Iterator evts) { + return false; + } + + /** */ + @Override public void onTypes(Iterator types) { + types.forEachRemaining(t -> {}); + } + + /** */ + @Override public void onMappings(Iterator mappings) { + mappings.forEachRemaining(t -> {}); + } + + /** */ + @Override public void onCacheChange(Iterator cacheEvts) { + cacheEvts.forEachRemaining(e -> { + if (e.cacheId() == CU.cacheId(DEFAULT_CACHE_NAME)) + latch.countDown(); + }); + } + + /** */ + @Override public void onCacheDestroy(Iterator caches) { + caches.forEachRemaining(c -> {}); + } + + /** */ + @Override public void stop() { + // No-op. + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 0d46f7390b14f..de17138ef67d9 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -27,6 +27,7 @@ import org.apache.ignite.cdc.CdcNonDefaultWorkDirTest; import org.apache.ignite.cdc.CdcPushMetricsExporterTest; import org.apache.ignite.cdc.CdcSelfTest; +import org.apache.ignite.cdc.CorruptedCdcConsumerStateTest; import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest; import org.apache.ignite.cdc.TransformedCdcSelfTest; import org.apache.ignite.cdc.WalForCdcTest; @@ -161,6 +162,7 @@ public static void addRealPageStoreTests(List> suite, Collection GridTestUtils.addTestIfNeeded(suite, TransformedCdcSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CdcCacheVersionTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, RestartWithWalForceArchiveTimeoutTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, CorruptedCdcConsumerStateTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, WalRolloverOnStopTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, WalForCdcTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CdcCacheConfigOnRestartTest.class, ignoredTests);