Skip to content

Commit 7d1815f

Browse files
committed
IGNITE-28290 : Utility discovery collection message.
1 parent d9046e5 commit 7d1815f

5 files changed

Lines changed: 182 additions & 61 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,9 @@
137137
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
138138
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponseSerializer;
139139
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
140-
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessageMarshallableSerializer;
140+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessageSerializer;
141+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessage;
142+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessageMarshallableSerializer;
141143
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
142144
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessageSerializer;
143145
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
@@ -198,6 +200,9 @@ public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMa
198200

199201
/** {@inheritDoc} */
200202
@Override public void registerAll(MessageFactory factory) {
203+
factory.register((short)-200, TcpDiscoveryCollectionMessage::new,
204+
new TcpDiscoveryCollectionMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
205+
201206
factory.register((short)-115, SchemaAlterTableAddColumnOperation::new,
202207
new SchemaAlterTableAddColumnOperationSerializer());
203208
factory.register((short)-114, SchemaIndexCreateOperation::new,
@@ -254,8 +259,7 @@ public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMa
254259
factory.register((short)25, DistributedMetaStorageUpdateAckMessage::new, new DistributedMetaStorageUpdateAckMessageSerializer());
255260
factory.register((short)26, DistributedMetaStorageCasMessage::new, new DistributedMetaStorageCasMessageSerializer());
256261
factory.register((short)27, DistributedMetaStorageCasAckMessage::new, new DistributedMetaStorageCasAckMessageSerializer());
257-
factory.register((short)28, TcpDiscoveryClientReconnectMessage::new,
258-
new TcpDiscoveryClientReconnectMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
262+
factory.register((short)28, TcpDiscoveryClientReconnectMessage::new, new TcpDiscoveryClientReconnectMessageSerializer());
259263
factory.register((short)29, TcpDiscoveryNodeAddedMessage::new,
260264
new TcpDiscoveryNodeAddedMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
261265
factory.register((short)30, FullMessage::new, new FullMessageSerializer());

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1879,18 +1879,27 @@ private void prepareNodeAddedMessage(
18791879

18801880
nodeAddedMsg.topology(topToSnd);
18811881

1882-
Collection<TcpDiscoveryAbstractMessage> msgs0 = null;
1883-
18841882
if (msgs != null) {
1885-
msgs0 = new ArrayList<>(msgs.size());
1883+
Collection<TcpDiscoveryAbstractMessage> msgs0 = new ArrayList<>(msgs.size());
18861884

18871885
for (PendingMessage pendingMsg : msgs) {
1888-
if (pendingMsg.msg != null)
1886+
if (pendingMsg.msg == null)
1887+
continue;
1888+
1889+
if (pendingMsg.msg == nodeAddedMsg) {
1890+
TcpDiscoveryNodeAddedMessage msg0 = (TcpDiscoveryNodeAddedMessage)pendingMsg.msg;
1891+
msg0 = new TcpDiscoveryNodeAddedMessage(msg0);
1892+
// Removes message self-inclusion and prevents infinite write/read message cycles and stack overflow.
1893+
msg0.messages(null);
1894+
1895+
msgs0.add(msg0);
1896+
}
1897+
else
18891898
msgs0.add(pendingMsg.msg);
18901899
}
1891-
}
18921900

1893-
nodeAddedMsg.messages(msgs0);
1901+
nodeAddedMsg.messages(msgs0);
1902+
}
18941903

18951904
Map<Long, Collection<ClusterNode>> hist;
18961905

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,22 @@
1818
package org.apache.ignite.spi.discovery.tcp.messages;
1919

2020
import java.util.Collection;
21+
import java.util.Collections;
2122
import java.util.Objects;
2223
import java.util.UUID;
23-
import org.apache.ignite.IgniteCheckedException;
2424
import org.apache.ignite.internal.Order;
2525
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
26-
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
26+
import org.apache.ignite.internal.util.typedef.F;
2727
import org.apache.ignite.internal.util.typedef.internal.S;
28-
import org.apache.ignite.internal.util.typedef.internal.U;
2928
import org.apache.ignite.lang.IgniteUuid;
30-
import org.apache.ignite.marshaller.Marshaller;
31-
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
29+
import org.apache.ignite.plugin.extensions.communication.Message;
30+
import org.jetbrains.annotations.Nullable;
3231

3332
/**
3433
* Message telling that client node is reconnecting to topology.
3534
*/
3635
@TcpDiscoveryEnsureDelivery
37-
public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMessage implements MarshallableMessage {
36+
public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMessage implements Message {
3837
/** */
3938
private static final long serialVersionUID = 0L;
4039

@@ -46,16 +45,9 @@ public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMess
4645
@Order(1)
4746
IgniteUuid lastMsgId;
4847

49-
/** Pending messages. */
50-
@GridToStringExclude
51-
private Collection<TcpDiscoveryAbstractMessage> msgs;
52-
53-
/**
54-
* TODO: Use direct messages or a message container after https://issues.apache.org/jira/browse/IGNITE-25883
55-
* Srialized bytes of {@link #msgs}.
56-
*/
48+
/** Pending messages holder. */
5749
@Order(2)
58-
byte[] msgsBytes;
50+
@Nullable TcpDiscoveryCollectionMessage pendingMsgsMsg;
5951

6052
/** Constructor for {@link DiscoveryMessageFactory}. */
6153
public TcpDiscoveryClientReconnectMessage() {
@@ -91,15 +83,15 @@ public IgniteUuid lastMessageId() {
9183
/**
9284
* @param msgs Pending messages.
9385
*/
94-
public void pendingMessages(Collection<TcpDiscoveryAbstractMessage> msgs) {
95-
this.msgs = msgs;
86+
public void pendingMessages(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs) {
87+
pendingMsgsMsg = F.isEmpty(msgs) ? null : new TcpDiscoveryCollectionMessage(msgs);
9688
}
9789

9890
/**
9991
* @return Pending messages.
10092
*/
10193
public Collection<TcpDiscoveryAbstractMessage> pendingMessages() {
102-
return msgs;
94+
return pendingMsgsMsg == null ? Collections.emptyList() : pendingMsgsMsg.messages();
10395
}
10496

10597
/**
@@ -131,18 +123,6 @@ public boolean success() {
131123
Objects.equals(lastMsgId, other.lastMsgId);
132124
}
133125

134-
/** {@inheritDoc} */
135-
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
136-
if (msgs != null)
137-
msgsBytes = U.marshal(marsh, msgs);
138-
}
139-
140-
/** {@inheritDoc} */
141-
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
142-
if (msgsBytes != null)
143-
msgs = U.unmarshal(marsh, msgsBytes, clsLdr);
144-
}
145-
146126
/** {@inheritDoc} */
147127
@Override public short directType() {
148128
return 28;
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.spi.discovery.tcp.messages;
19+
20+
import java.util.ArrayList;
21+
import java.util.Collection;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Map;
25+
import org.apache.ignite.IgniteCheckedException;
26+
import org.apache.ignite.internal.Order;
27+
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
28+
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
29+
import org.apache.ignite.internal.util.typedef.F;
30+
import org.apache.ignite.internal.util.typedef.internal.S;
31+
import org.apache.ignite.internal.util.typedef.internal.U;
32+
import org.apache.ignite.marshaller.Marshaller;
33+
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
34+
import org.apache.ignite.plugin.extensions.communication.Message;
35+
import org.jetbrains.annotations.Nullable;
36+
37+
/**
38+
* TODO: Remove/revise after https://issues.apache.org/jira/browse/IGNITE-25883
39+
* Message to transfer a collection of {@link TcpDiscoveryAbstractMessage} with the original order.
40+
* Several of them might be a {@link Message}, several may not and require the original marshalling.
41+
*/
42+
public class TcpDiscoveryCollectionMessage implements MarshallableMessage {
43+
/** {@link TcpDiscoveryAbstractMessage} messages which are a {@link Message}. */
44+
@Order(0)
45+
@Nullable Map<Integer, Message> writableMsgs;
46+
47+
/** Marshallable or Java-serializable messages which are not a {@link Message}. */
48+
@Nullable private Map<Integer, TcpDiscoveryAbstractMessage> marshallableMsgs;
49+
50+
/** Marshalled {@link #marshallableMsgs}. */
51+
@Order(1)
52+
@GridToStringExclude
53+
@Nullable byte[] marshallableMsgsBytes;
54+
55+
/** Constructor for {@link DiscoveryMessageFactory}. */
56+
public TcpDiscoveryCollectionMessage() {
57+
// No-op.
58+
}
59+
60+
/** @param msgs Discovery messages to hold. */
61+
public TcpDiscoveryCollectionMessage(Collection<TcpDiscoveryAbstractMessage> msgs) {
62+
if (F.isEmpty(msgs))
63+
return;
64+
65+
// Keeps the original message order.
66+
int idx = 0;
67+
68+
for (TcpDiscoveryAbstractMessage m : msgs) {
69+
if (m instanceof Message) {
70+
if (writableMsgs == null)
71+
writableMsgs = U.newHashMap(msgs.size());
72+
73+
writableMsgs.put(idx++, (Message)m);
74+
75+
continue;
76+
}
77+
78+
if (marshallableMsgs == null)
79+
marshallableMsgs = U.newHashMap(msgs.size());
80+
81+
marshallableMsgs.put(idx++, m);
82+
}
83+
}
84+
85+
/** @param marsh marshaller. */
86+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
87+
if (marshallableMsgs != null)
88+
marshallableMsgsBytes = U.marshal(marsh, marshallableMsgs);
89+
}
90+
91+
/** {@inheritDoc} */
92+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
93+
if (marshallableMsgsBytes != null)
94+
marshallableMsgs = U.unmarshal(marsh, marshallableMsgsBytes, clsLdr);
95+
96+
marshallableMsgsBytes = null;
97+
}
98+
99+
/**
100+
* Gets pending messages sent to new node by its previous.
101+
*
102+
* @return Pending messages from previous node.
103+
*/
104+
public Collection<TcpDiscoveryAbstractMessage> messages() {
105+
if (F.isEmpty(writableMsgs) && F.isEmpty(marshallableMsgs))
106+
return Collections.emptyList();
107+
108+
int totalSz = (F.isEmpty(writableMsgs) ? 0 : writableMsgs.size())
109+
+ (F.isEmpty(marshallableMsgs) ? 0 : marshallableMsgs.size());
110+
111+
List<TcpDiscoveryAbstractMessage> res = new ArrayList<>(totalSz);
112+
113+
for (int i = 0; i < totalSz; ++i) {
114+
Message m = F.isEmpty(writableMsgs) ? null : writableMsgs.get(i);
115+
116+
if (m == null) {
117+
TcpDiscoveryAbstractMessage adm = marshallableMsgs.get(i);
118+
119+
assert adm != null;
120+
121+
res.add(adm);
122+
}
123+
else {
124+
assert marshallableMsgs == null || marshallableMsgs.get(i) == null;
125+
assert m instanceof TcpDiscoveryAbstractMessage;
126+
127+
res.add((TcpDiscoveryAbstractMessage)m);
128+
}
129+
}
130+
131+
return res;
132+
}
133+
134+
/** {@inheritDoc} */
135+
@Override public short directType() {
136+
return -200;
137+
}
138+
139+
/** {@inheritDoc} */
140+
@Override public String toString() {
141+
return S.toString(TcpDiscoveryCollectionMessage.class, this, "super", super.toString());
142+
}
143+
}

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
2727
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
2828
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
29+
import org.apache.ignite.internal.util.typedef.F;
2930
import org.apache.ignite.internal.util.typedef.internal.S;
3031
import org.apache.ignite.internal.util.typedef.internal.U;
3132
import org.apache.ignite.marshaller.Marshaller;
@@ -58,16 +59,9 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM
5859
@Order(1)
5960
DiscoveryDataPacket dataPacket;
6061

61-
/** Pending messages from previous node. */
62-
private Collection<TcpDiscoveryAbstractMessage> msgs;
63-
64-
/**
65-
* TODO: Use direct messages or a message container after https://issues.apache.org/jira/browse/IGNITE-25883
66-
* Marshalled {@link #msgs}.
67-
*/
62+
/** Pending messages containner. */
6863
@Order(2)
69-
@GridToStringExclude
70-
byte[] msgsBytes;
64+
@Nullable TcpDiscoveryCollectionMessage pendingMsgsMsg;
7165

7266
/** Current topology. Initialized by coordinator. */
7367
@GridToStringInclude
@@ -131,8 +125,7 @@ public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
131125

132126
node = msg.node;
133127
nodeBytes = msg.nodeBytes;
134-
msgs = msg.msgs;
135-
msgsBytes = msg.msgsBytes;
128+
pendingMsgsMsg = msg.pendingMsgsMsg;
136129
top = msg.top;
137130
topBytes = msg.topBytes;
138131
clientTop = msg.clientTop;
@@ -156,8 +149,8 @@ public TcpDiscoveryNode node() {
156149
*
157150
* @return Pending messages from previous node.
158151
*/
159-
@Nullable public Collection<TcpDiscoveryAbstractMessage> messages() {
160-
return msgs;
152+
public @Nullable Collection<TcpDiscoveryAbstractMessage> messages() {
153+
return pendingMsgsMsg == null ? null : pendingMsgsMsg.messages();
161154
}
162155

163156
/**
@@ -166,8 +159,7 @@ public TcpDiscoveryNode node() {
166159
* @param msgs Pending messages to send to new node.
167160
*/
168161
public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs) {
169-
this.msgs = msgs;
170-
msgsBytes = null;
162+
pendingMsgsMsg = F.isEmpty(msgs) ? null : new TcpDiscoveryCollectionMessage(msgs);
171163
}
172164

173165
/**
@@ -259,9 +251,6 @@ public long gridStartTime() {
259251
if (node != null)
260252
nodeBytes = U.marshal(marsh, node);
261253

262-
if (msgs != null)
263-
msgsBytes = U.marshal(marsh, msgs);
264-
265254
if (top != null)
266255
topBytes = U.marshal(marsh, top);
267256

@@ -274,9 +263,6 @@ public long gridStartTime() {
274263
if (nodeBytes != null)
275264
node = U.unmarshal(marsh, nodeBytes, clsLdr);
276265

277-
if (msgsBytes != null)
278-
msgs = U.unmarshal(marsh, msgsBytes, clsLdr);
279-
280266
if (topBytes != null)
281267
top = U.unmarshal(marsh, topBytes, clsLdr);
282268

@@ -286,7 +272,6 @@ public long gridStartTime() {
286272
nodeBytes = null;
287273
topBytes = null;
288274
topHistBytes = null;
289-
msgsBytes = null;
290275
}
291276

292277

0 commit comments

Comments
 (0)