Skip to content

Commit 575b930

Browse files
committed
IGNITE-26111 TcpDiscoverySpi uses MessageSerializer
1 parent b73d260 commit 575b930

12 files changed

Lines changed: 245 additions & 23 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerializer;
4444
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
4545
import org.apache.ignite.internal.codegen.SessionChannelMessageSerializer;
46+
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
4647
import org.apache.ignite.internal.codegen.TcpInverseConnectionResponseMessageSerializer;
4748
import org.apache.ignite.internal.codegen.TxLockSerializer;
4849
import org.apache.ignite.internal.codegen.UserAuthenticateRequestMessageSerializer;
@@ -191,6 +192,7 @@
191192
import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
192193
import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
193194
import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
195+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
194196

195197
/**
196198
* Message factory implementation.
@@ -371,5 +373,8 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
371373
// [2048..2053] - Snapshots
372374
// [-42..-37] - former hadoop.
373375
// [64..71] - former IGFS.
376+
377+
// Discovery messages.
378+
factory.register((short)-1000, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());
374379
}
375380
}

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ public GridDiscoveryManager(GridKernalContext ctx) {
345345
DiscoverySpi spi = getSpi();
346346

347347
spi.setNodeAttributes(ctx.nodeAttributes(), VER);
348+
spi.setMessageFactory(ctx.io().messageFactory());
348349
}
349350

350351
/**

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.ignite.IgniteException;
2525
import org.apache.ignite.cluster.ClusterNode;
2626
import org.apache.ignite.lang.IgniteProductVersion;
27+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2728
import org.apache.ignite.spi.IgniteSpiAdapter;
2829
import org.apache.ignite.spi.IgniteSpiException;
2930
import org.apache.ignite.spi.IgniteSpiNoop;
@@ -70,6 +71,11 @@ public class StandaloneNoopDiscoverySpi extends IgniteSpiAdapter implements Disc
7071

7172
}
7273

74+
/** {@inheritDoc} */
75+
@Override public void setMessageFactory(MessageFactory msgFactory) {
76+
77+
}
78+
7379
/** {@inheritDoc} */
7480
@Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
7581

modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.ignite.IgniteException;
2525
import org.apache.ignite.cluster.ClusterNode;
2626
import org.apache.ignite.lang.IgniteProductVersion;
27+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2728
import org.apache.ignite.spi.IgniteSpi;
2829
import org.apache.ignite.spi.IgniteSpiException;
2930
import org.jetbrains.annotations.Nullable;
@@ -94,6 +95,11 @@ public interface DiscoverySpi extends IgniteSpi {
9495
*/
9596
public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver);
9697

98+
/**
99+
* @param msgFactory Message factory.
100+
*/
101+
public void setMessageFactory(MessageFactory msgFactory);
102+
97103
/**
98104
* Sets a listener for discovery events. Refer to
99105
* {@link org.apache.ignite.events.DiscoveryEvent} for a set of all possible

modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.ignite.lang.IgniteFuture;
3636
import org.apache.ignite.lang.IgniteProductVersion;
3737
import org.apache.ignite.marshaller.Marshaller;
38+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
3839
import org.apache.ignite.plugin.security.SecurityCredentials;
3940
import org.apache.ignite.spi.IgniteSpiAdapter;
4041
import org.apache.ignite.spi.IgniteSpiContext;
@@ -129,6 +130,11 @@ public class IsolatedDiscoverySpi extends IgniteSpiAdapter implements IgniteDisc
129130
locNode = new IsolatedNode(ignite.configuration().getNodeId(), attrs, ver);
130131
}
131132

133+
/** {@inheritDoc} */
134+
@Override public void setMessageFactory(MessageFactory msgFactory) {
135+
// Np-op.
136+
}
137+
132138
/** {@inheritDoc} */
133139
@Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
134140
this.lsnr = lsnr;

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1186,7 +1186,7 @@ private void forceStopRead() throws InterruptedException {
11861186
TcpDiscoveryAbstractMessage msg;
11871187

11881188
try {
1189-
msg = U.unmarshal(spi.marshaller(), in, U.resolveClassLoader(spi.ignite().configuration()));
1189+
msg = spi.readMessage(sock, in, sock.getSoTimeout());
11901190
}
11911191
catch (IgniteCheckedException e) {
11921192
if (log.isDebugEnabled())
@@ -1616,8 +1616,7 @@ public void cancel() {
16161616
List<TcpDiscoveryAbstractMessage> msgs = null;
16171617

16181618
while (!isInterrupted()) {
1619-
TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
1620-
U.resolveClassLoader(spi.ignite().configuration()));
1619+
TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, sock.getSoTimeout());
16211620

16221621
if (msg instanceof TcpDiscoveryClientReconnectMessage) {
16231622
TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3376,7 +3376,6 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
33763376
}
33773377

33783378
TcpDiscoveryAbstractMessage msg0 = msg;
3379-
byte[] msgBytes0 = msgBytes;
33803379

33813380
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
33823381
TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg;
@@ -3389,16 +3388,14 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
33893388
U.resolveClassLoader(spi.ignite().configuration()));
33903389

33913390
prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null, null);
3392-
3393-
msgBytes0 = null;
33943391
}
33953392
catch (IgniteCheckedException e) {
33963393
U.error(log, "Failed to create message copy: " + msg, e);
33973394
}
33983395
}
33993396
}
34003397

3401-
clientMsgWorker.addMessage(msg0, msgBytes0);
3398+
clientMsgWorker.addMessage(msg0);
34023399
}
34033400
}
34043401
}
@@ -6985,8 +6982,7 @@ else if (e.hasCause(ObjectStreamException.class) ||
69856982
try {
69866983
SecurityUtils.serializeVersion(1);
69876984

6988-
TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
6989-
U.resolveClassLoader(spi.ignite().configuration()));
6985+
TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, sock.getSoTimeout());
69906986

69916987
msg.senderNodeId(nodeId);
69926988

@@ -7758,11 +7754,6 @@ void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) {
77587754
try {
77597755
assert msg.verified() : msg;
77607756

7761-
byte[] msgBytes = msgT.get2();
7762-
7763-
if (msgBytes == null)
7764-
msgBytes = U.marshal(spi.marshaller(), msg);
7765-
77667757
DebugLogger msgLog = messageLogger(msg);
77677758

77687759
if (msg instanceof TcpDiscoveryClientAckResponse) {
@@ -7784,7 +7775,7 @@ else if (msgLog.isDebugEnabled()) {
77847775
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
77857776
}
77867777

7787-
spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ?
7778+
spi.writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
77887779
spi.clientFailureDetectionTimeout() : spi.getSocketTimeout());
77897780
}
77907781
}
@@ -7796,7 +7787,7 @@ else if (msgLog.isDebugEnabled()) {
77967787

77977788
assert topologyInitialized(msg) : msg;
77987789

7799-
spi.writeToSocket(sock, msg, msgBytes, spi.getEffectiveSocketTimeout(false));
7790+
spi.writeToSocket(sock, msg, spi.getEffectiveSocketTimeout(false));
78007791
}
78017792

78027793
boolean clientFailed = msg instanceof TcpDiscoveryNodeFailedMessage &&

0 commit comments

Comments
 (0)