Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -2146,4 +2146,11 @@ public static int compareArrays(double[] a1, double[] a2) {
public static <T> Collection<T> emptyIfNull(@Nullable Collection<T> col) {
return col == null ? Collections.emptySet() : col;
}

/**
* @param map Map.
*/
public static <K, V> Map<K, V> emptyIfNull(@Nullable Map<K, V> map) {
return map == null ? Collections.emptyMap() : map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.ignite.internal.codegen.CacheInvokeDirectResultSerializer;
import org.apache.ignite.internal.codegen.CachePartitionFullCountersMapSerializer;
import org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer;
import org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer;
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
import org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer;
import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
Expand Down Expand Up @@ -109,6 +110,7 @@
import org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer;
import org.apache.ignite.internal.codegen.IgniteDhtPartitionCountersMapSerializer;
import org.apache.ignite.internal.codegen.IgniteDhtPartitionHistorySuppliersMapSerializer;
import org.apache.ignite.internal.codegen.IgniteDhtPartitionsToReloadMapSerializer;
import org.apache.ignite.internal.codegen.IgniteTxKeySerializer;
import org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer;
import org.apache.ignite.internal.codegen.JobStealingRequestSerializer;
Expand All @@ -120,6 +122,8 @@
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
import org.apache.ignite.internal.codegen.NodeIdMessageSerializer;
import org.apache.ignite.internal.codegen.PartitionReservationsMapSerializer;
import org.apache.ignite.internal.codegen.PartitionSizesMapSerializer;
import org.apache.ignite.internal.codegen.PartitionsToReloadSerializer;
import org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
import org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
import org.apache.ignite.internal.codegen.ServiceDeploymentProcessIdSerializer;
Expand Down Expand Up @@ -200,6 +204,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionsToReloadMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
Expand All @@ -212,7 +217,10 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionReservationsMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsToReload;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
Expand Down Expand Up @@ -465,6 +473,11 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register(PartitionReservationsMap.TYPE_CODE, PartitionReservationsMap::new, new PartitionReservationsMapSerializer());
factory.register(IgniteDhtPartitionHistorySuppliersMap.TYPE_CODE, IgniteDhtPartitionHistorySuppliersMap::new,
new IgniteDhtPartitionHistorySuppliersMapSerializer());
factory.register(PartitionsToReload.TYPE_CODE, PartitionsToReload::new, new PartitionsToReloadSerializer());
factory.register(CachePartitionsToReloadMap.TYPE_CODE, CachePartitionsToReloadMap::new, new CachePartitionsToReloadMapSerializer());
factory.register(IgniteDhtPartitionsToReloadMap.TYPE_CODE, IgniteDhtPartitionsToReloadMap::new,
new IgniteDhtPartitionsToReloadMapSerializer());
factory.register(PartitionSizesMap.TYPE_CODE, PartitionSizesMap::new, new PartitionSizesMapSerializer());

// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
// [120..123] - DR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
Expand Down Expand Up @@ -1440,7 +1441,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
}

if (!partsSizes.isEmpty())
m.partitionSizes(cctx, partsSizes);
m.partitionSizes(F.viewReadOnly(partsSizes, PartitionSizesMap::new));

return m;
}
Expand Down Expand Up @@ -1771,7 +1772,7 @@ public void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMe

boolean updated = false;

Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
Map<Integer, PartitionSizesMap> partsSizes = F.emptyIfNull(msg.partitionSizes());

for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
Integer grpId = entry.getKey();
Expand All @@ -1781,11 +1782,13 @@ public void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMe
GridDhtPartitionTopology top = grp == null ? clientTops.get(grpId) : grp.topology();

if (top != null) {
PartitionSizesMap sizesMap = partsSizes.get(grpId);

updated |= top.update(null,
entry.getValue(),
null,
msg.partsToReload(cctx.localNodeId(), grpId),
partsSizes.getOrDefault(grpId, Collections.emptyMap()),
sizesMap != null ? F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(),
msg.topologyVersion(),
null,
null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;

import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.internal.Order;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

/** Partition reload map for cache. */
public class CachePartitionsToReloadMap implements Message {
/** Type code. */
public static final short TYPE_CODE = 512;

/** Partition reload map for cache. */
@Order(value = 0, method = "cachePartitions")
private Map<Integer, PartitionsToReload> map;

/**
* @return Partition reload map for cache.
*/
public Map<Integer, PartitionsToReload> cachePartitions() {
return map;
}

/**
* @param map Partition reload map for cache.
*/
public void cachePartitions(Map<Integer, PartitionsToReload> map) {
this.map = map;
}

/**
* @param cacheId Cache id.
* @return Partitions to reload for this cache.
*/
public @Nullable PartitionsToReload get(int cacheId) {
if (map == null)
return null;

return map.get(cacheId);
}

/**
* @param cacheId Cache id.
* @param parts Partitions to reload.
*/
public void put(int cacheId, PartitionsToReload parts) {
if (map == null)
map = new HashMap<>();

map.put(cacheId, parts);
}

/** {@inheritDoc} */
@Override public short directType() {
return TYPE_CODE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3311,12 +3311,18 @@ private void assignPartitionSizes(GridDhtPartitionTopology top) {
if (partMap == null)
continue;

Map<Integer, Long> grpPartSizes = singleMsg.partitionSizes(top.groupId());

for (Map.Entry<Integer, GridDhtPartitionState> e0 : partMap.entrySet()) {
int p = e0.getKey();
GridDhtPartitionState state = e0.getValue();

if (state == GridDhtPartitionState.OWNING)
partSizes.put(p, singleMsg.partitionSizes(top.groupId()).get(p));
if (state == GridDhtPartitionState.OWNING) {
Long size = grpPartSizes.get(p);

if (size != null)
partSizes.put(p, size);
}
}
}

Expand Down Expand Up @@ -4654,7 +4660,7 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);

try {
Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
Map<Integer, PartitionSizesMap> partsSizes = F.emptyIfNull(msg.partitionSizes());

doInParallel(
parallelismLvl,
Expand All @@ -4665,11 +4671,13 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);

if (grp != null) {
PartitionSizesMap sizesMap = partsSizes.get(grpId);

grp.topology().update(resTopVer,
msg.partitions().get(grpId),
cntrMap,
msg.partsToReload(cctx.localNodeId(), grpId),
partsSizes.getOrDefault(grpId, Collections.emptyMap()),
sizesMap != null ? F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(),
null,
this,
msg.lostPartitions(grpId));
Expand Down
Loading
Loading