From 9b12c27a6d0ebab067b17d8c799b0dba461b9884 Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Mon, 30 Jun 2025 14:18:43 +0300 Subject: [PATCH 01/18] IGNITE-25793 Add missing logs for single message errors --- .../GridCachePartitionExchangeManager.java | 13 ++--- .../GridDhtPartitionsExchangeFuture.java | 53 +++++++++++++++---- .../preloader/InitNewCoordinatorFuture.java | 3 ++ 3 files changed, 49 insertions(+), 20 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 35e2706e63eb8..135ca3d1f62a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1314,11 +1314,8 @@ private void sendAllPartitions( cctx.io().sendNoRetry(node, m, SYSTEM_POOL); } - catch (ClusterTopologyCheckedException ignore) { - if (log.isDebugEnabled()) { - log.debug("Failed to send partition update to node because it left grid (will ignore) " + - "[node=" + node.id() + ", msg=" + m + ']'); - } + catch (ClusterTopologyCheckedException e) { + log.warning("Failed to send full partitions [nodeId=" + node.id() + ", topVer=" + msgTopVer + ']', e); } catch (IgniteCheckedException e) { failedNodes.add(node); @@ -1508,10 +1505,8 @@ private void sendLocalPartitions( try { cctx.io().sendNoRetry(node, m, SYSTEM_POOL); } - catch (ClusterTopologyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" + - node.id() + ", msg=" + m + ']'); + catch (ClusterTopologyCheckedException e) { + log.warning("Failed to send local partitions [nodeId=" + node.id() + ", exchId=" + id + ']', e); } catch (IgniteCheckedException e) { U.error(log, "Failed to send local partition map to node [node=" + node + ", exchId=" + id + ']', e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 0674e9de75432..90536a912d662 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2150,19 +2150,39 @@ else if (localJoinExchange()) if (log.isTraceEnabled()) log.trace("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']'); + long sndStart = System.currentTimeMillis(); + int retry = 0; + + if (log.isInfoEnabled()) + log.info("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", attempt=" + (retry + 1) + ']'); + while (true) { try { cctx.io().send(node, msg, SYSTEM_POOL); + + if (log.isInfoEnabled()) { + long dur = System.currentTimeMillis() - sndStart; + log.info("Local partitions sent [nodeId=" + node.id() + ", exchId=" + exchId + + ", duration=" + dur + "ms, retries=" + retry + ']'); + } } catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) { - log.debug( - "Failed to send local partitions on exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']' - ); - } + + long retryDelay = cctx.gridConfig().getNetworkSendRetryDelay(); + + log.warning( + "Failed to send local partitions on exchange (node left) [nodeId=" + node.id() + + ", exchId=" + exchId + ", retryDelay=" + retryDelay + "ms]" + ); if (cctx.discovery().alive(node.id())) { - U.sleep(cctx.gridConfig().getNetworkSendRetryDelay()); + if (log.isInfoEnabled()) + log.info("Retry sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + + ", retry=" + (retry + 1) + ']'); + + U.sleep(retryDelay); + + retry++; continue; } @@ -2270,8 +2290,8 @@ private void sendAllPartitions( cctx.io().send(node, fullMsgToSend, SYSTEM_POOL); } catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send partitions, node failed: " + node); + log.warning("Failed to send partitions [nodeId=" + node.id() + + ", exchId=" + exchId + ']', e); } catch (IgniteCheckedException e) { U.error(log, "Failed to send partitions [node=" + node + ']', e); @@ -2285,11 +2305,23 @@ private void sendAllPartitions( private void sendPartitions(ClusterNode oldestNode) { assert !exchCtx.exchangeFreeSwitch() : this; + if (log.isInfoEnabled()) + log.info("Send partitions to coordinator [nodeId=" + oldestNode.id() + ", exchId=" + exchId + ']'); + + long start = System.currentTimeMillis(); + try { sendLocalPartitions(oldestNode); + + if (log.isInfoEnabled()) + log.info("Finished sending partitions [nodeId=" + oldestNode.id() + ", exchId=" + exchId + + ", duration=" + (System.currentTimeMillis() - start) + "ms]"); } catch (ClusterTopologyCheckedException ignore) { - if (log.isDebugEnabled()) + if (log.isInfoEnabled()) + log.info("Coordinator left during partition exchange, will retry [nodeId=" + oldestNode.id() + + ", exchId=" + exchId + ']'); + else if (log.isDebugEnabled()) log.debug("Coordinator left during partition exchange [nodeId=" + oldestNode.id() + ", exchId=" + exchId + ']'); } @@ -4326,8 +4358,7 @@ private void sendAllPartitionsToNode(FinishState finishState, GridDhtPartitionsS } } catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send partitions, node failed: " + node); + log.warning("Failed to send partitions [nodeId=" + node.id() + ", exchId=" + exchId + ']', e); } catch (IgniteCheckedException e) { U.error(log, "Failed to send partitions [node=" + node + ']', e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java index 6e0333f6e0e3b..959cd612a2c25 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java @@ -178,6 +178,9 @@ else if (!node.isLocal()) { if (log.isDebugEnabled()) log.debug("Failed to send partitions request, node failed: " + node); + log.warning("Failed to send partitions request (node left) [nodeId=" + node.id() + + ", exchId=" + exchFut.exchangeId() + ']'); + onNodeLeft(node.id()); } } From 18b5315a1e3ee7923ae06b2a5c58090b80413c8c Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Tue, 15 Jul 2025 13:28:39 +0300 Subject: [PATCH 02/18] IGNITE-25793 Add missing logs for single message errors --- .../dht/preloader/GridDhtPartitionsExchangeFuture.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 90536a912d662..b93f9d6dde299 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2147,9 +2147,6 @@ else if (localJoinExchange()) msg.exchangeStartTime(startTime); - if (log.isTraceEnabled()) - log.trace("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']'); - long sndStart = System.currentTimeMillis(); int retry = 0; @@ -2306,7 +2303,7 @@ private void sendPartitions(ClusterNode oldestNode) { assert !exchCtx.exchangeFreeSwitch() : this; if (log.isInfoEnabled()) - log.info("Send partitions to coordinator [nodeId=" + oldestNode.id() + ", exchId=" + exchId + ']'); + log.info("Sending partitions to coordinator [nodeId=" + oldestNode.id() + ", exchId=" + exchId + ']'); long start = System.currentTimeMillis(); From 32a5636b8ca3cff3aa81cf186cc20150f6cfef10 Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Tue, 15 Jul 2025 13:28:39 +0300 Subject: [PATCH 03/18] IGNITE-25793 Add missing logs for single message errors --- .../cache/GridCachePartitionExchangeManager.java | 4 ++-- .../dht/preloader/GridDhtPartitionsExchangeFuture.java | 7 ++----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 135ca3d1f62a5..50b50407398bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1499,8 +1499,8 @@ private void sendLocalPartitions( null, grps); - if (log.isTraceEnabled()) - log.trace("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']'); + if (log.isInfoEnabled()) + log.info("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']'); try { cctx.io().sendNoRetry(node, m, SYSTEM_POOL); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index b93f9d6dde299..ef48834648caa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2151,7 +2151,7 @@ else if (localJoinExchange()) int retry = 0; if (log.isInfoEnabled()) - log.info("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", attempt=" + (retry + 1) + ']'); + log.info("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ']'); while (true) { try { @@ -2160,7 +2160,7 @@ else if (localJoinExchange()) if (log.isInfoEnabled()) { long dur = System.currentTimeMillis() - sndStart; log.info("Local partitions sent [nodeId=" + node.id() + ", exchId=" + exchId + - ", duration=" + dur + "ms, retries=" + retry + ']'); + ", duration=" + dur + "ms"); } } catch (ClusterTopologyCheckedException ignored) { @@ -2318,9 +2318,6 @@ private void sendPartitions(ClusterNode oldestNode) { if (log.isInfoEnabled()) log.info("Coordinator left during partition exchange, will retry [nodeId=" + oldestNode.id() + ", exchId=" + exchId + ']'); - else if (log.isDebugEnabled()) - log.debug("Coordinator left during partition exchange [nodeId=" + oldestNode.id() + - ", exchId=" + exchId + ']'); } catch (IgniteCheckedException e) { if (reconnectOnError(e)) From 8207b241a80e91aeec2885db3c2c3c96ff84e84c Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Fri, 18 Jul 2025 13:07:42 +0300 Subject: [PATCH 04/18] IGNITE-25793 Add missing logs for single message errors --- .../cache/GridCachePartitionExchangeManager.java | 7 +++++-- .../dht/preloader/GridDhtPartitionsExchangeFuture.java | 3 ++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 50b50407398bc..b7f2e0d48799d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1314,8 +1314,11 @@ private void sendAllPartitions( cctx.io().sendNoRetry(node, m, SYSTEM_POOL); } - catch (ClusterTopologyCheckedException e) { - log.warning("Failed to send full partitions [nodeId=" + node.id() + ", topVer=" + msgTopVer + ']', e); + catch (ClusterTopologyCheckedException ignore) { + if (log.isDebugEnabled()) { + log.debug("Failed to send partition update to node because it left grid (will ignore) " + + "[node=" + node.id() + ", msg=" + m + ']'); + } } catch (IgniteCheckedException e) { failedNodes.add(node); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index ef48834648caa..8ce642ea4e58b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -4352,7 +4352,8 @@ private void sendAllPartitionsToNode(FinishState finishState, GridDhtPartitionsS } } catch (ClusterTopologyCheckedException e) { - log.warning("Failed to send partitions [nodeId=" + node.id() + ", exchId=" + exchId + ']', e); + if (log.isDebugEnabled()) + log.debug("Failed to send partitions, node failed: " + node); } catch (IgniteCheckedException e) { U.error(log, "Failed to send partitions [node=" + node + ']', e); From 208370f5ff7cc00002c01b98e65b102fd49f5505 Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Sat, 19 Jul 2025 14:34:52 +0300 Subject: [PATCH 05/18] IGNITE-25793 Add missing logs for single message errors --- .../GridCachePartitionExchangeManager.java | 4 +-- .../GridDhtPartitionsExchangeFuture.java | 31 ++----------------- .../preloader/InitNewCoordinatorFuture.java | 3 -- .../util/distributed/DistributedProcess.java | 6 ++-- 4 files changed, 7 insertions(+), 37 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index b7f2e0d48799d..c97815bcdb9ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1502,8 +1502,8 @@ private void sendLocalPartitions( null, grps); - if (log.isInfoEnabled()) - log.info("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']'); + if (log.isTraceEnabled()) + log.trace("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']'); try { cctx.io().sendNoRetry(node, m, SYSTEM_POOL); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 8ce642ea4e58b..77744ad96e25f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2147,21 +2147,12 @@ else if (localJoinExchange()) msg.exchangeStartTime(startTime); - long sndStart = System.currentTimeMillis(); - int retry = 0; - - if (log.isInfoEnabled()) - log.info("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ']'); + if (log.isTraceEnabled()) + log.trace("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']'); while (true) { try { cctx.io().send(node, msg, SYSTEM_POOL); - - if (log.isInfoEnabled()) { - long dur = System.currentTimeMillis() - sndStart; - log.info("Local partitions sent [nodeId=" + node.id() + ", exchId=" + exchId + - ", duration=" + dur + "ms"); - } } catch (ClusterTopologyCheckedException ignored) { @@ -2173,14 +2164,8 @@ else if (localJoinExchange()) ); if (cctx.discovery().alive(node.id())) { - if (log.isInfoEnabled()) - log.info("Retry sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + - ", retry=" + (retry + 1) + ']'); - U.sleep(retryDelay); - retry++; - continue; } } @@ -2302,21 +2287,11 @@ private void sendAllPartitions( private void sendPartitions(ClusterNode oldestNode) { assert !exchCtx.exchangeFreeSwitch() : this; - if (log.isInfoEnabled()) - log.info("Sending partitions to coordinator [nodeId=" + oldestNode.id() + ", exchId=" + exchId + ']'); - - long start = System.currentTimeMillis(); - try { sendLocalPartitions(oldestNode); - - if (log.isInfoEnabled()) - log.info("Finished sending partitions [nodeId=" + oldestNode.id() + ", exchId=" + exchId + - ", duration=" + (System.currentTimeMillis() - start) + "ms]"); } catch (ClusterTopologyCheckedException ignore) { - if (log.isInfoEnabled()) - log.info("Coordinator left during partition exchange, will retry [nodeId=" + oldestNode.id() + + log.warning("Coordinator left during partition exchange, will retry [nodeId=" + oldestNode.id() + ", exchId=" + exchId + ']'); } catch (IgniteCheckedException e) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java index 959cd612a2c25..6e0333f6e0e3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java @@ -178,9 +178,6 @@ else if (!node.isLocal()) { if (log.isDebugEnabled()) log.debug("Failed to send partitions request, node failed: " + node); - log.warning("Failed to send partitions request (node left) [nodeId=" + node.id() + - ", exchId=" + exchFut.exchangeId() + ']'); - onNodeLeft(node.id()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java index ee5a99931df3b..ab6136714a4b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java @@ -306,10 +306,8 @@ private void sendSingleMessage(Process p) { } catch (ClusterTopologyCheckedException e) { // The coordinator has failed. The single message will be sent when a new coordinator initialized. - if (log.isDebugEnabled()) { - log.debug("Failed to send a single message to coordinator: [crdId=" + crdId + - ", processId=" + p.id + ", error=" + e.getMessage() + ']'); - } + log.warning("Failed to send a single message to coordinator: [crdId=" + crdId + + ", processId=" + p.id + ", error=" + e.getMessage() + ']'); } catch (IgniteCheckedException e) { log.error("Unable to send message to coordinator.", e); From 7c69db6737248dd2629594760fe7997fd1a7d115 Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Tue, 22 Jul 2025 05:44:15 +0300 Subject: [PATCH 06/18] IGNITE-25793 Add tests --- .../GridDhtPartitionsExchangeFuture.java | 12 ++-- ...ePartitionExchangeManagerWarningsTest.java | 57 ++++++++++++++++++- 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 77744ad96e25f..b356630dc9640 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2154,13 +2154,13 @@ else if (localJoinExchange()) try { cctx.io().send(node, msg, SYSTEM_POOL); } - catch (ClusterTopologyCheckedException ignored) { + catch (ClusterTopologyCheckedException e) { long retryDelay = cctx.gridConfig().getNetworkSendRetryDelay(); log.warning( "Failed to send local partitions on exchange (node left) [nodeId=" + node.id() + - ", exchId=" + exchId + ", retryDelay=" + retryDelay + "ms]" + ", exchId=" + exchId + ", retryDelay=" + retryDelay + "ms]", e ); if (cctx.discovery().alive(node.id())) { @@ -2272,8 +2272,8 @@ private void sendAllPartitions( cctx.io().send(node, fullMsgToSend, SYSTEM_POOL); } catch (ClusterTopologyCheckedException e) { - log.warning("Failed to send partitions [nodeId=" + node.id() + - ", exchId=" + exchId + ']', e); + if (log.isDebugEnabled()) + log.debug("Failed to send partitions, node failed: " + node); } catch (IgniteCheckedException e) { U.error(log, "Failed to send partitions [node=" + node + ']', e); @@ -2290,9 +2290,9 @@ private void sendPartitions(ClusterNode oldestNode) { try { sendLocalPartitions(oldestNode); } - catch (ClusterTopologyCheckedException ignore) { + catch (ClusterTopologyCheckedException e) { log.warning("Coordinator left during partition exchange, will retry [nodeId=" + oldestNode.id() + - ", exchId=" + exchId + ']'); + ", exchId=" + exchId + ']', e); } catch (IgniteCheckedException e) { if (reconnectOnError(e)) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java index 2ef40cad294a7..6a70614c359d5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java @@ -25,6 +25,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; @@ -32,12 +34,16 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lifecycle.LifecycleBean; import org.apache.ignite.lifecycle.LifecycleEventType; @@ -81,6 +87,9 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs /** */ private ListeningTestLogger testLog; + /** */ + private volatile Supplier spiFactory = CustomTcpCommunicationSpi::new; + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); @@ -101,6 +110,8 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs if (testLog != null) testLog.clearListeners(); + spiFactory = CustomTcpCommunicationSpi::new; + testLog = null; lifecycleBean = null; @@ -112,7 +123,7 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - cfg.setCommunicationSpi(new CustomTcpCommunicationSpi()); + cfg.setCommunicationSpi(spiFactory.get()); if (testLog != null) cfg.setGridLogger(testLog); @@ -131,6 +142,50 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs return cfg; } + @Test + public void testSingleMessageErrorWarnings () throws Exception { + spiFactory = TestRecordingCommunicationSpi::new; + + String logSubstr = "Failed to send local partitions"; + + LogListener logLstnr = LogListener.matches(logSubstr).build(); + + testLog = new ListeningTestLogger(log, logLstnr); + + + IgniteEx crd = startGrid(0); + + IgniteConfiguration cfg1 = getConfiguration(getTestIgniteInstanceName(1)); + + TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi(); + + spi.blockMessages((node, message) -> message instanceof GridDhtPartitionsSingleMessage); + + cfg1.setCommunicationSpi(spi); + + cfg1.setGridLogger(testLog); + + IgniteEx problemNode = startGrid(cfg1); + + assertTrue(spi.waitForBlocked(1, 5000)); + + stopGrid(0); +// +// +//// spi.blockMessages(TestRecordingCommunicationSpi.blockSingleExhangeMessage()); +// + +// spi.waitForBlocked(); + + + spi.stopBlock(); + + awaitPartitionMapExchange(); + + + assertTrue(logLstnr.check()); + } + /** * @throws Exception If failed. */ From 14f726805e97f15d34fcc56b25ab164ae15e79c8 Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Tue, 29 Jul 2025 13:18:24 +0300 Subject: [PATCH 07/18] IGNITE-25293 Add test --- ...ePartitionExchangeManagerWarningsTest.java | 50 +++++++++---------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java index 6a70614c359d5..6ecad3e79e0ed 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java @@ -39,7 +39,6 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; @@ -144,46 +143,45 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs @Test public void testSingleMessageErrorWarnings () throws Exception { - spiFactory = TestRecordingCommunicationSpi::new; + String expectedLogMsg = "Failed to send local partitions"; - String logSubstr = "Failed to send local partitions"; + LogListener logListener = LogListener.matches(expectedLogMsg).build(); + testLog = new ListeningTestLogger(log, logListener); - LogListener logLstnr = LogListener.matches(logSubstr).build(); - - testLog = new ListeningTestLogger(log, logLstnr); - - - IgniteEx crd = startGrid(0); + IgniteConfiguration cfg0 = getConfiguration(getTestIgniteInstanceName(0)); + TestRecordingCommunicationSpi spi0 = new TestRecordingCommunicationSpi(); + cfg0.setCommunicationSpi(spi0); + cfg0.setGridLogger(testLog); + startGrid(cfg0); IgniteConfiguration cfg1 = getConfiguration(getTestIgniteInstanceName(1)); + TestRecordingCommunicationSpi spi1 = new TestRecordingCommunicationSpi(); + cfg1.setCommunicationSpi(spi1); + cfg1.setGridLogger(testLog); - TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi(); - - spi.blockMessages((node, message) -> message instanceof GridDhtPartitionsSingleMessage); + IgniteEx node1 = startGrid(cfg1); - cfg1.setCommunicationSpi(spi); + node1.cluster().state(ClusterState.ACTIVE); + awaitPartitionMapExchange(); - cfg1.setGridLogger(testLog); + spi1.blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage); - IgniteEx problemNode = startGrid(cfg1); + IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { + node1.context().cache().context().exchange().refreshPartitions(); + }); - assertTrue(spi.waitForBlocked(1, 5000)); + assertTrue("Message not blocked", spi1.waitForBlocked(1, 5_000)); stopGrid(0); -// -// -//// spi.blockMessages(TestRecordingCommunicationSpi.blockSingleExhangeMessage()); -// - -// spi.waitForBlocked(); + spi1.stopBlock(); - spi.stopBlock(); - - awaitPartitionMapExchange(); + fut.get(5_000); + U.sleep(500); - assertTrue(logLstnr.check()); + assertTrue("Expected log not found", + GridTestUtils.waitForCondition(logListener::check, 3000)); } /** From 63ec1487ae0b05bfb33f15f0431b7721074c5c97 Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Mon, 11 Aug 2025 11:26:01 +0300 Subject: [PATCH 08/18] IGNITE-25793 Revert useless logs --- .../GridCachePartitionExchangeManager.java | 7 ++-- ...ePartitionExchangeManagerWarningsTest.java | 37 ++++++++----------- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index c97815bcdb9ee..d3b985c62d39e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1315,10 +1315,9 @@ private void sendAllPartitions( cctx.io().sendNoRetry(node, m, SYSTEM_POOL); } catch (ClusterTopologyCheckedException ignore) { - if (log.isDebugEnabled()) { - log.debug("Failed to send partition update to node because it left grid (will ignore) " + - "[node=" + node.id() + ", msg=" + m + ']'); - } + if (log.isDebugEnabled()) + log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" + + node.id() + ", msg=" + m + ']'); } catch (IgniteCheckedException e) { failedNodes.add(node); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java index 6ecad3e79e0ed..21cbdfa69170d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java @@ -40,10 +40,12 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lifecycle.LifecycleBean; import org.apache.ignite.lifecycle.LifecycleEventType; import org.apache.ignite.plugin.extensions.communication.Message; @@ -58,6 +60,7 @@ import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.util.TestTcpCommunicationSpi; import org.junit.Test; import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT; @@ -142,46 +145,36 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs } @Test - public void testSingleMessageErrorWarnings () throws Exception { - String expectedLogMsg = "Failed to send local partitions"; + public void testSingleMessageErrorWarnings() throws Exception { + long waitingTimeout = 5_000; + + LogListener logListener = LogListener.matches("Failed to send local partitions").atLeast(1).build(); - LogListener logListener = LogListener.matches(expectedLogMsg).build(); testLog = new ListeningTestLogger(log, logListener); - IgniteConfiguration cfg0 = getConfiguration(getTestIgniteInstanceName(0)); - TestRecordingCommunicationSpi spi0 = new TestRecordingCommunicationSpi(); - cfg0.setCommunicationSpi(spi0); - cfg0.setGridLogger(testLog); - startGrid(cfg0); + spiFactory = TestRecordingCommunicationSpi::new; - IgniteConfiguration cfg1 = getConfiguration(getTestIgniteInstanceName(1)); - TestRecordingCommunicationSpi spi1 = new TestRecordingCommunicationSpi(); - cfg1.setCommunicationSpi(spi1); - cfg1.setGridLogger(testLog); + IgniteEx crd = startGrid(0); - IgniteEx node1 = startGrid(cfg1); + IgniteEx node1 = startGrid(1); - node1.cluster().state(ClusterState.ACTIVE); awaitPartitionMapExchange(); - spi1.blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage); + TestRecordingCommunicationSpi.spi(node1).blockMessages(GridDhtPartitionsSingleMessage.class, crd.name()); IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { - node1.context().cache().context().exchange().refreshPartitions(); + node1.context().cache().context().exchange().refreshPartitions(); // works if breakpoint + step over }); - assertTrue("Message not blocked", spi1.waitForBlocked(1, 5_000)); + TestRecordingCommunicationSpi.spi(grid(1)).waitForBlocked(); stopGrid(0); - spi1.stopBlock(); - - fut.get(5_000); - - U.sleep(500); + TestRecordingCommunicationSpi.spi(grid(1)).stopBlock(); assertTrue("Expected log not found", GridTestUtils.waitForCondition(logListener::check, 3000)); + } /** From a9f9c13bf0298f6de95e44dbbc145c8083d44b71 Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Sat, 16 Aug 2025 20:39:14 +0300 Subject: [PATCH 09/18] IGNITE-25793 Fix flaky test --- ...ePartitionExchangeManagerWarningsTest.java | 68 ++++++++++++++----- 1 file changed, 51 insertions(+), 17 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java index 21cbdfa69170d..3a9e6925b4d49 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java @@ -19,6 +19,7 @@ import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -34,18 +35,14 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lifecycle.LifecycleBean; import org.apache.ignite.lifecycle.LifecycleEventType; import org.apache.ignite.plugin.extensions.communication.Message; @@ -60,7 +57,6 @@ import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; -import org.apache.ignite.util.TestTcpCommunicationSpi; import org.junit.Test; import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT; @@ -144,37 +140,75 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs return cfg; } + /** + * + */ @Test public void testSingleMessageErrorWarnings() throws Exception { - long waitingTimeout = 5_000; + final long waitingTimeout = 5_000; LogListener logListener = LogListener.matches("Failed to send local partitions").atLeast(1).build(); - testLog = new ListeningTestLogger(log, logListener); - spiFactory = TestRecordingCommunicationSpi::new; + final CountDownLatch beforeSend = new CountDownLatch(1); + final CountDownLatch proceed = new CountDownLatch(1); - IgniteEx crd = startGrid(0); + final AtomicReference crdIdRef = new AtomicReference<>(); - IgniteEx node1 = startGrid(1); + spiFactory = () -> new TcpCommunicationSpi() { + @Override public void sendMessage( + ClusterNode node, + Message msg, + IgniteInClosure ackC + ) throws IgniteSpiException { + boolean isSingleMsg = ((GridIoMessage)msg).message() instanceof GridDhtPartitionsSingleMessage; + UUID crdId = crdIdRef.get(); + + if (isSingleMsg && node != null && crdId != null && crdId.equals(node.id()) ) { + beforeSend.countDown(); + try { + if (!proceed.await(waitingTimeout, TimeUnit.MILLISECONDS)) + throw new IgniteSpiException("Test timeout waiting to proceed"); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IgniteSpiException("Interrupted while waiting to proceed", e); + } + } + + super.sendMessage(node, msg, ackC); + } + }; + + IgniteEx crd = startGrid(0); + IgniteEx node1 = startGrid(1); awaitPartitionMapExchange(); - TestRecordingCommunicationSpi.spi(node1).blockMessages(GridDhtPartitionsSingleMessage.class, crd.name()); + crdIdRef.set(crd.localNode().id()); IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { - node1.context().cache().context().exchange().refreshPartitions(); // works if breakpoint + step over + node1.context().cache().context().exchange().refreshPartitions(); }); - TestRecordingCommunicationSpi.spi(grid(1)).waitForBlocked(); + boolean entered = false; + try { + assertTrue("Did not enter sendMessage() in time", + entered = beforeSend.await(waitingTimeout, TimeUnit.MILLISECONDS)); - stopGrid(0); + stopGrid(0); - TestRecordingCommunicationSpi.spi(grid(1)).stopBlock(); + proceed.countDown(); - assertTrue("Expected log not found", - GridTestUtils.waitForCondition(logListener::check, 3000)); + fut.get(waitingTimeout); + assertTrue("Expected log not found", + GridTestUtils.waitForCondition(logListener::check, waitingTimeout)); + } + finally { + proceed.countDown(); + if (!entered) beforeSend.countDown(); + } } /** From f2752e2b52abf3801f00b626ebb4b97df66d6968 Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Mon, 18 Aug 2025 13:11:49 +0300 Subject: [PATCH 10/18] IGNITE-25793 Minor fix revert useless changes --- .../cache/GridCachePartitionExchangeManager.java | 7 ++++--- .../dht/preloader/GridDhtPartitionsExchangeFuture.java | 1 - .../internal/util/distributed/DistributedProcess.java | 6 ++++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index d3b985c62d39e..c97815bcdb9ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1315,9 +1315,10 @@ private void sendAllPartitions( cctx.io().sendNoRetry(node, m, SYSTEM_POOL); } catch (ClusterTopologyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" + - node.id() + ", msg=" + m + ']'); + if (log.isDebugEnabled()) { + log.debug("Failed to send partition update to node because it left grid (will ignore) " + + "[node=" + node.id() + ", msg=" + m + ']'); + } } catch (IgniteCheckedException e) { failedNodes.add(node); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index b356630dc9640..233f3ef9f8d45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2155,7 +2155,6 @@ else if (localJoinExchange()) cctx.io().send(node, msg, SYSTEM_POOL); } catch (ClusterTopologyCheckedException e) { - long retryDelay = cctx.gridConfig().getNetworkSendRetryDelay(); log.warning( diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java index ab6136714a4b4..ee5a99931df3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java @@ -306,8 +306,10 @@ private void sendSingleMessage(Process p) { } catch (ClusterTopologyCheckedException e) { // The coordinator has failed. The single message will be sent when a new coordinator initialized. - log.warning("Failed to send a single message to coordinator: [crdId=" + crdId + - ", processId=" + p.id + ", error=" + e.getMessage() + ']'); + if (log.isDebugEnabled()) { + log.debug("Failed to send a single message to coordinator: [crdId=" + crdId + + ", processId=" + p.id + ", error=" + e.getMessage() + ']'); + } } catch (IgniteCheckedException e) { log.error("Unable to send message to coordinator.", e); From 10ae71e97675f6e891647b30fe5b5fc81abc1c51 Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Tue, 3 Mar 2026 23:33:27 +0300 Subject: [PATCH 11/18] fix reviewed --- .../GridDhtPartitionsExchangeFuture.java | 2 +- ...ePartitionExchangeManagerWarningsTest.java | 45 +++++++++---------- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e592a88f1d179..dc83daa93fde5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2257,7 +2257,7 @@ private void sendPartitions(ClusterNode oldestNode) { sendLocalPartitions(oldestNode); } catch (ClusterTopologyCheckedException e) { - log.warning("Coordinator left during partition exchange, will retry [nodeId=" + oldestNode.id() + + log.warning("Coordinator left during partition exchange [nodeId=" + oldestNode.id() + ", exchId=" + exchId + ']', e); } catch (IgniteCheckedException e) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java index 3a9e6925b4d49..a491a5adbcd2d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java @@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; - import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; @@ -86,7 +85,7 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs private ListeningTestLogger testLog; /** */ - private volatile Supplier spiFactory = CustomTcpCommunicationSpi::new; + private Supplier spiSupp = CustomTcpCommunicationSpi::new; /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { @@ -108,10 +107,8 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs if (testLog != null) testLog.clearListeners(); - spiFactory = CustomTcpCommunicationSpi::new; - + spiSupp = CustomTcpCommunicationSpi::new; testLog = null; - lifecycleBean = null; stopAllGrids(); @@ -121,7 +118,7 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - cfg.setCommunicationSpi(spiFactory.get()); + cfg.setCommunicationSpi(spiSupp.get()); if (testLog != null) cfg.setGridLogger(testLog); @@ -145,17 +142,18 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs */ @Test public void testSingleMessageErrorWarnings() throws Exception { - final long waitingTimeout = 5_000; + long waitingTimeout = 5_000; + String logSubstr = "Failed to send local partitions [nodeId="; - LogListener logListener = LogListener.matches("Failed to send local partitions").atLeast(1).build(); + LogListener logListener = LogListener.matches(logSubstr).atLeast(1).build(); testLog = new ListeningTestLogger(log, logListener); - final CountDownLatch beforeSend = new CountDownLatch(1); - final CountDownLatch proceed = new CountDownLatch(1); + CountDownLatch beforeSendLatch = new CountDownLatch(1); + CountDownLatch proceedLatch = new CountDownLatch(1); - final AtomicReference crdIdRef = new AtomicReference<>(); + AtomicReference crdIdRef = new AtomicReference<>(); - spiFactory = () -> new TcpCommunicationSpi() { + spiSupp = () -> new TcpCommunicationSpi() { @Override public void sendMessage( ClusterNode node, Message msg, @@ -164,11 +162,11 @@ public void testSingleMessageErrorWarnings() throws Exception { boolean isSingleMsg = ((GridIoMessage)msg).message() instanceof GridDhtPartitionsSingleMessage; UUID crdId = crdIdRef.get(); - if (isSingleMsg && node != null && crdId != null && crdId.equals(node.id()) ) { - beforeSend.countDown(); + if (isSingleMsg && node != null && crdId != null && crdId.equals(node.id())) { + beforeSendLatch.countDown(); try { - if (!proceed.await(waitingTimeout, TimeUnit.MILLISECONDS)) + if (!proceedLatch.await(waitingTimeout, TimeUnit.MILLISECONDS)) throw new IgniteSpiException("Test timeout waiting to proceed"); } catch (InterruptedException e) { @@ -183,22 +181,22 @@ public void testSingleMessageErrorWarnings() throws Exception { IgniteEx crd = startGrid(0); IgniteEx node1 = startGrid(1); + awaitPartitionMapExchange(); crdIdRef.set(crd.localNode().id()); - IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { - node1.context().cache().context().exchange().refreshPartitions(); - }); + IgniteInternalFuture fut = GridTestUtils.runAsync(() -> + node1.context().cache().context().exchange().refreshPartitions()); boolean entered = false; try { - assertTrue("Did not enter sendMessage() in time", - entered = beforeSend.await(waitingTimeout, TimeUnit.MILLISECONDS)); + assertTrue("Did not enter sendMessage() in time", entered = + beforeSendLatch.await(waitingTimeout, TimeUnit.MILLISECONDS)); stopGrid(0); - proceed.countDown(); + proceedLatch.countDown(); fut.get(waitingTimeout); @@ -206,8 +204,9 @@ public void testSingleMessageErrorWarnings() throws Exception { GridTestUtils.waitForCondition(logListener::check, waitingTimeout)); } finally { - proceed.countDown(); - if (!entered) beforeSend.countDown(); + proceedLatch.countDown(); + + if (!entered) beforeSendLatch.countDown(); } } From d823efe294db96828ab9938464aeb809e2b2b044 Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Wed, 4 Mar 2026 12:38:38 +0300 Subject: [PATCH 12/18] fix minor --- ...ridCachePartitionExchangeManagerWarningsTest.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java index a491a5adbcd2d..fd9800ca7e8af 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java @@ -154,11 +154,9 @@ public void testSingleMessageErrorWarnings() throws Exception { AtomicReference crdIdRef = new AtomicReference<>(); spiSupp = () -> new TcpCommunicationSpi() { - @Override public void sendMessage( - ClusterNode node, - Message msg, - IgniteInClosure ackC - ) throws IgniteSpiException { + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackC) + throws IgniteSpiException { boolean isSingleMsg = ((GridIoMessage)msg).message() instanceof GridDhtPartitionsSingleMessage; UUID crdId = crdIdRef.get(); @@ -171,6 +169,7 @@ public void testSingleMessageErrorWarnings() throws Exception { } catch (InterruptedException e) { Thread.currentThread().interrupt(); + throw new IgniteSpiException("Interrupted while waiting to proceed", e); } } @@ -200,8 +199,7 @@ public void testSingleMessageErrorWarnings() throws Exception { fut.get(waitingTimeout); - assertTrue("Expected log not found", - GridTestUtils.waitForCondition(logListener::check, waitingTimeout)); + assertTrue("Expected log not found", GridTestUtils.waitForCondition(logListener::check, waitingTimeout)); } finally { proceedLatch.countDown(); From 05667813accac1cb8cb074ec56d2c488ce8f529b Mon Sep 17 00:00:00 2001 From: Didar Shayarov <75740594+w3ll1ngt@users.noreply.github.com> Date: Wed, 4 Mar 2026 13:43:36 +0300 Subject: [PATCH 13/18] Update GridCachePartitionExchangeManager.java Co-authored-by: Dmitry Werner --- .../processors/cache/GridCachePartitionExchangeManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index af6f6e4ae1698..2e74d95139726 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1500,7 +1500,7 @@ private void sendLocalPartitions( cctx.io().sendNoRetry(node, m, SYSTEM_POOL); } catch (ClusterTopologyCheckedException e) { - log.warning("Failed to send local partitions [nodeId=" + node.id() + ", exchId=" + id + ']', e); + log.warning("Failed to send local partitions to node because it left grid [nodeId=" + node.id() + ", exchId=" + id + ']', e); } catch (IgniteCheckedException e) { U.error(log, "Failed to send local partition map to node [node=" + node + ", exchId=" + id + ']', e); From 9732f76ce51ba7b2a891727ebed6be38596103e4 Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Wed, 4 Mar 2026 15:20:22 +0300 Subject: [PATCH 14/18] Update log substring --- .../GridCachePartitionExchangeManagerWarningsTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java index fd9800ca7e8af..03f5a981580ad 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java @@ -107,9 +107,9 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs if (testLog != null) testLog.clearListeners(); - spiSupp = CustomTcpCommunicationSpi::new; testLog = null; lifecycleBean = null; + spiSupp = CustomTcpCommunicationSpi::new; stopAllGrids(); } @@ -143,7 +143,7 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs @Test public void testSingleMessageErrorWarnings() throws Exception { long waitingTimeout = 5_000; - String logSubstr = "Failed to send local partitions [nodeId="; + String logSubstr = "Failed to send local partitions to node because it left grid [nodeId="; LogListener logListener = LogListener.matches(logSubstr).atLeast(1).build(); testLog = new ListeningTestLogger(log, logListener); @@ -156,7 +156,7 @@ public void testSingleMessageErrorWarnings() throws Exception { spiSupp = () -> new TcpCommunicationSpi() { /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackC) - throws IgniteSpiException { + throws IgniteSpiException { boolean isSingleMsg = ((GridIoMessage)msg).message() instanceof GridDhtPartitionsSingleMessage; UUID crdId = crdIdRef.get(); @@ -191,7 +191,7 @@ public void testSingleMessageErrorWarnings() throws Exception { boolean entered = false; try { assertTrue("Did not enter sendMessage() in time", entered = - beforeSendLatch.await(waitingTimeout, TimeUnit.MILLISECONDS)); + beforeSendLatch.await(waitingTimeout, TimeUnit.MILLISECONDS)); stopGrid(0); From 970a2903233b4bb4d3fbd43c907fa5271bf76b5b Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Wed, 4 Mar 2026 15:20:22 +0300 Subject: [PATCH 15/18] Update log substring --- ...ePartitionExchangeManagerWarningsTest.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java index fd9800ca7e8af..b9d60525aebaf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java @@ -107,9 +107,9 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs if (testLog != null) testLog.clearListeners(); - spiSupp = CustomTcpCommunicationSpi::new; testLog = null; lifecycleBean = null; + spiSupp = CustomTcpCommunicationSpi::new; stopAllGrids(); } @@ -143,28 +143,28 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs @Test public void testSingleMessageErrorWarnings() throws Exception { long waitingTimeout = 5_000; - String logSubstr = "Failed to send local partitions [nodeId="; + String logSubstr = "Failed to send local partitions to node because it left grid [nodeId="; LogListener logListener = LogListener.matches(logSubstr).atLeast(1).build(); testLog = new ListeningTestLogger(log, logListener); - CountDownLatch beforeSendLatch = new CountDownLatch(1); - CountDownLatch proceedLatch = new CountDownLatch(1); + CountDownLatch beforeSend = new CountDownLatch(1); + CountDownLatch proceed = new CountDownLatch(1); AtomicReference crdIdRef = new AtomicReference<>(); spiSupp = () -> new TcpCommunicationSpi() { /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackC) - throws IgniteSpiException { + throws IgniteSpiException { boolean isSingleMsg = ((GridIoMessage)msg).message() instanceof GridDhtPartitionsSingleMessage; UUID crdId = crdIdRef.get(); if (isSingleMsg && node != null && crdId != null && crdId.equals(node.id())) { - beforeSendLatch.countDown(); + beforeSend.countDown(); try { - if (!proceedLatch.await(waitingTimeout, TimeUnit.MILLISECONDS)) + if (!proceed.await(waitingTimeout, TimeUnit.MILLISECONDS)) throw new IgniteSpiException("Test timeout waiting to proceed"); } catch (InterruptedException e) { @@ -191,20 +191,20 @@ public void testSingleMessageErrorWarnings() throws Exception { boolean entered = false; try { assertTrue("Did not enter sendMessage() in time", entered = - beforeSendLatch.await(waitingTimeout, TimeUnit.MILLISECONDS)); + beforeSend.await(waitingTimeout, TimeUnit.MILLISECONDS)); stopGrid(0); - proceedLatch.countDown(); + proceed.countDown(); fut.get(waitingTimeout); assertTrue("Expected log not found", GridTestUtils.waitForCondition(logListener::check, waitingTimeout)); } finally { - proceedLatch.countDown(); + proceed.countDown(); - if (!entered) beforeSendLatch.countDown(); + if (!entered) beforeSend.countDown(); } } From 680cc58feb6d3b6fdfc16aa266448f53afb8a07c Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Wed, 4 Mar 2026 16:55:20 +0300 Subject: [PATCH 16/18] Remove excessive finally logic in test --- .../GridCachePartitionExchangeManagerWarningsTest.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java index b9d60525aebaf..12bfe30881a78 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java @@ -188,10 +188,8 @@ public void testSingleMessageErrorWarnings() throws Exception { IgniteInternalFuture fut = GridTestUtils.runAsync(() -> node1.context().cache().context().exchange().refreshPartitions()); - boolean entered = false; try { - assertTrue("Did not enter sendMessage() in time", entered = - beforeSend.await(waitingTimeout, TimeUnit.MILLISECONDS)); + assertTrue("Did not enter sendMessage() in time", beforeSend.await(waitingTimeout, TimeUnit.MILLISECONDS)); stopGrid(0); @@ -202,9 +200,8 @@ public void testSingleMessageErrorWarnings() throws Exception { assertTrue("Expected log not found", GridTestUtils.waitForCondition(logListener::check, waitingTimeout)); } finally { + beforeSend.countDown(); proceed.countDown(); - - if (!entered) beforeSend.countDown(); } } From d500f7aa7df07fe20e6db416250b24e582ac892b Mon Sep 17 00:00:00 2001 From: Didar Shayarov <75740594+w3ll1ngt@users.noreply.github.com> Date: Fri, 13 Mar 2026 13:56:30 +0300 Subject: [PATCH 17/18] Apply suggestions from code review Co-authored-by: Ilya Shishkov --- ...CachePartitionExchangeManagerWarningsTest.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java index 12bfe30881a78..921f0d588f54c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java @@ -148,8 +148,8 @@ public void testSingleMessageErrorWarnings() throws Exception { LogListener logListener = LogListener.matches(logSubstr).atLeast(1).build(); testLog = new ListeningTestLogger(log, logListener); - CountDownLatch beforeSend = new CountDownLatch(1); - CountDownLatch proceed = new CountDownLatch(1); + CountDownLatch singleMsgLatch = new CountDownLatch(1); + CountDownLatch nodeStopLatch = new CountDownLatch(1); AtomicReference crdIdRef = new AtomicReference<>(); @@ -161,11 +161,10 @@ public void testSingleMessageErrorWarnings() throws Exception { UUID crdId = crdIdRef.get(); if (isSingleMsg && node != null && crdId != null && crdId.equals(node.id())) { - beforeSend.countDown(); + singleMsgLatch.countDown(); try { - if (!proceed.await(waitingTimeout, TimeUnit.MILLISECONDS)) - throw new IgniteSpiException("Test timeout waiting to proceed"); + nodeStopLatch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -189,15 +188,15 @@ public void testSingleMessageErrorWarnings() throws Exception { node1.context().cache().context().exchange().refreshPartitions()); try { - assertTrue("Did not enter sendMessage() in time", beforeSend.await(waitingTimeout, TimeUnit.MILLISECONDS)); + singleMsgLatch.await(); stopGrid(0); - proceed.countDown(); + nodeStopLatch.countDown(); fut.get(waitingTimeout); - assertTrue("Expected log not found", GridTestUtils.waitForCondition(logListener::check, waitingTimeout)); + assertTrue("Expected log not found", GridTestUtils.waitForCondition(logListener::check, getTestTimeout())); } finally { beforeSend.countDown(); From 75a6173024ed7a1352fdcf7eea3611edab327141 Mon Sep 17 00:00:00 2001 From: Didar Shayarov Date: Fri, 13 Mar 2026 15:02:38 +0300 Subject: [PATCH 18/18] Fix tests with timeout removal --- .../preloader/GridDhtPartitionsExchangeFuture.java | 3 +-- ...GridCachePartitionExchangeManagerWarningsTest.java | 11 +++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index dc83daa93fde5..9a0549e06d48d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -4418,8 +4418,7 @@ private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSi cctx.io().send(node, res, SYSTEM_POOL); } catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']'); + log.warning("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']'); } catch (IgniteCheckedException e) { U.error(log, "Failed to send partitions message [node=" + node + ", msg=" + msg + ']', e); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java index 921f0d588f54c..8a404d6fe5690 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java @@ -142,7 +142,6 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs */ @Test public void testSingleMessageErrorWarnings() throws Exception { - long waitingTimeout = 5_000; String logSubstr = "Failed to send local partitions to node because it left grid [nodeId="; LogListener logListener = LogListener.matches(logSubstr).atLeast(1).build(); @@ -164,7 +163,7 @@ public void testSingleMessageErrorWarnings() throws Exception { singleMsgLatch.countDown(); try { - nodeStopLatch.await(); + nodeStopLatch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -194,13 +193,13 @@ public void testSingleMessageErrorWarnings() throws Exception { nodeStopLatch.countDown(); - fut.get(waitingTimeout); - assertTrue("Expected log not found", GridTestUtils.waitForCondition(logListener::check, getTestTimeout())); } finally { - beforeSend.countDown(); - proceed.countDown(); + singleMsgLatch.countDown(); + nodeStopLatch.countDown(); + + fut.cancel(); } }