diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 3af938392a4c8..8bf794062ef45 100755 --- a/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -2146,4 +2146,11 @@ public static int compareArrays(double[] a1, double[] a2) { public static Collection emptyIfNull(@Nullable Collection col) { return col == null ? Collections.emptySet() : col; } + + /** + * @param map Map. + */ + public static Map emptyIfNull(@Nullable Map map) { + return map == null ? Collections.emptyMap() : map; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index cf07a3fcfb8c0..3e53b9552d75f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.codegen.CacheInvokeDirectResultSerializer; import org.apache.ignite.internal.codegen.CachePartitionFullCountersMapSerializer; import org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer; +import org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer; import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer; import org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer; import org.apache.ignite.internal.codegen.ErrorMessageSerializer; @@ -109,6 +110,7 @@ import org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer; import org.apache.ignite.internal.codegen.IgniteDhtPartitionCountersMapSerializer; import org.apache.ignite.internal.codegen.IgniteDhtPartitionHistorySuppliersMapSerializer; +import org.apache.ignite.internal.codegen.IgniteDhtPartitionsToReloadMapSerializer; import org.apache.ignite.internal.codegen.IgniteTxKeySerializer; import org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer; import org.apache.ignite.internal.codegen.JobStealingRequestSerializer; @@ -120,6 +122,8 @@ import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer; import org.apache.ignite.internal.codegen.NodeIdMessageSerializer; import org.apache.ignite.internal.codegen.PartitionReservationsMapSerializer; +import org.apache.ignite.internal.codegen.PartitionSizesMapSerializer; +import org.apache.ignite.internal.codegen.PartitionsToReloadSerializer; import org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer; import org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer; import org.apache.ignite.internal.codegen.ServiceDeploymentProcessIdSerializer; @@ -200,6 +204,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionsToReloadMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; @@ -212,7 +217,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionReservationsMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsToReload; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; @@ -465,6 +473,11 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register(PartitionReservationsMap.TYPE_CODE, PartitionReservationsMap::new, new PartitionReservationsMapSerializer()); factory.register(IgniteDhtPartitionHistorySuppliersMap.TYPE_CODE, IgniteDhtPartitionHistorySuppliersMap::new, new IgniteDhtPartitionHistorySuppliersMapSerializer()); + factory.register(PartitionsToReload.TYPE_CODE, PartitionsToReload::new, new PartitionsToReloadSerializer()); + factory.register(CachePartitionsToReloadMap.TYPE_CODE, CachePartitionsToReloadMap::new, new CachePartitionsToReloadMapSerializer()); + factory.register(IgniteDhtPartitionsToReloadMap.TYPE_CODE, IgniteDhtPartitionsToReloadMap::new, + new IgniteDhtPartitionsToReloadMapSerializer()); + factory.register(PartitionSizesMap.TYPE_CODE, PartitionSizesMap::new, new PartitionSizesMapSerializer()); // [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this // [120..123] - DR diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 6bff39ae692d1..9a6a8cfb04533 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -90,6 +90,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask; @@ -1440,7 +1441,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( } if (!partsSizes.isEmpty()) - m.partitionSizes(cctx, partsSizes); + m.partitionSizes(F.viewReadOnly(partsSizes, PartitionSizesMap::new)); return m; } @@ -1771,7 +1772,7 @@ public void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMe boolean updated = false; - Map> partsSizes = msg.partitionSizes(cctx); + Map partsSizes = F.emptyIfNull(msg.partitionSizes()); for (Map.Entry entry : msg.partitions().entrySet()) { Integer grpId = entry.getKey(); @@ -1781,11 +1782,13 @@ public void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMe GridDhtPartitionTopology top = grp == null ? clientTops.get(grpId) : grp.topology(); if (top != null) { + PartitionSizesMap sizesMap = partsSizes.get(grpId); + updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), grpId), - partsSizes.getOrDefault(grpId, Collections.emptyMap()), + sizesMap != null ? F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(), msg.topologyVersion(), null, null); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java new file mode 100644 index 0000000000000..bcf6ab80c3a43 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** Partition reload map for cache. */ +public class CachePartitionsToReloadMap implements Message { + /** Type code. */ + public static final short TYPE_CODE = 512; + + /** Partition reload map for cache. */ + @Order(value = 0, method = "cachePartitions") + private Map map; + + /** + * @return Partition reload map for cache. + */ + public Map cachePartitions() { + return map; + } + + /** + * @param map Partition reload map for cache. + */ + public void cachePartitions(Map map) { + this.map = map; + } + + /** + * @param cacheId Cache id. + * @return Partitions to reload for this cache. + */ + public @Nullable PartitionsToReload get(int cacheId) { + if (map == null) + return null; + + return map.get(cacheId); + } + + /** + * @param cacheId Cache id. + * @param parts Partitions to reload. + */ + public void put(int cacheId, PartitionsToReload parts) { + if (map == null) + map = new HashMap<>(); + + map.put(cacheId, parts); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 0a1b955f0d035..e40f6234a904f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -3311,12 +3311,18 @@ private void assignPartitionSizes(GridDhtPartitionTopology top) { if (partMap == null) continue; + Map grpPartSizes = singleMsg.partitionSizes(top.groupId()); + for (Map.Entry e0 : partMap.entrySet()) { int p = e0.getKey(); GridDhtPartitionState state = e0.getValue(); - if (state == GridDhtPartitionState.OWNING) - partSizes.put(p, singleMsg.partitionSizes(top.groupId()).get(p)); + if (state == GridDhtPartitionState.OWNING) { + Long size = grpPartSizes.get(p); + + if (size != null) + partSizes.put(p, size); + } } } @@ -4654,7 +4660,7 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); try { - Map> partsSizes = msg.partitionSizes(cctx); + Map partsSizes = F.emptyIfNull(msg.partitionSizes()); doInParallel( parallelismLvl, @@ -4665,11 +4671,13 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa CacheGroupContext grp = cctx.cache().cacheGroup(grpId); if (grp != null) { + PartitionSizesMap sizesMap = partsSizes.get(grpId); + grp.topology().update(resTopVer, msg.partitions().get(grpId), cntrMap, msg.partsToReload(cctx.localNodeId(), grpId), - partsSizes.getOrDefault(grpId, Collections.emptyMap()), + sizesMap != null ? F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(), null, this, msg.lostPartitions(grpId)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 469a85140f740..15fef52e47319 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -29,7 +29,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; @@ -83,14 +82,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** Partitions that must be cleared and re-loaded. */ @GridToStringInclude - @GridDirectTransient private IgniteDhtPartitionsToReloadMap partsToReload; - /** Serialized partitions that must be cleared and re-loaded. */ - private byte[] partsToReloadBytes; - - /** Serialized partitions sizes. */ - private byte[] partsSizesBytes; + /** Partition sizes. */ + private Map partsSizes; /** Topology version. */ private AffinityTopologyVersion topVer; @@ -178,8 +173,7 @@ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, cp.partCntrs = partCntrs; cp.partHistSuppliers = partHistSuppliers; cp.partsToReload = partsToReload; - cp.partsToReloadBytes = partsToReloadBytes; - cp.partsSizesBytes = partsSizesBytes; + cp.partsSizes = partsSizes; cp.topVer = topVer; cp.errs = errs; cp.errsBytes = errsBytes; @@ -351,7 +345,7 @@ public IgniteDhtPartitionHistorySuppliersMap partitionHistorySuppliers() { /** * */ - public Set partsToReload(UUID nodeId, int grpId) { + public Collection partsToReload(UUID nodeId, int grpId) { if (partsToReload == null) return Collections.emptySet(); @@ -361,41 +355,17 @@ public Set partsToReload(UUID nodeId, int grpId) { /** * Supplies partition sizes map for all cache groups. * - * @param ctx Cache context. * @param partsSizes Partitions sizes map. */ - public void partitionSizes(GridCacheSharedContext ctx, Map> partsSizes) { - try { - byte[] marshalled = U.marshal(ctx, partsSizes); - - if (compressed()) - marshalled = U.zip(marshalled, ctx.gridConfig().getNetworkCompressionLevel()); - - partsSizesBytes = marshalled; - } - catch (IgniteCheckedException ex) { - throw new IgniteException(ex); - } + public void partitionSizes(Map partsSizes) { + this.partsSizes = partsSizes; } /** - * Returns partition sizes map for all cache groups. - * - * @param ctx Cache context. * @return Partition sizes map (grpId, (partId, partSize)). */ - public Map> partitionSizes(GridCacheSharedContext ctx) { - if (partsSizesBytes == null) - return Collections.emptyMap(); - - try { - return compressed() - ? U.unmarshalZip(ctx.marshaller(), partsSizesBytes, ctx.deploy().globalLoader()) - : U.unmarshal(ctx, partsSizesBytes, ctx.deploy().globalLoader()); - } - catch (IgniteCheckedException ex) { - throw new IgniteException(ex); - } + public Map partitionSizes() { + return partsSizes; } /** @@ -431,7 +401,6 @@ public void rebalanced(boolean rebalanced) { super.prepareMarshal(ctx); boolean marshal = (!F.isEmpty(parts) && partsBytes == null) || - (partsToReload != null && partsToReloadBytes == null) || (!F.isEmpty(errs) && errsBytes == null); if (marshal) { @@ -443,9 +412,6 @@ public void rebalanced(boolean rebalanced) { if (!F.isEmpty(parts) && partsBytes == null) objectsToMarshall.add(parts); - if (partsToReload != null && partsToReloadBytes == null) - objectsToMarshall.add(partsToReload); - if (!F.isEmpty(errs) && errsBytes == null) objectsToMarshall.add(errs); @@ -469,9 +435,6 @@ public void rebalanced(boolean rebalanced) { if (!F.isEmpty(parts) && partsBytes == null) partsBytes = iter.next(); - if (partsToReload != null && partsToReloadBytes == null) - partsToReloadBytes = iter.next(); - if (!F.isEmpty(errs) && errsBytes == null) errsBytes = iter.next(); } @@ -505,9 +468,6 @@ public void topologyVersion(AffinityTopologyVersion topVer) { if (partsBytes != null && parts == null) objectsToUnmarshall.add(partsBytes); - if (partsToReloadBytes != null && partsToReload == null) - objectsToUnmarshall.add(partsToReloadBytes); - if (errsBytes != null && errs == null) objectsToUnmarshall.add(errsBytes); @@ -557,9 +517,6 @@ public void topologyVersion(AffinityTopologyVersion topVer) { } } - if (partsToReloadBytes != null && partsToReload == null) - partsToReload = (IgniteDhtPartitionsToReloadMap)iter.next(); - if (errsBytes != null && errs == null) errs = (Map)iter.next(); @@ -649,13 +606,13 @@ public void topologyVersion(AffinityTopologyVersion topVer) { writer.incrementState(); case 15: - if (!writer.writeByteArray(partsSizesBytes)) + if (!writer.writeMap(partsSizes, MessageCollectionItemType.INT, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 16: - if (!writer.writeByteArray(partsToReloadBytes)) + if (!writer.writeMessage(partsToReload)) return false; writer.incrementState(); @@ -758,7 +715,7 @@ public void topologyVersion(AffinityTopologyVersion topVer) { reader.incrementState(); case 15: - partsSizesBytes = reader.readByteArray(); + partsSizes = reader.readMap(MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false); if (!reader.isLastRead()) return false; @@ -766,7 +723,7 @@ public void topologyVersion(AffinityTopologyVersion topVer) { reader.incrementState(); case 16: - partsToReloadBytes = reader.readByteArray(); + partsToReload = reader.readMessage(); if (!reader.isLastRead()) return false; @@ -842,8 +799,6 @@ public void merge(GridDhtPartitionsFullMessage other, GridDiscoveryManager disco public void cleanUp() { partsBytes = null; partCntrs = null; - partsToReloadBytes = null; - partsSizesBytes = null; errsBytes = null; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java index 8515004c3fdf4..bc965a49926d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java @@ -18,45 +18,47 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; -import java.io.Serializable; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Partition reload map. */ -public class IgniteDhtPartitionsToReloadMap implements Serializable { - /** */ - private static final long serialVersionUID = 0L; +public class IgniteDhtPartitionsToReloadMap implements Message { + /** Type code. */ + public static final short TYPE_CODE = 513; /** */ - private Map>> map; + @Order(value = 0, method = "partitionsToReload") + private Map map; /** * @param nodeId Node ID. * @param cacheId Cache ID. * @return Collection of partitions to reload. */ - public synchronized Set get(UUID nodeId, int cacheId) { + public synchronized Collection get(UUID nodeId, int cacheId) { if (map == null) return Collections.emptySet(); - Map> nodeMap = map.get(nodeId); + CachePartitionsToReloadMap nodeMap = map.get(nodeId); if (nodeMap == null) return Collections.emptySet(); - Set parts = nodeMap.get(cacheId); + PartitionsToReload partsToReload = nodeMap.get(cacheId); - if (parts == null) + if (partsToReload == null) return Collections.emptySet(); - return parts; + return F.emptyIfNull(partsToReload.partitions()); } /** @@ -68,18 +70,12 @@ public synchronized void put(UUID nodeId, int cacheId, int partId) { if (map == null) map = new HashMap<>(); - Map> nodeMap = map.get(nodeId); - - if (nodeMap == null) { - nodeMap = new HashMap<>(); - - map.put(nodeId, nodeMap); - } + CachePartitionsToReloadMap nodeMap = map.computeIfAbsent(nodeId, k -> new CachePartitionsToReloadMap()); - Set parts = nodeMap.get(cacheId); + PartitionsToReload parts = nodeMap.get(cacheId); if (parts == null) { - parts = new HashSet<>(); + parts = new PartitionsToReload(); nodeMap.put(cacheId, parts); } @@ -98,4 +94,23 @@ public synchronized boolean isEmpty() { @Override public String toString() { return S.toString(IgniteDhtPartitionsToReloadMap.class, this); } + + /** + * @return Partition reload map. + */ + public Map partitionsToReload() { + return map; + } + + /** + * @param map Partition reload map. + */ + public void partitionsToReload(Map map) { + this.map = map; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java new file mode 100644 index 0000000000000..987fa9fe604e3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** Partition sizes map. */ +public class PartitionSizesMap implements Message { + /** Type code. */ + public static final short TYPE_CODE = 514; + + /** Partition sizes map. */ + @Order(value = 0, method = "partitionSizes") + private @Nullable Map partSizes; + + /** Default constructor. */ + public PartitionSizesMap() { + // No-op. + } + + /** + * @param partSizes Partition sizes map. + */ + public PartitionSizesMap(@Nullable Map partSizes) { + this.partSizes = partSizes; + } + + /** + * @return Partition sizes map. + */ + public @Nullable Map partitionSizes() { + return partSizes; + } + + /** + * @param partSizes Partition sizes map. + */ + public void partitionSizes(Map partSizes) { + this.partSizes = partSizes; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java new file mode 100644 index 0000000000000..023b28ad03f1a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.util.Collection; +import java.util.HashSet; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** Partitions to reload. */ +public class PartitionsToReload implements Message { + /** Type code. */ + public static final short TYPE_CODE = 511; + + /** Collection of partitions to reload. */ + @Order(value = 0, method = "partitions") + private Collection parts; + + /** + * @return Collection of partitions to reload. + */ + public Collection partitions() { + return parts; + } + + /** + * @param parts Collection of partitions to reload. + */ + public void partitions(Collection parts) { + this.parts = parts; + } + + /** + * @param partId Partition ID. + */ + public void add(int partId) { + if (parts == null) + parts = new HashSet<>(); + + parts.add(partId); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java index 9ca26f636ae3a..675f891be690c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java @@ -747,7 +747,7 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap cntrMap, - Set partsToReload, + Collection partsToReload, @Nullable Map partSizes, @Nullable AffinityTopologyVersion msgTopVer, @Nullable GridDhtPartitionsExchangeFuture exchFut, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java index 0c2f415a7a1c1..7d6a079f0449b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.topology; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -297,7 +298,7 @@ public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, G * means full map received is not related to exchange * @param partMap Update partition map. * @param cntrMap Partition update counters. - * @param partsToReload Set of partitions that need to be reloaded. + * @param partsToReload Collection of partitions that need to be reloaded. * @param partSizes Global partition sizes. * @param msgTopVer Topology version from incoming message. This value is not null only for case message is not * related to exchange. Value should be not less than previous 'Topology version from exchange'. @@ -309,7 +310,7 @@ public boolean update( @Nullable AffinityTopologyVersion exchangeResVer, GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap cntrMap, - Set partsToReload, + Collection partsToReload, @Nullable Map partSizes, @Nullable AffinityTopologyVersion msgTopVer, @Nullable GridDhtPartitionsExchangeFuture exchFut, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java index b004053ca6f35..8227e16172814 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java @@ -1440,7 +1440,7 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap incomeCntrMap, - Set partsToReload, + Collection partsToReload, @Nullable Map partSizes, @Nullable AffinityTopologyVersion msgTopVer, @Nullable GridDhtPartitionsExchangeFuture exchFut,