diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 71a8f65573799..2e74d95139726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1499,10 +1499,8 @@ private void sendLocalPartitions(
         try {
             cctx.io().sendNoRetry(node, m, SYSTEM_POOL);
         }
-        catch (ClusterTopologyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" +
-                    node.id() + ", msg=" + m + ']');
+        catch (ClusterTopologyCheckedException e) {
+            log.warning("Failed to send local partitions to node because it left grid [nodeId=" + node.id() + ", exchId=" + id + ']', e);
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send local partition map to node [node=" + node + ", exchId=" + id + ']', e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a5f24c5e23820..9a0549e06d48d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2117,15 +2117,16 @@ else if (localJoinExchange())
             try {
                 cctx.io().send(node, msg, SYSTEM_POOL);
             }
-            catch (ClusterTopologyCheckedException ignored) {
-                if (log.isDebugEnabled()) {
-                    log.debug(
-                        "Failed to send local partitions on exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']'
-                    );
-                }
+            catch (ClusterTopologyCheckedException e) {
+                long retryDelay = cctx.gridConfig().getNetworkSendRetryDelay();
+
+                log.warning(
+                    "Failed to send local partitions on exchange (node left) [nodeId=" + node.id() +
+                        ", exchId=" + exchId + ", retryDelay=" + retryDelay + "ms]", e
+                );
 
                 if (cctx.discovery().alive(node.id())) {
-                    U.sleep(cctx.gridConfig().getNetworkSendRetryDelay());
+                    U.sleep(retryDelay);
 
                     continue;
                 }
@@ -2255,10 +2256,9 @@ private void sendPartitions(ClusterNode oldestNode) {
         try {
             sendLocalPartitions(oldestNode);
         }
-        catch (ClusterTopologyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("Coordinator left during partition exchange [nodeId=" + oldestNode.id() +
-                    ", exchId=" + exchId + ']');
+        catch (ClusterTopologyCheckedException e) {
+            log.warning("Coordinator left during partition exchange [nodeId=" + oldestNode.id() +
+                ", exchId=" + exchId + ']', e);
         }
         catch (IgniteCheckedException e) {
             if (reconnectOnError(e))
@@ -4418,8 +4418,7 @@ private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSi
                 cctx.io().send(node, res, SYSTEM_POOL);
             }
-            catch (ClusterTopologyCheckedException ignored) {
-                if (log.isDebugEnabled())
-                    log.debug("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']');
+            catch (ClusterTopologyCheckedException e) {
+                log.warning("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']', e);
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to send partitions message [node=" + node + ", msg=" + msg + ']', e);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java
index 2ef40cad294a7..8a404d6fe5690 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java
@@ -19,12 +19,14 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -36,6 +38,7 @@
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -81,6 +84,9 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs
     /** */
     private ListeningTestLogger testLog;
 
+    /** */
+    private Supplier<TcpCommunicationSpi> spiSupp = CustomTcpCommunicationSpi::new;
+
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
@@ -102,8 +108,8 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs
         testLog.clearListeners();
 
         testLog = null;
-
         lifecycleBean = null;
+        spiSupp = CustomTcpCommunicationSpi::new;
 
         stopAllGrids();
     }
@@ -112,7 +118,7 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        cfg.setCommunicationSpi(new CustomTcpCommunicationSpi());
+        cfg.setCommunicationSpi(spiSupp.get());
 
         if (testLog != null)
             cfg.setGridLogger(testLog);
@@ -131,6 +137,80 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs
         return cfg;
     }
 
+    /**
+     * Checks that a warning is logged when a partitions single message cannot be delivered
+     * because the coordinator left the grid while the message was being sent.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSingleMessageErrorWarnings() throws Exception {
+        String logSubstr = "Failed to send local partitions to node because it left grid [nodeId=";
+
+        LogListener logListener = LogListener.matches(logSubstr).atLeast(1).build();
+        testLog = new ListeningTestLogger(log, logListener);
+
+        CountDownLatch singleMsgLatch = new CountDownLatch(1);
+        CountDownLatch nodeStopLatch = new CountDownLatch(1);
+
+        AtomicReference<UUID> crdIdRef = new AtomicReference<>();
+
+        // Holds the single message addressed to the coordinator until the coordinator is stopped.
+        spiSupp = () -> new TcpCommunicationSpi() {
+            /** {@inheritDoc} */
+            @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
+                throws IgniteSpiException {
+                boolean isSingleMsg = msg instanceof GridIoMessage &&
+                    ((GridIoMessage)msg).message() instanceof GridDhtPartitionsSingleMessage;
+                UUID crdId = crdIdRef.get();
+
+                if (isSingleMsg && node != null && crdId != null && crdId.equals(node.id())) {
+                    singleMsgLatch.countDown();
+
+                    try {
+                        nodeStopLatch.await();
+                    }
+                    catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+
+                        throw new IgniteSpiException("Interrupted while waiting to proceed", e);
+                    }
+                }
+
+                super.sendMessage(node, msg, ackC);
+            }
+        };
+
+        IgniteEx crd = startGrid(0);
+        IgniteEx node1 = startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        // Interception is enabled only from this point, so messages sent during startup pass through.
+        crdIdRef.set(crd.localNode().id());
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() ->
+            node1.context().cache().context().exchange().refreshPartitions());
+
+        try {
+            assertTrue("Single message was not sent to coordinator",
+                singleMsgLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
+
+            stopGrid(0);
+
+            nodeStopLatch.countDown();
+
+            assertTrue("Expected log not found", GridTestUtils.waitForCondition(logListener::check, getTestTimeout()));
+        }
+        finally {
+            // Release any sender that is still blocked so node shutdown cannot hang.
+            singleMsgLatch.countDown();
+            nodeStopLatch.countDown();
+
+            fut.cancel();
+        }
+    }
+
     /**
      * @throws Exception If failed.
      */