diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 3969c9dd31776..2a4832e019352 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -114,6 +114,8 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessageMarshallableSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessageMarshallableSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage; @@ -199,6 +201,8 @@ public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMa factory.register((short)27, DistributedMetaStorageCasAckMessage::new, new DistributedMetaStorageCasAckMessageSerializer()); factory.register((short)28, TcpDiscoveryClientReconnectMessage::new, new TcpDiscoveryClientReconnectMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); + factory.register((short)29, TcpDiscoveryNodeAddedMessage::new, + new TcpDiscoveryNodeAddedMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); // DiscoveryCustomMessage factory.register((short)500, CacheStatisticsModeChangeMessage::new, new CacheStatisticsModeChangeMessageSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java index 3eaf562e5a7fb..1e38accc11252 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java @@ -50,7 +50,10 @@ public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMess @GridToStringExclude private Collection msgs; - /** Srialized bytes of {@link #msgs}. */ + /** + * TODO: Use direct messages or a message container after https://issues.apache.org/jira/browse/IGNITE-25883 + * Srialized bytes of {@link #msgs}. + */ @Order(2) byte[] msgsBytes; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index 451689eabe008..c11bee62e8513 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -123,6 +123,7 @@ public Map clientNodeAttributes() { */ public void clientNodeAttributes(Map clientNodeAttrs) { this.clientNodeAttrs = clientNodeAttrs; + clientNodeAttrsBytes = null; } /** {@inheritDoc} */ @@ -135,6 +136,8 @@ public void clientNodeAttributes(Map clientNodeAttrs) { @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { if (clientNodeAttrsBytes != null) clientNodeAttrs = U.unmarshal(marsh, clientNodeAttrsBytes, clsLdr); + + clientNodeAttrsBytes = null; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 36540d8b7dfc1..e4c6bddb008f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -20,37 +20,64 @@ import java.util.Collection; import java.util.Map; import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.jetbrains.annotations.Nullable; /** + * TODO: Use NodeMessage for {@link TcpDiscoveryNode} and {@link ClusterNode} after https://issues.apache.org/jira/browse/IGNITE-27899 * Message telling nodes that new node should be added to topology. * When newly added node receives the message it connects to its next and finishes * join process. */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage { +public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage implements MarshallableMessage { /** */ private static final long serialVersionUID = 0L; /** Added node. */ - private final TcpDiscoveryNode node; + private TcpDiscoveryNode node; + + /** Marshalled {@link #node}. */ + @Order(0) + @GridToStringExclude + byte[] nodeBytes; /** */ - private DiscoveryDataPacket dataPacket; + @Order(1) + DiscoveryDataPacket dataPacket; /** Pending messages from previous node. */ private Collection msgs; + /** + * TODO: Use direct messages or a message container after https://issues.apache.org/jira/browse/IGNITE-25883 + * Marshalled {@link #msgs}. + */ + @Order(2) + @GridToStringExclude + byte[] msgsBytes; + /** Current topology. Initialized by coordinator. */ @GridToStringInclude private Collection top; + /** Marshalled {@link #top}. */ + @Order(3) + @GridToStringExclude + @Nullable byte[] topBytes; + /** */ @GridToStringInclude private transient Collection clientTop; @@ -58,8 +85,19 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM /** Topology snapshots history. */ private Map> topHist; + /** Marshalled {@link #topHist}. */ + @Order(4) + @GridToStringExclude + @Nullable byte[] topHistBytes; + /** Start time of the first grid node. */ - private final long gridStartTime; + @Order(5) + long gridStartTime; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeAddedMessage() { + // No-op. + } /** * Constructor. @@ -69,7 +107,8 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM * @param dataPacket container for collecting discovery data across the cluster. * @param gridStartTime Start time of the first grid node. */ - public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId, + public TcpDiscoveryNodeAddedMessage( + UUID creatorNodeId, TcpDiscoveryNode node, DiscoveryDataPacket dataPacket, long gridStartTime @@ -90,13 +129,17 @@ public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId, public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { super(msg); - this.node = msg.node; - this.msgs = msg.msgs; - this.top = msg.top; - this.clientTop = msg.clientTop; - this.topHist = msg.topHist; - this.dataPacket = msg.dataPacket; - this.gridStartTime = msg.gridStartTime; + node = msg.node; + nodeBytes = msg.nodeBytes; + msgs = msg.msgs; + msgsBytes = msg.msgsBytes; + top = msg.top; + topBytes = msg.topBytes; + clientTop = msg.clientTop; + topHist = msg.topHist; + topHistBytes = msg.topHistBytes; + dataPacket = msg.dataPacket; + gridStartTime = msg.gridStartTime; } /** @@ -122,10 +165,9 @@ public TcpDiscoveryNode node() { * * @param msgs Pending messages to send to new node. */ - public void messages( - @Nullable Collection msgs - ) { + public void messages(@Nullable Collection msgs) { this.msgs = msgs; + msgsBytes = null; } /** @@ -144,6 +186,7 @@ public void messages( */ public void topology(@Nullable Collection top) { this.top = top; + topBytes = null; } /** @@ -152,7 +195,7 @@ public void topology(@Nullable Collection top) { public void clientTopology(Collection top) { assert top != null && !top.isEmpty() : top; - this.clientTop = top; + clientTop = top; } /** @@ -178,6 +221,7 @@ public Map> topologyHistory() { */ public void topologyHistory(@Nullable Map> topHist) { this.topHist = topHist; + topHistBytes = null; } /** @@ -210,6 +254,47 @@ public long gridStartTime() { return gridStartTime; } + /** @param marsh marshaller. */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (node != null) + nodeBytes = U.marshal(marsh, node); + + if (msgs != null) + msgsBytes = U.marshal(marsh, msgs); + + if (top != null) + topBytes = U.marshal(marsh, top); + + if (topHist != null) + topHistBytes = U.marshal(marsh, topHist); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (nodeBytes != null) + node = U.unmarshal(marsh, nodeBytes, clsLdr); + + if (msgsBytes != null) + msgs = U.unmarshal(marsh, msgsBytes, clsLdr); + + if (topBytes != null) + top = U.unmarshal(marsh, topBytes, clsLdr); + + if (topHistBytes != null) + topHist = U.unmarshal(marsh, topHistBytes, clsLdr); + + nodeBytes = null; + topBytes = null; + topHistBytes = null; + msgsBytes = null; + } + + + /** {@inheritDoc} */ + @Override public short directType() { + return 29; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryNodeAddedMessage.class, this, "super", super.toString());