diff --git a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java index 4591382105b0c..d0c0d14e3b3fe 100644 --- a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java +++ b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java @@ -388,7 +388,11 @@ else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) { imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType"); - returnFalseIfWriteFailed(write, "writer.writeCollection", getExpr, + String collectionWriter = assignableFrom(erasedType(type), type(Set.class.getName())) + ? "writer.writeSet" + : "writer.writeCollection"; + + returnFalseIfWriteFailed(write, collectionWriter, getExpr, "MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0))); } @@ -526,7 +530,11 @@ else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) { assert typeArgs.size() == 1; - returnFalseIfReadFailed(name, "reader.readCollection", + String collectionReader = assignableFrom(erasedType(type), type(Set.class.getName())) + ? "reader.readSet" + : "reader.readCollection"; + + returnFalseIfReadFailed(name, collectionReader, "MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0))); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticRequest.java index 528329174e9de..5f34d24c2a04d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticRequest.java @@ -18,11 +18,11 @@ package org.apache.ignite.internal; import java.util.ArrayList; -import java.util.Collection; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; @@ -43,7 +43,7 @@ public class IgniteDiagnosticRequest implements Message { /** Infos to send to a remote node. */ @Order(2) - private @Nullable LinkedHashSet infos; + private @Nullable Set infos; /** Local message related to remote info. */ private final Map> msgs = new LinkedHashMap<>(); @@ -71,12 +71,12 @@ public IgniteDiagnosticRequest() { * @param nodeId Node ID. * @param infos Diagnostic infos. */ - public IgniteDiagnosticRequest(long futId, UUID nodeId, @Nullable Collection infos) { + public IgniteDiagnosticRequest(long futId, UUID nodeId, @Nullable Set infos) { this(nodeId); this.futId = futId; - infos(infos); + this.infos = infos; } /** @@ -142,21 +142,13 @@ public void futureId(long futId) { } /** @return Compound diagnostic infos. */ - public @Nullable Collection infos() { + public @Nullable Set infos() { return infos; } /** Sets compound diagnostic infos. */ - public void infos(@Nullable Collection infos) { - // Deserialization supports only `Collection` interface in MessageReader#readCollection. - this.infos = toLinkedHashSet(infos); - } - - /** */ - private static @Nullable LinkedHashSet toLinkedHashSet(@Nullable Collection infos) { - return infos == null - ? null - : infos instanceof LinkedHashSet ? (LinkedHashSet)infos : new LinkedHashSet<>(infos); + public void infos(@Nullable Set infos) { + this.infos = infos; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index ec42277634c00..24d97da7b2de5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -21,6 +21,7 @@ import java.util.BitSet; import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.direct.state.DirectMessageState; import org.apache.ignite.internal.direct.state.DirectMessageStateItem; @@ -372,13 +373,24 @@ public ByteBuffer getBuffer() { @Override public > C readCollection(MessageCollectionItemType itemType) { DirectByteBufferStream stream = state.item().stream; - C col = stream.readCollection(itemType, this); + C col = stream.readList(itemType, this); lastRead = stream.lastFinished(); return col; } + /** {@inheritDoc} */ + @Override public > SET readSet(MessageCollectionItemType itemType) { + DirectByteBufferStream stream = state.item().stream; + + SET set = stream.readSet(itemType, this); + + lastRead = stream.lastFinished(); + + return set; + } + /** {@inheritDoc} */ @Override public > M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType, boolean linked) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index 7a4cf9138fb27..1da76aa14cfe7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -21,6 +21,7 @@ import java.util.BitSet; import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.direct.state.DirectMessageState; import org.apache.ignite.internal.direct.state.DirectMessageStateItem; @@ -345,6 +346,11 @@ public ByteBuffer getBuffer() { return stream.lastFinished(); } + /** {@inheritDoc} */ + @Override public boolean writeSet(Set set, MessageCollectionItemType itemType) { + return writeCollection(set, itemType); + } + /** {@inheritDoc} */ @Override public boolean writeMap(Map map, MessageCollectionItemType keyType, MessageCollectionItemType valType) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java index a95c5e094a19c..5c526bf6fd3b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java @@ -22,10 +22,12 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.RandomAccess; +import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -1601,11 +1603,36 @@ public T[] readObjectArray(MessageCollectionItemType itemType, Class item } /** + * Reads collection as an {@link ArrayList}. + * + * @param itemType Item type. + * @param reader Reader. + * @return {@link ArrayList}. + */ + public > L readList(MessageCollectionItemType itemType, MessageReader reader) { + return readCollection(itemType, reader, false); + } + + /** + * Reads collection as a {@link HashSet}. + * + * @param itemType Item type. + * @param reader Reader. + * @return {@link HashSet}. + */ + public > SET readSet(MessageCollectionItemType itemType, MessageReader reader) { + return readCollection(itemType, reader, true); + } + + /** + * Reads collection eather as a {@link ArrayList} or a {@link HashSet}. + * * @param itemType Item type. * @param reader Reader. - * @return Collection. + * @param set Read-as-Set flag. + * @return {@link ArrayList} or a {@link HashSet}. */ - public > C readCollection(MessageCollectionItemType itemType, MessageReader reader) { + private > C readCollection(MessageCollectionItemType itemType, MessageReader reader, boolean set) { if (readSize == -1) { int size = readInt(); @@ -1617,7 +1644,7 @@ public > C readCollection(MessageCollectionItemType item if (readSize >= 0) { if (col == null) - col = new ArrayList<>(readSize); + col = set ? U.newHashSet(readSize) : new ArrayList<>(readSize); for (int i = readItems; i < readSize; i++) { Object item = read(itemType, reader); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index b9661ab1bb331..a858bdb5fce0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -20,6 +20,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; @@ -28,6 +29,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; @@ -42,5 +44,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)3, TcpDiscoveryClientPingRequest::new, new TcpDiscoveryClientPingRequestSerializer()); factory.register((short)4, TcpDiscoveryClientPingResponse::new, new TcpDiscoveryClientPingResponseSerializer()); factory.register((short)5, TcpDiscoveryLoopbackProblemMessage::new, new TcpDiscoveryLoopbackProblemMessageSerializer()); + factory.register((short)6, TcpDiscoveryConnectionCheckMessage::new, new TcpDiscoveryConnectionCheckMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index 0055209a81da2..5759101b2ca41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.Timer; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -876,7 +877,7 @@ public IgniteInternalFuture requestDiagnosticInfo(final UUID nodeId, Ign */ private IgniteInternalFuture sendDiagnosticMessage( UUID nodeId, - @Nullable Collection infos + @Nullable Set infos ) { try { IgniteDiagnosticRequest msg = new IgniteDiagnosticRequest(diagFutId.getAndIncrement(), nodeId, infos); diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java index b5e4d506be76f..c8cad7c0df69c 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -226,27 +227,36 @@ public default void setBuffer(ByteBuffer buf) { * * @param itemType Array component type. * @param itemCls Array component class. - * @param Type of the red object . + * @param Type of the read object. * @return Array of objects. */ public T[] readObjectArray(MessageCollectionItemType itemType, Class itemCls); /** - * Reads collection. + * Reads any collection. * * @param itemType Collection item type. - * @param Type of the red collection. + * @param Type of the read collection. * @return Collection. */ public > C readCollection(MessageCollectionItemType itemType); + /** + * Reads any collection and provides it as a set. + * + * @param itemType Set item type. + * @param Type of the read set. + * @return Set. + */ + public > S readSet(MessageCollectionItemType itemType); + /** * Reads map. * * @param keyType Map key type. * @param valType Map value type. * @param linked Whether {@link LinkedHashMap} should be created. - * @param Type of the red map. + * @param Type of the read map. * @return Map. */ // TODO: IGNITE-26329 — switch to the new readMap method without the flag parameter diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java index ad19e865fc68a..43fda9736d9e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java @@ -21,6 +21,7 @@ import java.util.BitSet; import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -283,7 +284,7 @@ public default void setBuffer(ByteBuffer buf) { public boolean writeObjectArray(T[] arr, MessageCollectionItemType itemType); /** - * Writes collection. + * Writes collection with its elements order. * * @param col Collection. * @param itemType Collection item type. @@ -292,6 +293,16 @@ public default void setBuffer(ByteBuffer buf) { */ public boolean writeCollection(Collection col, MessageCollectionItemType itemType); + /** + * Writes set with its elements order. + * + * @param set Set. + * @param itemType Set item type. + * @param Type of the objects that set contains. + * @return Whether value was fully written. + */ + public boolean writeSet(Set set, MessageCollectionItemType itemType); + /** * Writes map. * diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index c8fea72ec3cfd..4abf7a295adc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -19,7 +19,6 @@ import java.io.Externalizable; import java.io.Serializable; -import java.util.Collection; import java.util.HashSet; import java.util.Set; import java.util.UUID; @@ -300,7 +299,7 @@ public void failedNodes(@Nullable Set failedNodes) { /** * @return Failed nodes IDs. */ - @Nullable public Collection failedNodes() { + @Nullable public Set failedNodes() { return failedNodes; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java index 21079d0aa12ef..3cf1784da5600 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java @@ -17,11 +17,9 @@ package org.apache.ignite.spi.discovery.tcp.messages; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; /** @@ -29,12 +27,12 @@ * The difference from {@link TcpDiscoveryStatusCheckMessage} is that this message is sent to the next node * which directly replies to the sender without message re-translation to the coordinator. */ -public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage implements Externalizable { +public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage implements Message { /** */ private static final long serialVersionUID = 0L; /** - * Default no-arg constructor for {@link Externalizable} interface. + * Default constructor for {@link DiscoveryMessageFactory}. */ public TcpDiscoveryConnectionCheckMessage() { // No-op. @@ -55,13 +53,8 @@ public TcpDiscoveryConnectionCheckMessage(TcpDiscoveryNode creatorNode) { } /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - // This method has been left empty intentionally to keep message size at min. - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - // This method has been left empty intentionally to keep message size at min. + @Override public short directType() { + return 6; } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java index b7fd6f74daa8f..4875ec0a465e8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java @@ -21,6 +21,7 @@ import java.util.BitSet; import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -299,6 +300,11 @@ private boolean writeField(Class type) { return writeField(Collection.class); } + /** {@inheritDoc} */ + @Override public boolean writeSet(Set set, MessageCollectionItemType itemType) { + return writeField(Set.class); + } + /** {@inheritDoc} */ @Override public boolean writeMap(Map map, MessageCollectionItemType keyType, MessageCollectionItemType valType) { @@ -553,6 +559,13 @@ private void readField(Class type) { return null; } + /** {@inheritDoc} */ + @Override public > S readSet(MessageCollectionItemType itemType) { + readField(Set.class); + + return null; + } + /** {@inheritDoc} */ @Override public > M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType, boolean linked) { diff --git a/modules/core/src/test/resources/codegen/TestCollectionsMessage.java b/modules/core/src/test/resources/codegen/TestCollectionsMessage.java index c7c6ddc287d9c..358123c837415 100644 --- a/modules/core/src/test/resources/codegen/TestCollectionsMessage.java +++ b/modules/core/src/test/resources/codegen/TestCollectionsMessage.java @@ -19,6 +19,7 @@ import java.util.BitSet; import java.util.List; +import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -96,6 +97,12 @@ public class TestCollectionsMessage implements Message { @Order(22) private List gridLongListList; + @Order(23) + private Set boxedIntegerSet; + + @Order(24) + private Set bitSetSet; + public List booleanArrayList() { return booleanArrayList; } @@ -280,6 +287,22 @@ public void gridLongListList(List gridLongListList) { this.gridLongListList = gridLongListList; } + public Set boxedIntegerSet() { + return boxedIntegerSet; + } + + public void boxedIntegerSet(Set boxedIntegerSet) { + this.boxedIntegerSet = boxedIntegerSet; + } + + public Set bitSetSet() { + return bitSetSet; + } + + public void bitSetSet(Set bitSetSet) { + this.bitSetSet = bitSetSet; + } + public short directType() { return 0; } diff --git a/modules/core/src/test/resources/codegen/TestCollectionsMessageSerializer.java b/modules/core/src/test/resources/codegen/TestCollectionsMessageSerializer.java index fbe5cae6a8f88..44d3382c2182b 100644 --- a/modules/core/src/test/resources/codegen/TestCollectionsMessageSerializer.java +++ b/modules/core/src/test/resources/codegen/TestCollectionsMessageSerializer.java @@ -179,6 +179,18 @@ public class TestCollectionsMessageSerializer implements MessageSerializer { return false; writer.incrementState(); + + case 23: + if (!writer.writeSet(msg.boxedIntegerSet(), MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + case 24: + if (!writer.writeSet(msg.bitSetSet(), MessageCollectionItemType.BIT_SET)) + return false; + + writer.incrementState(); } return true; @@ -368,6 +380,22 @@ public class TestCollectionsMessageSerializer implements MessageSerializer { case 22: msg.gridLongListList(reader.readCollection(MessageCollectionItemType.GRID_LONG_LIST)); + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 23: + msg.boxedIntegerSet(reader.readSet(MessageCollectionItemType.INT)); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 24: + msg.bitSetSet(reader.readSet(MessageCollectionItemType.BIT_SET)); + if (!reader.isLastRead()) return false;