Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,11 @@ else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) {

imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType");

returnFalseIfWriteFailed(write, "writer.writeCollection", getExpr,
String collectionWriter = assignableFrom(erasedType(type), type(Set.class.getName()))
Comment thread
shishkovilja marked this conversation as resolved.
? "writer.writeSet"
: "writer.writeCollection";

returnFalseIfWriteFailed(write, collectionWriter, getExpr,
"MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)));
}

Expand Down Expand Up @@ -526,7 +530,11 @@ else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) {

assert typeArgs.size() == 1;

returnFalseIfReadFailed(name, "reader.readCollection",
String collectionReader = assignableFrom(erasedType(type), type(Set.class.getName()))
? "reader.readSet"
: "reader.readCollection";

returnFalseIfReadFailed(name, collectionReader,
"MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package org.apache.ignite.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.util.typedef.internal.S;
Expand All @@ -43,7 +43,7 @@ public class IgniteDiagnosticRequest implements Message {

/** Infos to send to a remote node. */
@Order(2)
private @Nullable LinkedHashSet<DiagnosticBaseInfo> infos;
private @Nullable Set<DiagnosticBaseInfo> infos;

/** Local message related to remote info. */
private final Map<Object, List<String>> msgs = new LinkedHashMap<>();
Expand Down Expand Up @@ -71,12 +71,12 @@ public IgniteDiagnosticRequest() {
* @param nodeId Node ID.
* @param infos Diagnostic infos.
*/
public IgniteDiagnosticRequest(long futId, UUID nodeId, @Nullable Collection<DiagnosticBaseInfo> infos) {
public IgniteDiagnosticRequest(long futId, UUID nodeId, @Nullable Set<DiagnosticBaseInfo> infos) {
this(nodeId);

this.futId = futId;

infos(infos);
this.infos = infos;
}

/**
Expand Down Expand Up @@ -142,21 +142,13 @@ public void futureId(long futId) {
}

/** @return Compound diagnostic infos. */
public @Nullable Collection<DiagnosticBaseInfo> infos() {
public @Nullable Set<DiagnosticBaseInfo> infos() {
return infos;
}

/** Sets compound diagnostic infos. */
public void infos(@Nullable Collection<DiagnosticBaseInfo> infos) {
// Deserialization supports only `Collection` interface in MessageReader#readCollection.
this.infos = toLinkedHashSet(infos);
Comment thread
Vladsz83 marked this conversation as resolved.
}

/** */
private static @Nullable LinkedHashSet<DiagnosticBaseInfo> toLinkedHashSet(@Nullable Collection<DiagnosticBaseInfo> infos) {
return infos == null
? null
: infos instanceof LinkedHashSet ? (LinkedHashSet<DiagnosticBaseInfo>)infos : new LinkedHashSet<>(infos);
public void infos(@Nullable Set<DiagnosticBaseInfo> infos) {
Comment thread
shishkovilja marked this conversation as resolved.
Comment thread
Vladsz83 marked this conversation as resolved.
this.infos = infos;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
Expand Down Expand Up @@ -372,13 +373,24 @@ public ByteBuffer getBuffer() {
@Override public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType) {
DirectByteBufferStream stream = state.item().stream;

C col = stream.readCollection(itemType, this);
C col = stream.readList(itemType, this);

lastRead = stream.lastFinished();

return col;
}

/** {@inheritDoc} */
@Override public <SET extends Set<?>> SET readSet(MessageCollectionItemType itemType) {
DirectByteBufferStream stream = state.item().stream;

SET set = stream.readSet(itemType, this);

lastRead = stream.lastFinished();

return set;
}

/** {@inheritDoc} */
@Override public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType,
MessageCollectionItemType valType, boolean linked) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
Expand Down Expand Up @@ -345,6 +346,11 @@ public ByteBuffer getBuffer() {
return stream.lastFinished();
}

/** {@inheritDoc} */
@Override public <T> boolean writeSet(Set<T> set, MessageCollectionItemType itemType) {
return writeCollection(set, itemType);
}

/** {@inheritDoc} */
@Override public <K, V> boolean writeMap(Map<K, V> map, MessageCollectionItemType keyType,
MessageCollectionItemType valType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.RandomAccess;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
Expand Down Expand Up @@ -1601,11 +1603,36 @@ public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> item
}

/**
* Reads collection as an {@link ArrayList}.
*
* @param itemType Item type.
* @param reader Reader.
* @return {@link ArrayList}.
*/
public <L extends List<?>> L readList(MessageCollectionItemType itemType, MessageReader reader) {
return readCollection(itemType, reader, false);
}

/**
* Reads collection as a {@link HashSet}.
*
* @param itemType Item type.
* @param reader Reader.
* @return {@link HashSet}.
*/
public <SET extends Set<?>> SET readSet(MessageCollectionItemType itemType, MessageReader reader) {
return readCollection(itemType, reader, true);
}

/**
* Reads collection eather as a {@link ArrayList} or a {@link HashSet}.
*
* @param itemType Item type.
* @param reader Reader.
* @return Collection.
* @param set Read-as-Set flag.
* @return {@link ArrayList} or a {@link HashSet}.
*/
public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, MessageReader reader) {
private <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, MessageReader reader, boolean set) {
if (readSize == -1) {
int size = readInt();

Expand All @@ -1617,7 +1644,7 @@ public <C extends Collection<?>> C readCollection(MessageCollectionItemType item

if (readSize >= 0) {
if (col == null)
col = new ArrayList<>(readSize);
col = set ? U.newHashSet(readSize) : new ArrayList<>(readSize);

for (int i = readItems; i < readSize; i++) {
Object item = read(itemType, reader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
Expand All @@ -28,6 +29,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
Expand All @@ -42,5 +44,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)3, TcpDiscoveryClientPingRequest::new, new TcpDiscoveryClientPingRequestSerializer());
factory.register((short)4, TcpDiscoveryClientPingResponse::new, new TcpDiscoveryClientPingResponseSerializer());
factory.register((short)5, TcpDiscoveryLoopbackProblemMessage::new, new TcpDiscoveryLoopbackProblemMessageSerializer());
factory.register((short)6, TcpDiscoveryConnectionCheckMessage::new, new TcpDiscoveryConnectionCheckMessageSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -876,7 +877,7 @@ public IgniteInternalFuture<String> requestDiagnosticInfo(final UUID nodeId, Ign
*/
private IgniteInternalFuture<String> sendDiagnosticMessage(
UUID nodeId,
@Nullable Collection<IgniteDiagnosticRequest.DiagnosticBaseInfo> infos
@Nullable Set<IgniteDiagnosticRequest.DiagnosticBaseInfo> infos
) {
try {
IgniteDiagnosticRequest msg = new IgniteDiagnosticRequest(diagFutId.getAndIncrement(), nodeId, infos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
Expand Down Expand Up @@ -226,27 +227,36 @@ public default void setBuffer(ByteBuffer buf) {
*
* @param itemType Array component type.
* @param itemCls Array component class.
* @param <T> Type of the red object .
* @param <T> Type of the read object.
* @return Array of objects.
*/
public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls);

/**
* Reads collection.
* Reads any collection.
*
* @param itemType Collection item type.
* @param <C> Type of the red collection.
* @param <C> Type of the read collection.
* @return Collection.
*/
public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType);

/**
* Reads any collection and provides it as a set.
*
* @param itemType Set item type.
* @param <S> Type of the read set.
* @return Set.
*/
public <S extends Set<?>> S readSet(MessageCollectionItemType itemType);

/**
* Reads map.
*
* @param keyType Map key type.
* @param valType Map value type.
* @param linked Whether {@link LinkedHashMap} should be created.
* @param <M> Type of the red map.
* @param <M> Type of the read map.
* @return Map.
*/
// TODO: IGNITE-26329 — switch to the new readMap method without the flag parameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
Expand Down Expand Up @@ -283,7 +284,7 @@ public default void setBuffer(ByteBuffer buf) {
public <T> boolean writeObjectArray(T[] arr, MessageCollectionItemType itemType);

/**
* Writes collection.
* Writes collection with its elements order.
*
* @param col Collection.
* @param itemType Collection item type.
Expand All @@ -292,6 +293,16 @@ public default void setBuffer(ByteBuffer buf) {
*/
public <T> boolean writeCollection(Collection<T> col, MessageCollectionItemType itemType);

/**
* Writes set with its elements order.
*
* @param set Set.
* @param itemType Set item type.
* @param <T> Type of the objects that set contains.
* @return Whether value was fully written.
*/
public <T> boolean writeSet(Set<T> set, MessageCollectionItemType itemType);
Comment thread
shishkovilja marked this conversation as resolved.

/**
* Writes map.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.io.Externalizable;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -300,7 +299,7 @@ public void failedNodes(@Nullable Set<UUID> failedNodes) {
/**
* @return Failed nodes IDs.
*/
@Nullable public Collection<UUID> failedNodes() {
@Nullable public Set<UUID> failedNodes() {
return failedNodes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,22 @@

package org.apache.ignite.spi.discovery.tcp.messages;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;

/**
* Message used to check whether a node is still connected to the topology.
* The difference from {@link TcpDiscoveryStatusCheckMessage} is that this message is sent to the next node
* which directly replies to the sender without message re-translation to the coordinator.
*/
public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage implements Externalizable {
public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage implements Message {
/** */
private static final long serialVersionUID = 0L;

/**
* Default no-arg constructor for {@link Externalizable} interface.
* Default constructor for {@link DiscoveryMessageFactory}.
*/
public TcpDiscoveryConnectionCheckMessage() {
// No-op.
Expand All @@ -55,13 +53,8 @@ public TcpDiscoveryConnectionCheckMessage(TcpDiscoveryNode creatorNode) {
}

/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
// This method has been left empty intentionally to keep message size at min.
}

/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
// This method has been left empty intentionally to keep message size at min.
@Override public short directType() {
return 6;
}

/** {@inheritDoc} */
Expand Down
Loading
Loading