Skip to content

Commit 4ea4647

Browse files
committed
Merge remote-tracking branch 'refs/remotes/apache/master' into ignite-27631
# Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
2 parents 8fb0eec + 40d9b61 commit 4ea4647

29 files changed

Lines changed: 687 additions & 272 deletions

modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ private void returnFalseIfWriteFailed(VariableElement field) throws Exception {
401401
return;
402402
}
403403

404-
returnFalseIfWriteFailed(write, field, "writer.writeObjectArray", getExpr, messageCollectionItemTypes(type));
404+
returnFalseIfWriteFailed(write, field, "writer.writeObjectArray", getExpr, messageCollectionItemTypes(field, type));
405405

406406
return;
407407
}
@@ -426,7 +426,7 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) {
426426
List<String> args = new ArrayList<>();
427427

428428
args.add(getExpr);
429-
args.add(messageCollectionItemTypes(type));
429+
args.add(messageCollectionItemTypes(field, type));
430430

431431
if (compress)
432432
args.add("true"); // the value of the compress argument in the MessageWriter#writeMap method
@@ -454,7 +454,7 @@ else if (assignableFrom(type, type(MESSAGE_INTERFACE))) {
454454
}
455455

456456
else if (assignableFrom(erasedType(type), type(Collection.class.getName())))
457-
returnFalseIfWriteFailed(write, field, "writer.writeCollection", getExpr, messageCollectionItemTypes(type));
457+
returnFalseIfWriteFailed(write, field, "writer.writeCollection", getExpr, messageCollectionItemTypes(field, type));
458458

459459
else if (enumType(env, type)) {
460460
Element element = env.getTypeUtils().asElement(type);
@@ -621,15 +621,15 @@ private void returnFalseIfReadFailed(VariableElement field) throws Exception {
621621
}
622622

623623
if (componentType.getKind() == TypeKind.ARRAY) {
624-
returnFalseIfReadFailed(field, "reader.readObjectArray", messageCollectionItemTypes(type));
624+
returnFalseIfReadFailed(field, "reader.readObjectArray", messageCollectionItemTypes(field, type));
625625

626626
return;
627627
}
628628

629629
if (componentType.getKind() == TypeKind.DECLARED) {
630630
Element componentElement = ((DeclaredType)componentType).asElement();
631631

632-
returnFalseIfReadFailed(field, "reader.readObjectArray", messageCollectionItemTypes(type));
632+
returnFalseIfReadFailed(field, "reader.readObjectArray", messageCollectionItemTypes(field, type));
633633

634634
if (!"java.lang".equals(env.getElementUtils().getPackageOf(componentElement).getQualifiedName().toString())) {
635635
String importCls = ((QualifiedNameable)componentElement).getQualifiedName().toString();
@@ -664,7 +664,7 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) {
664664

665665
List<String> args = new ArrayList<>();
666666

667-
args.add(messageCollectionItemTypes(type));
667+
args.add(messageCollectionItemTypes(field, type));
668668

669669
if (compress)
670670
args.add("true"); // the value of the compress argument in the MessageReader#readMap method
@@ -692,7 +692,7 @@ else if (assignableFrom(type, type(MESSAGE_INTERFACE))) {
692692
}
693693

694694
else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) {
695-
returnFalseIfReadFailed(field, "reader.readCollection", messageCollectionItemTypes(type));
695+
returnFalseIfReadFailed(field, "reader.readCollection", messageCollectionItemTypes(field, type));
696696
}
697697
else if (enumType(env, type)) {
698698
String fieldPrefix = typeNameToFieldName(env.getTypeUtils().asElement(type).getSimpleName().toString());
@@ -715,7 +715,18 @@ else if (enumType(env, type)) {
715715
}
716716

717717
/** */
718-
private String messageCollectionItemTypes(TypeMirror type) throws Exception {
718+
private String messageCollectionItemTypes(VariableElement field, TypeMirror type) throws Exception {
719+
String desc = messageCollectionItemTypeDescriptor(type);
720+
String descName = field.getSimpleName() + "CollDesc";
721+
String typeName = desc.substring(desc.indexOf(' ') + 1, desc.indexOf('('));
722+
723+
fields.add("private final static " + typeName + " " + descName + " = " + desc + ";");
724+
725+
return descName;
726+
}
727+
728+
/** */
729+
private String messageCollectionItemTypeDescriptor(TypeMirror type) throws Exception {
719730
imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType");
720731

721732
if (type.getKind() == TypeKind.ARRAY) {
@@ -744,7 +755,7 @@ else if (componentType.getKind() == TypeKind.DECLARED) {
744755

745756
imports.add("org.apache.ignite.plugin.extensions.communication.MessageArrayType");
746757

747-
return "new MessageArrayType(" + messageCollectionItemTypes(componentType) + ", " + clazz + ")";
758+
return "new MessageArrayType(" + messageCollectionItemTypeDescriptor(componentType) + ", " + clazz + ")";
748759
}
749760
else if (assignableFrom(erasedType(type), type(Map.class.getName()))) {
750761
imports.add("org.apache.ignite.plugin.extensions.communication.MessageMapType");
@@ -754,8 +765,8 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) {
754765
assert typeArgs.size() == 2;
755766

756767
return "new MessageMapType(" +
757-
messageCollectionItemTypes(typeArgs.get(0)) + ", " +
758-
messageCollectionItemTypes(typeArgs.get(1)) + ", " +
768+
messageCollectionItemTypeDescriptor(typeArgs.get(0)) + ", " +
769+
messageCollectionItemTypeDescriptor(typeArgs.get(1)) + ", " +
759770
assignableFrom(erasedType(type), type(LinkedHashMap.class.getName())) + ")";
760771
}
761772
else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) {
@@ -766,7 +777,7 @@ else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) {
766777
assert typeArgs.size() == 1;
767778

768779
return "new MessageCollectionType(" +
769-
messageCollectionItemTypes(typeArgs.get(0)) + ", " +
780+
messageCollectionItemTypeDescriptor(typeArgs.get(0)) + ", " +
770781
assignableFrom(erasedType(type), type(Set.class.getName())) + ")";
771782
}
772783
else {

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessageSerializer;
3232
import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessage;
3333
import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessageSerializer;
34+
import org.apache.ignite.internal.processors.cache.ClientCacheChangeDiscoveryMessage;
35+
import org.apache.ignite.internal.processors.cache.ClientCacheChangeDiscoveryMessageSerializer;
3436
import org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessage;
3537
import org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessageSerializer;
3638
import org.apache.ignite.internal.processors.cache.WalStateFinishMessage;
@@ -249,6 +251,7 @@ public DiscoveryMessageFactory(@Nullable Marshaller cstDataMarshall, @Nullable C
249251
factory.register((short)513, StopRoutineAckDiscoveryMessage::new, new StopRoutineAckDiscoveryMessageSerializer());
250252
factory.register((short)514, StopRoutineDiscoveryMessage::new, new StopRoutineDiscoveryMessageSerializer());
251253
factory.register((short)515, CacheAffinityChangeMessage::new, new CacheAffinityChangeMessageSerializer());
254+
factory.register((short)516, ClientCacheChangeDiscoveryMessage::new, new ClientCacheChangeDiscoveryMessageSerializer());
252255

253256
factory.register((short)520, SnapshotOperationResponse::new, new SnapshotOperationResponseSerializer());
254257
factory.register((short)521, SnapshotHandlerResult::new, new SnapshotHandlerResultSerializer());

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,39 +22,48 @@
2222
import java.util.Iterator;
2323
import java.util.Map;
2424
import java.util.Set;
25+
import org.apache.ignite.internal.Order;
2526
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2627
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
2728
import org.apache.ignite.internal.util.typedef.F;
2829
import org.apache.ignite.internal.util.typedef.internal.S;
2930
import org.apache.ignite.lang.IgniteUuid;
31+
import org.apache.ignite.plugin.extensions.communication.Message;
3032
import org.jetbrains.annotations.Nullable;
3133

3234
/**
3335
* Sent from cache client node to asynchronously notify about started.closed client caches.
3436
*/
35-
public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage {
37+
public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage, Message {
3638
/** */
3739
private static final long serialVersionUID = 0L;
3840

3941
/** */
40-
private final IgniteUuid id = IgniteUuid.randomUuid();
42+
@Order(0)
43+
IgniteUuid id;
4144

4245
/** */
4346
@GridToStringInclude
44-
private Map<Integer, Boolean> startedCaches;
47+
@Order(1)
48+
Map<Integer, Boolean> startedCaches;
4549

4650
/** */
4751
@GridToStringInclude
48-
private Set<Integer> closedCaches;
52+
@Order(2)
53+
Set<Integer> closedCaches;
4954

5055
/** Update timeout object, used to batch multiple starts/close into single discovery message. */
51-
private transient ClientCacheUpdateTimeout updateTimeoutObj;
56+
private ClientCacheUpdateTimeout updateTimeoutObj;
57+
58+
/** */
59+
public ClientCacheChangeDiscoveryMessage() {}
5260

5361
/**
5462
* @param startedCaches Started caches.
5563
* @param closedCaches Closed caches.
5664
*/
5765
public ClientCacheChangeDiscoveryMessage(Map<Integer, Boolean> startedCaches, Set<Integer> closedCaches) {
66+
id = IgniteUuid.randomUuid();
5867
this.startedCaches = startedCaches;
5968
this.closedCaches = closedCaches;
6069
}
@@ -164,6 +173,11 @@ public void updateTimeoutObject(ClientCacheUpdateTimeout updateTimeoutObj) {
164173
return null;
165174
}
166175

176+
/** {@inheritDoc} */
177+
@Override public short directType() {
178+
return 516;
179+
}
180+
167181
/** {@inheritDoc} */
168182
@Override public String toString() {
169183
return S.toString(ClientCacheChangeDiscoveryMessage.class, this);

modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
import org.apache.ignite.configuration.IgniteConfiguration;
3636
import org.apache.ignite.events.DiscoveryEvent;
3737
import org.apache.ignite.events.Event;
38+
import org.apache.ignite.internal.GridKernalContext;
3839
import org.apache.ignite.internal.IgniteEx;
39-
import org.apache.ignite.internal.IgniteKernal;
4040
import org.apache.ignite.internal.IgniteNodeAttributes;
4141
import org.apache.ignite.internal.managers.communication.GridMessageListener;
4242
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -952,22 +952,12 @@ private class GridDummySpiContext implements IgniteSpiContext {
952952

953953
/** {@inheritDoc} */
954954
@Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
955-
Ignite ignite0 = ignite;
956-
957-
if (!(ignite0 instanceof IgniteKernal))
958-
throw new IgniteSpiException("Wrong Ignite instance is set: " + ignite0);
959-
960-
((IgniteEx)ignite0).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
955+
context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
961956
}
962957

963958
/** {@inheritDoc} */
964959
@Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
965-
Ignite ignite0 = ignite;
966-
967-
if (!(ignite0 instanceof IgniteKernal))
968-
throw new IgniteSpiException("Wrong Ignite instance is set: " + ignite0);
969-
970-
((IgniteEx)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
960+
context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
971961
}
972962

973963
/** {@inheritDoc} */
@@ -1004,5 +994,15 @@ private class GridDummySpiContext implements IgniteSpiContext {
1004994
@Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
1005995
// No-op.
1006996
}
997+
998+
/** */
999+
private GridKernalContext context() {
1000+
Ignite ignite0 = ignite;
1001+
1002+
if (ignite0 == null)
1003+
throw new IgniteSpiException(isStopping() ? "The node is stopping" : "The node is not yet started");
1004+
1005+
return ((IgniteEx)ignite0).context();
1006+
}
10071007
}
10081008
}

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7460,10 +7460,13 @@ else if (log.isDebugEnabled())
74607460
* @param clientMsgWrk Client message worker to start.
74617461
* @return Whether connection was successful.
74627462
* @throws IOException If IO failed.
7463+
* @throws IgniteCheckedException If node is not yet initialized or is stopping.
74637464
*/
74647465
@SuppressWarnings({"IfMayBeConditional"})
7465-
private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg,
7466-
@Nullable ClientMessageWorker clientMsgWrk) throws IOException {
7466+
private boolean processJoinRequestMessage(
7467+
TcpDiscoveryJoinRequestMessage msg,
7468+
@Nullable ClientMessageWorker clientMsgWrk
7469+
) throws IOException, IgniteCheckedException {
74677470
assert msg != null;
74687471
assert !msg.responded();
74697472

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1573,9 +1573,12 @@ public long getCoordinatorSinceTimestamp() {
15731573
* @return Opened socket.
15741574
* @throws IOException If failed.
15751575
* @throws IgniteSpiOperationTimeoutException In case of timeout.
1576+
* @throws IgniteCheckedException If node is not yet initialized or is stopping.
15761577
*/
1577-
protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
1578-
throws IOException, IgniteSpiOperationTimeoutException {
1578+
protected Socket openSocket(
1579+
InetSocketAddress sockAddr,
1580+
IgniteSpiOperationTimeoutHelper timeoutHelper
1581+
) throws IOException, IgniteSpiOperationTimeoutException, IgniteCheckedException {
15791582
return openSocket(createSocket(), sockAddr, timeoutHelper);
15801583
}
15811584

@@ -1588,10 +1591,13 @@ protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeou
15881591
* @return Connected socket.
15891592
* @throws IOException If failed.
15901593
* @throws IgniteSpiOperationTimeoutException In case of timeout.
1594+
* @throws IgniteCheckedException If node is not yet initialized or is stopping.
15911595
*/
1592-
protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
1593-
throws IOException, IgniteSpiOperationTimeoutException {
1594-
1596+
protected Socket openSocket(
1597+
Socket sock,
1598+
InetSocketAddress remAddr,
1599+
IgniteSpiOperationTimeoutHelper timeoutHelper
1600+
) throws IOException, IgniteSpiOperationTimeoutException, IgniteCheckedException {
15951601
assert remAddr != null;
15961602

15971603
try {
@@ -1608,7 +1614,7 @@ protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOpe
16081614

16091615
return sock;
16101616
}
1611-
catch (IOException | IgniteSpiOperationTimeoutException e) {
1617+
catch (IOException | IgniteCheckedException e) {
16121618
if (sock != null)
16131619
U.closeQuiet(sock);
16141620

@@ -1686,8 +1692,14 @@ Socket createSocket() throws IOException {
16861692
* @param data Raw data to write.
16871693
* @param timeout Socket write timeout.
16881694
* @throws IOException If IO failed or write timed out.
1695+
* @throws IgniteCheckedException If node is not yet initialized or is stopping.
16891696
*/
1690-
protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, long timeout) throws IOException {
1697+
protected void writeToSocket(
1698+
Socket sock,
1699+
TcpDiscoveryAbstractMessage msg,
1700+
byte[] data,
1701+
long timeout
1702+
) throws IOException, IgniteCheckedException {
16911703
assert sock != null;
16921704
assert data != null;
16931705

@@ -1771,9 +1783,14 @@ protected void writeMessage(
17711783
* @param res Integer response.
17721784
* @param timeout Socket timeout.
17731785
* @throws IOException If IO failed or write timed out.
1786+
* @throws IgniteCheckedException If node is not yet initialized or is stopping.
17741787
*/
1775-
protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
1776-
throws IOException {
1788+
protected void writeToSocket(
1789+
TcpDiscoveryAbstractMessage msg,
1790+
Socket sock,
1791+
int res,
1792+
long timeout
1793+
) throws IOException, IgniteCheckedException {
17771794
assert sock != null;
17781795

17791796
try (SocketTimeoutObject ignored = startTimer(sock, timeout)) {
@@ -1799,8 +1816,7 @@ protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int r
17991816
* @throws IOException If IO failed or read timed out.
18001817
* @throws IgniteCheckedException If unmarshalling failed.
18011818
*/
1802-
protected <T> T readMessage(TcpDiscoveryIoSession ses, long timeout) throws IOException,
1803-
IgniteCheckedException {
1819+
protected <T> T readMessage(TcpDiscoveryIoSession ses, long timeout) throws IOException, IgniteCheckedException {
18041820
Socket sock = ses.socket();
18051821

18061822
assert sock != null;
@@ -2445,12 +2461,17 @@ protected Marshaller marshaller() {
24452461
}
24462462

24472463
/** Starts a timer for a socket operation. */
2448-
private SocketTimeoutObject startTimer(Socket sock, long timeout) {
2449-
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
2464+
private SocketTimeoutObject startTimer(Socket sock, long timeout) throws IgniteCheckedException {
2465+
try {
2466+
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
24502467

2451-
addTimeoutObject(obj);
2468+
addTimeoutObject(obj);
24522469

2453-
return obj;
2470+
return obj;
2471+
}
2472+
catch (IgniteSpiException e) {
2473+
throw new IgniteCheckedException("Failed to perform socket operation", e);
2474+
}
24542475
}
24552476

24562477
/**

0 commit comments

Comments
 (0)