Skip to content

Commit 04c41be

Browse files
authored
IGNITE-27556 : MessageSerializer for TcpDiscoveryNodeAddedMessage v2 simplified (#12790)
1 parent 64fa8a9 commit 04c41be

4 files changed

Lines changed: 112 additions & 17 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@
132132
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessageSerializer;
133133
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
134134
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessageMarshallableSerializer;
135+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
136+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessageMarshallableSerializer;
135137
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
136138
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessageSerializer;
137139
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
@@ -228,6 +230,8 @@ public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMa
228230
factory.register((short)27, DistributedMetaStorageCasAckMessage::new, new DistributedMetaStorageCasAckMessageSerializer());
229231
factory.register((short)28, TcpDiscoveryClientReconnectMessage::new,
230232
new TcpDiscoveryClientReconnectMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
233+
factory.register((short)29, TcpDiscoveryNodeAddedMessage::new,
234+
new TcpDiscoveryNodeAddedMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
231235

232236
// DiscoveryCustomMessage
233237
factory.register((short)500, CacheStatisticsModeChangeMessage::new, new CacheStatisticsModeChangeMessageSerializer());

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMess
5050
@GridToStringExclude
5151
private Collection<TcpDiscoveryAbstractMessage> msgs;
5252

53-
/** Srialized bytes of {@link #msgs}. */
53+
/**
54+
* TODO: Use direct messages or a message container after https://issues.apache.org/jira/browse/IGNITE-25883
55+
* Srialized bytes of {@link #msgs}.
56+
*/
5457
@Order(2)
5558
byte[] msgsBytes;
5659

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ public Map<String, Object> clientNodeAttributes() {
123123
*/
124124
public void clientNodeAttributes(Map<String, Object> clientNodeAttrs) {
125125
this.clientNodeAttrs = clientNodeAttrs;
126+
clientNodeAttrsBytes = null;
126127
}
127128

128129
/** {@inheritDoc} */
@@ -135,6 +136,8 @@ public void clientNodeAttributes(Map<String, Object> clientNodeAttrs) {
135136
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
136137
if (clientNodeAttrsBytes != null)
137138
clientNodeAttrs = U.unmarshal(marsh, clientNodeAttrsBytes, clsLdr);
139+
140+
clientNodeAttrsBytes = null;
138141
}
139142

140143
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java

Lines changed: 101 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,46 +20,84 @@
2020
import java.util.Collection;
2121
import java.util.Map;
2222
import java.util.UUID;
23+
import org.apache.ignite.IgniteCheckedException;
2324
import org.apache.ignite.cluster.ClusterNode;
25+
import org.apache.ignite.internal.Order;
26+
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
27+
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
2428
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
2529
import org.apache.ignite.internal.util.typedef.internal.S;
30+
import org.apache.ignite.internal.util.typedef.internal.U;
31+
import org.apache.ignite.marshaller.Marshaller;
32+
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
2633
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
2734
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
2835
import org.jetbrains.annotations.Nullable;
2936

3037
/**
38+
* TODO: Use NodeMessage for {@link TcpDiscoveryNode} and {@link ClusterNode} after https://issues.apache.org/jira/browse/IGNITE-27899
3139
* Message telling nodes that new node should be added to topology.
3240
* When newly added node receives the message it connects to its next and finishes
3341
* join process.
3442
*/
3543
@TcpDiscoveryEnsureDelivery
3644
@TcpDiscoveryRedirectToClient
37-
public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage {
45+
public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage implements MarshallableMessage {
3846
/** */
3947
private static final long serialVersionUID = 0L;
4048

4149
/** Added node. */
42-
private final TcpDiscoveryNode node;
50+
private TcpDiscoveryNode node;
51+
52+
/** Marshalled {@link #node}. */
53+
@Order(0)
54+
@GridToStringExclude
55+
byte[] nodeBytes;
4356

4457
/** */
45-
private DiscoveryDataPacket dataPacket;
58+
@Order(1)
59+
DiscoveryDataPacket dataPacket;
4660

4761
/** Pending messages from previous node. */
4862
private Collection<TcpDiscoveryAbstractMessage> msgs;
4963

64+
/**
65+
* TODO: Use direct messages or a message container after https://issues.apache.org/jira/browse/IGNITE-25883
66+
* Marshalled {@link #msgs}.
67+
*/
68+
@Order(2)
69+
@GridToStringExclude
70+
byte[] msgsBytes;
71+
5072
/** Current topology. Initialized by coordinator. */
5173
@GridToStringInclude
5274
private Collection<TcpDiscoveryNode> top;
5375

76+
/** Marshalled {@link #top}. */
77+
@Order(3)
78+
@GridToStringExclude
79+
@Nullable byte[] topBytes;
80+
5481
/** */
5582
@GridToStringInclude
5683
private transient Collection<TcpDiscoveryNode> clientTop;
5784

5885
/** Topology snapshots history. */
5986
private Map<Long, Collection<ClusterNode>> topHist;
6087

88+
/** Marshalled {@link #topHist}. */
89+
@Order(4)
90+
@GridToStringExclude
91+
@Nullable byte[] topHistBytes;
92+
6193
/** Start time of the first grid node. */
62-
private final long gridStartTime;
94+
@Order(5)
95+
long gridStartTime;
96+
97+
/** Constructor for {@link DiscoveryMessageFactory}. */
98+
public TcpDiscoveryNodeAddedMessage() {
99+
// No-op.
100+
}
63101

64102
/**
65103
* Constructor.
@@ -69,7 +107,8 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM
69107
* @param dataPacket container for collecting discovery data across the cluster.
70108
* @param gridStartTime Start time of the first grid node.
71109
*/
72-
public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId,
110+
public TcpDiscoveryNodeAddedMessage(
111+
UUID creatorNodeId,
73112
TcpDiscoveryNode node,
74113
DiscoveryDataPacket dataPacket,
75114
long gridStartTime
@@ -90,13 +129,17 @@ public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId,
90129
public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
91130
super(msg);
92131

93-
this.node = msg.node;
94-
this.msgs = msg.msgs;
95-
this.top = msg.top;
96-
this.clientTop = msg.clientTop;
97-
this.topHist = msg.topHist;
98-
this.dataPacket = msg.dataPacket;
99-
this.gridStartTime = msg.gridStartTime;
132+
node = msg.node;
133+
nodeBytes = msg.nodeBytes;
134+
msgs = msg.msgs;
135+
msgsBytes = msg.msgsBytes;
136+
top = msg.top;
137+
topBytes = msg.topBytes;
138+
clientTop = msg.clientTop;
139+
topHist = msg.topHist;
140+
topHistBytes = msg.topHistBytes;
141+
dataPacket = msg.dataPacket;
142+
gridStartTime = msg.gridStartTime;
100143
}
101144

102145
/**
@@ -122,10 +165,9 @@ public TcpDiscoveryNode node() {
122165
*
123166
* @param msgs Pending messages to send to new node.
124167
*/
125-
public void messages(
126-
@Nullable Collection<TcpDiscoveryAbstractMessage> msgs
127-
) {
168+
public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs) {
128169
this.msgs = msgs;
170+
msgsBytes = null;
129171
}
130172

131173
/**
@@ -144,6 +186,7 @@ public void messages(
144186
*/
145187
public void topology(@Nullable Collection<TcpDiscoveryNode> top) {
146188
this.top = top;
189+
topBytes = null;
147190
}
148191

149192
/**
@@ -152,7 +195,7 @@ public void topology(@Nullable Collection<TcpDiscoveryNode> top) {
152195
public void clientTopology(Collection<TcpDiscoveryNode> top) {
153196
assert top != null && !top.isEmpty() : top;
154197

155-
this.clientTop = top;
198+
clientTop = top;
156199
}
157200

158201
/**
@@ -178,6 +221,7 @@ public Map<Long, Collection<ClusterNode>> topologyHistory() {
178221
*/
179222
public void topologyHistory(@Nullable Map<Long, Collection<ClusterNode>> topHist) {
180223
this.topHist = topHist;
224+
topHistBytes = null;
181225
}
182226

183227
/**
@@ -210,6 +254,47 @@ public long gridStartTime() {
210254
return gridStartTime;
211255
}
212256

257+
/** @param marsh marshaller. */
258+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
259+
if (node != null)
260+
nodeBytes = U.marshal(marsh, node);
261+
262+
if (msgs != null)
263+
msgsBytes = U.marshal(marsh, msgs);
264+
265+
if (top != null)
266+
topBytes = U.marshal(marsh, top);
267+
268+
if (topHist != null)
269+
topHistBytes = U.marshal(marsh, topHist);
270+
}
271+
272+
/** {@inheritDoc} */
273+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
274+
if (nodeBytes != null)
275+
node = U.unmarshal(marsh, nodeBytes, clsLdr);
276+
277+
if (msgsBytes != null)
278+
msgs = U.unmarshal(marsh, msgsBytes, clsLdr);
279+
280+
if (topBytes != null)
281+
top = U.unmarshal(marsh, topBytes, clsLdr);
282+
283+
if (topHistBytes != null)
284+
topHist = U.unmarshal(marsh, topHistBytes, clsLdr);
285+
286+
nodeBytes = null;
287+
topBytes = null;
288+
topHistBytes = null;
289+
msgsBytes = null;
290+
}
291+
292+
293+
/** {@inheritDoc} */
294+
@Override public short directType() {
295+
return 29;
296+
}
297+
213298
/** {@inheritDoc} */
214299
@Override public String toString() {
215300
return S.toString(TcpDiscoveryNodeAddedMessage.class, this, "super", super.toString());

0 commit comments

Comments
 (0)