Skip to content

Commit 9680bd8

Browse files
authored
IGNITE-27294 Use MessageSerializer for TcpDiscoveryHandshakeRequest (#12568)
1 parent 99b88b9 commit 9680bd8

4 files changed

Lines changed: 33 additions & 25 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
2222
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
2323
import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer;
24+
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer;
2425
import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
2526
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
2627
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
@@ -31,6 +32,7 @@
3132
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
3233
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
3334
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
35+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
3436
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
3537
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
3638
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
@@ -48,5 +50,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
4850
factory.register((short)5, TcpDiscoveryLoopbackProblemMessage::new, new TcpDiscoveryLoopbackProblemMessageSerializer());
4951
factory.register((short)6, TcpDiscoveryConnectionCheckMessage::new, new TcpDiscoveryConnectionCheckMessageSerializer());
5052
factory.register((short)7, TcpDiscoveryRingLatencyCheckMessage::new, new TcpDiscoveryRingLatencyCheckMessageSerializer());
53+
factory.register((short)8, TcpDiscoveryHandshakeRequest::new, new TcpDiscoveryHandshakeRequestSerializer());
5154
}
5255
}

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3546,7 +3546,7 @@ else if (log.isTraceEnabled())
35463546
boolean changeTop = sndState != null && !sndState.isStartingPoint();
35473547

35483548
if (changeTop)
3549-
hndMsg.changeTopology(ring.previousNodeOf(next).id());
3549+
hndMsg.previousNodeId(ring.previousNodeOf(next).id());
35503550

35513551
if (log.isDebugEnabled()) {
35523552
log.debug("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState +
@@ -6879,7 +6879,7 @@ else if (log.isDebugEnabled())
68796879
}
68806880
}
68816881
}
6882-
else if (req.changeTopology()) {
6882+
else if (req.previousNodeId() != null) {
68836883
// Node cannot connect to it's next (for local node it's previous).
68846884
// Need to check connectivity to it.
68856885
long rcvdTime = lastRingMsgReceivedTime;
@@ -6904,7 +6904,7 @@ else if (req.changeTopology()) {
69046904
InetSocketAddress liveAddr = null;
69056905

69066906
if (previous != null && !previous.id().equals(nodeId) &&
6907-
(req.checkPreviousNodeId() == null || previous.id().equals(req.checkPreviousNodeId()))) {
6907+
(req.previousNodeId() == null || previous.id().equals(req.previousNodeId()))) {
69086908

69096909
// The connection recovery connection to one node is connCheckTick.
69106910
// We need to suppose network delays. So we use half of this time.
@@ -6927,7 +6927,7 @@ else if (req.changeTopology()) {
69276927

69286928
if (log.isInfoEnabled()) {
69296929
log.info("Previous node alive status [alive=" + ok +
6930-
", checkPreviousNodeId=" + req.checkPreviousNodeId() +
6930+
", checkPreviousNodeId=" + req.previousNodeId() +
69316931
", actualPreviousNode=" + previous +
69326932
", lastMessageReceivedTime=" + rcvdTime + ", now=" + now +
69336933
", connCheckInterval=" + connCheckInterval + ']');

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,58 +18,58 @@
1818
package org.apache.ignite.spi.discovery.tcp.messages;
1919

2020
import java.util.UUID;
21+
import org.apache.ignite.internal.Order;
22+
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
2123
import org.apache.ignite.internal.util.typedef.internal.S;
24+
import org.apache.ignite.plugin.extensions.communication.Message;
2225
import org.jetbrains.annotations.Nullable;
2326

2427
/**
2528
* Handshake request.
2629
*/
27-
public class TcpDiscoveryHandshakeRequest extends TcpDiscoveryAbstractMessage {
30+
public class TcpDiscoveryHandshakeRequest extends TcpDiscoveryAbstractMessage implements Message {
2831
/** */
2932
private static final long serialVersionUID = 0L;
3033

3134
/** */
32-
private UUID prevNodeId;
35+
@Order(value = 5, method = "previousNodeId")
36+
private @Nullable UUID prevNodeId;
3337

3438
/** */
35-
private String dcId;
39+
@Order(6)
40+
private @Nullable String dcId;
3641

3742
/**
38-
* Constructor.
39-
*
40-
* @param creatorNodeId Creator node ID.
43+
* Default constructor for {@link DiscoveryMessageFactory}.
4144
*/
42-
public TcpDiscoveryHandshakeRequest(UUID creatorNodeId) {
43-
super(creatorNodeId);
45+
public TcpDiscoveryHandshakeRequest() {
46+
// No-op.
4447
}
4548

4649
/**
47-
* Gets topology change flag.<br>
48-
* {@code True} means node intent to fail nodes in a ring.
50+
* Constructor.
4951
*
50-
* @return Change topology flag.
52+
* @param creatorNodeId Creator node ID.
5153
*/
52-
public boolean changeTopology() {
53-
return getFlag(CHANGE_TOPOLOGY_FLAG_POS);
54+
public TcpDiscoveryHandshakeRequest(UUID creatorNodeId) {
55+
super(creatorNodeId);
5456
}
5557

5658
/**
5759
* Gets expected previous node ID to check.
5860
*
5961
* @return Previous node ID to check.
6062
*/
61-
public UUID checkPreviousNodeId() {
63+
public @Nullable UUID previousNodeId() {
6264
return prevNodeId;
6365
}
6466

6567
/**
66-
* Sets topology change flag and previous node ID to check.<br>
68+
* Sets topology change request and previous node ID to check.<br>
6769
*
68-
* @param prevNodeId If not {@code null}, will set topology check flag and set node ID to check.
70+
* @param prevNodeId If not {@code null}, will set topology check request and node ID to check.
6971
*/
70-
public void changeTopology(UUID prevNodeId) {
71-
setFlag(CHANGE_TOPOLOGY_FLAG_POS, prevNodeId != null);
72-
72+
public void previousNodeId(@Nullable UUID prevNodeId) {
7373
this.prevNodeId = prevNodeId;
7474
}
7575

@@ -83,9 +83,14 @@ public void dcId(String dcId) {
8383
this.dcId = dcId;
8484
}
8585

86+
/** {@inheritDoc} */
87+
@Override public short directType() {
88+
return 8;
89+
}
90+
8691
/** {@inheritDoc} */
8792
@Override public String toString() {
8893
return S.toString(TcpDiscoveryHandshakeRequest.class, this, "super", super.toString(),
89-
"isChangeTopology", changeTopology());
94+
"isChangeTopology", prevNodeId != null);
9095
}
9196
}

modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ public void testBackwardConnectionCheckWhenDiscoveryThreadsSuspended() throws Ex
307307

308308
// Request to establish new permanent cluster connection from doubting node0 to node2.
309309
testSpi(doubtNode0).hsRqLsnr.set((s, hsRq) -> {
310-
if (hsRq.changeTopology() && frozenNodeId.equals(hsRq.checkPreviousNodeId())) {
310+
if (hsRq.previousNodeId() != null && frozenNodeId.equals(hsRq.previousNodeId())) {
311311
// Continue simulation of node1 freeze at GC and processes no discovery messages.
312312
testSpi(frozenNode1).addrsToBlock = Collections.emptyList();
313313
}

0 commit comments

Comments
 (0)