Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
Expand All @@ -31,6 +32,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
Expand All @@ -48,5 +50,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)5, TcpDiscoveryLoopbackProblemMessage::new, new TcpDiscoveryLoopbackProblemMessageSerializer());
factory.register((short)6, TcpDiscoveryConnectionCheckMessage::new, new TcpDiscoveryConnectionCheckMessageSerializer());
factory.register((short)7, TcpDiscoveryRingLatencyCheckMessage::new, new TcpDiscoveryRingLatencyCheckMessageSerializer());
factory.register((short)8, TcpDiscoveryHandshakeRequest::new, new TcpDiscoveryHandshakeRequestSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3546,7 +3546,7 @@ else if (log.isTraceEnabled())
boolean changeTop = sndState != null && !sndState.isStartingPoint();

if (changeTop)
hndMsg.changeTopology(ring.previousNodeOf(next).id());
hndMsg.previousNodeId(ring.previousNodeOf(next).id());

if (log.isDebugEnabled()) {
log.debug("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState +
Expand Down Expand Up @@ -6879,7 +6879,7 @@ else if (log.isDebugEnabled())
}
}
}
else if (req.changeTopology()) {
else if (req.previousNodeId() != null) {
// Node cannot connect to it's next (for local node it's previous).
// Need to check connectivity to it.
long rcvdTime = lastRingMsgReceivedTime;
Expand All @@ -6904,7 +6904,7 @@ else if (req.changeTopology()) {
InetSocketAddress liveAddr = null;

if (previous != null && !previous.id().equals(nodeId) &&
(req.checkPreviousNodeId() == null || previous.id().equals(req.checkPreviousNodeId()))) {
(req.previousNodeId() == null || previous.id().equals(req.previousNodeId()))) {

// The connection recovery connection to one node is connCheckTick.
// We need to suppose network delays. So we use half of this time.
Expand All @@ -6927,7 +6927,7 @@ else if (req.changeTopology()) {

if (log.isInfoEnabled()) {
log.info("Previous node alive status [alive=" + ok +
", checkPreviousNodeId=" + req.checkPreviousNodeId() +
", checkPreviousNodeId=" + req.previousNodeId() +
", actualPreviousNode=" + previous +
", lastMessageReceivedTime=" + rcvdTime + ", now=" + now +
", connCheckInterval=" + connCheckInterval + ']');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,58 +18,58 @@
package org.apache.ignite.spi.discovery.tcp.messages;

import java.util.UUID;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

/**
* Handshake request.
*/
public class TcpDiscoveryHandshakeRequest extends TcpDiscoveryAbstractMessage {
public class TcpDiscoveryHandshakeRequest extends TcpDiscoveryAbstractMessage implements Message {
/** */
private static final long serialVersionUID = 0L;

/** */
private UUID prevNodeId;
@Order(value = 5, method = "previousNodeId")
private @Nullable UUID prevNodeId;

/** */
private String dcId;
@Order(6)
private @Nullable String dcId;

/**
* Constructor.
*
* @param creatorNodeId Creator node ID.
* Default constructor for {@link DiscoveryMessageFactory}.
*/
public TcpDiscoveryHandshakeRequest(UUID creatorNodeId) {
super(creatorNodeId);
public TcpDiscoveryHandshakeRequest() {
// No-op.
}

/**
* Gets topology change flag.<br>
* {@code True} means node intent to fail nodes in a ring.
* Constructor.
*
* @return Change topology flag.
* @param creatorNodeId Creator node ID.
*/
public boolean changeTopology() {
return getFlag(CHANGE_TOPOLOGY_FLAG_POS);
public TcpDiscoveryHandshakeRequest(UUID creatorNodeId) {
super(creatorNodeId);
}

/**
* Gets expected previous node ID to check.
*
* @return Previous node ID to check.
*/
public UUID checkPreviousNodeId() {
public @Nullable UUID previousNodeId() {
return prevNodeId;
}

/**
* Sets topology change flag and previous node ID to check.<br>
* Sets topology change request and previous node ID to check.<br>
*
* @param prevNodeId If not {@code null}, will set topology check flag and set node ID to check.
* @param prevNodeId If not {@code null}, will set topology check request and node ID to check.
*/
public void changeTopology(UUID prevNodeId) {
setFlag(CHANGE_TOPOLOGY_FLAG_POS, prevNodeId != null);

public void previousNodeId(@Nullable UUID prevNodeId) {
this.prevNodeId = prevNodeId;
}

Expand All @@ -83,9 +83,14 @@ public void dcId(String dcId) {
this.dcId = dcId;
}

/** {@inheritDoc} */
@Override public short directType() {
return 8;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryHandshakeRequest.class, this, "super", super.toString(),
"isChangeTopology", changeTopology());
"isChangeTopology", prevNodeId != null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ public void testBackwardConnectionCheckWhenDiscoveryThreadsSuspended() throws Ex

// Request to establish new permanent cluster connection from doubting node0 to node2.
testSpi(doubtNode0).hsRqLsnr.set((s, hsRq) -> {
if (hsRq.changeTopology() && frozenNodeId.equals(hsRq.checkPreviousNodeId())) {
if (hsRq.previousNodeId() != null && frozenNodeId.equals(hsRq.previousNodeId())) {
// Continue simulation of node1 freeze at GC and processes no discovery messages.
testSpi(frozenNode1).addrsToBlock = Collections.emptyList();
}
Expand Down
Loading