diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 7786560ad9089..05160eec6bdc3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -21,6 +21,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; @@ -31,6 +32,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; @@ -48,5 +50,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)5, TcpDiscoveryLoopbackProblemMessage::new, new TcpDiscoveryLoopbackProblemMessageSerializer()); factory.register((short)6, TcpDiscoveryConnectionCheckMessage::new, new TcpDiscoveryConnectionCheckMessageSerializer()); factory.register((short)7, TcpDiscoveryRingLatencyCheckMessage::new, new TcpDiscoveryRingLatencyCheckMessageSerializer()); + factory.register((short)8, TcpDiscoveryHandshakeRequest::new, new TcpDiscoveryHandshakeRequestSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 470fafa4379d3..d1d15557df0d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3546,7 +3546,7 @@ else if (log.isTraceEnabled()) boolean changeTop = sndState != null && !sndState.isStartingPoint(); if (changeTop) - hndMsg.changeTopology(ring.previousNodeOf(next).id()); + hndMsg.previousNodeId(ring.previousNodeOf(next).id()); if (log.isDebugEnabled()) { log.debug("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState + @@ -6879,7 +6879,7 @@ else if (log.isDebugEnabled()) } } } - else if (req.changeTopology()) { + else if (req.previousNodeId() != null) { // Node cannot connect to it's next (for local node it's previous). // Need to check connectivity to it. long rcvdTime = lastRingMsgReceivedTime; @@ -6904,7 +6904,7 @@ else if (req.changeTopology()) { InetSocketAddress liveAddr = null; if (previous != null && !previous.id().equals(nodeId) && - (req.checkPreviousNodeId() == null || previous.id().equals(req.checkPreviousNodeId()))) { + (req.previousNodeId() == null || previous.id().equals(req.previousNodeId()))) { // The connection recovery connection to one node is connCheckTick. // We need to suppose network delays. So we use half of this time. @@ -6927,7 +6927,7 @@ else if (req.changeTopology()) { if (log.isInfoEnabled()) { log.info("Previous node alive status [alive=" + ok + - ", checkPreviousNodeId=" + req.checkPreviousNodeId() + + ", checkPreviousNodeId=" + req.previousNodeId() + ", actualPreviousNode=" + previous + ", lastMessageReceivedTime=" + rcvdTime + ", now=" + now + ", connCheckInterval=" + connCheckInterval + ']'); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java index 384caf1a46cc7..76c5d68a9f42c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java @@ -18,39 +18,41 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** * Handshake request. */ -public class TcpDiscoveryHandshakeRequest extends TcpDiscoveryAbstractMessage { +public class TcpDiscoveryHandshakeRequest extends TcpDiscoveryAbstractMessage implements Message { /** */ private static final long serialVersionUID = 0L; /** */ - private UUID prevNodeId; + @Order(value = 5, method = "previousNodeId") + private @Nullable UUID prevNodeId; /** */ - private String dcId; + @Order(6) + private @Nullable String dcId; /** - * Constructor. - * - * @param creatorNodeId Creator node ID. + * Default constructor for {@link DiscoveryMessageFactory}. */ - public TcpDiscoveryHandshakeRequest(UUID creatorNodeId) { - super(creatorNodeId); + public TcpDiscoveryHandshakeRequest() { + // No-op. } /** - * Gets topology change flag.
- * {@code True} means node intent to fail nodes in a ring. + * Constructor. * - * @return Change topology flag. + * @param creatorNodeId Creator node ID. */ - public boolean changeTopology() { - return getFlag(CHANGE_TOPOLOGY_FLAG_POS); + public TcpDiscoveryHandshakeRequest(UUID creatorNodeId) { + super(creatorNodeId); } /** @@ -58,18 +60,16 @@ public boolean changeTopology() { * * @return Previous node ID to check. */ - public UUID checkPreviousNodeId() { + public @Nullable UUID previousNodeId() { return prevNodeId; } /** - * Sets topology change flag and previous node ID to check.
+ * Sets topology change request and previous node ID to check.
* - * @param prevNodeId If not {@code null}, will set topology check flag and set node ID to check. + * @param prevNodeId If not {@code null}, will set topology check request and node ID to check. */ - public void changeTopology(UUID prevNodeId) { - setFlag(CHANGE_TOPOLOGY_FLAG_POS, prevNodeId != null); - + public void previousNodeId(@Nullable UUID prevNodeId) { this.prevNodeId = prevNodeId; } @@ -83,9 +83,14 @@ public void dcId(String dcId) { this.dcId = dcId; } + /** {@inheritDoc} */ + @Override public short directType() { + return 8; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryHandshakeRequest.class, this, "super", super.toString(), - "isChangeTopology", changeTopology()); + "isChangeTopology", prevNodeId != null); } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index 3b2c925804430..60db3be1436fa 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -307,7 +307,7 @@ public void testBackwardConnectionCheckWhenDiscoveryThreadsSuspended() throws Ex // Request to establish new permanent cluster connection from doubting node0 to node2. testSpi(doubtNode0).hsRqLsnr.set((s, hsRq) -> { - if (hsRq.changeTopology() && frozenNodeId.equals(hsRq.checkPreviousNodeId())) { + if (hsRq.previousNodeId() != null && frozenNodeId.equals(hsRq.previousNodeId())) { // Continue simulation of node1 freeze at GC and processes no discovery messages. testSpi(frozenNode1).addrsToBlock = Collections.emptyList(); }