From a06488ef5ae1d8e63227f71b855474de3655a78f Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Fri, 31 Oct 2025 12:00:35 +0500 Subject: [PATCH 1/8] IGNITE-26866 Add Message interface to IgniteDhtPartitionsToReloadMap --- .../communication/GridIoMessageFactory.java | 13 +++ .../GridCachePartitionExchangeManager.java | 9 +- .../preloader/CachePartitionsToReloadMap.java | 75 +++++++++++++++ .../GridDhtPartitionsExchangeFuture.java | 6 +- .../GridDhtPartitionsFullMessage.java | 68 +++----------- .../IgniteDhtPartitionsToReloadMap.java | 56 ++++++----- .../dht/preloader/PartitionSizesMap.java | 93 +++++++++++++++++++ .../dht/preloader/PartitionsToReload.java | 63 +++++++++++++ .../topology/GridClientPartitionTopology.java | 2 +- .../topology/GridDhtPartitionTopology.java | 5 +- .../GridDhtPartitionTopologyImpl.java | 2 +- 11 files changed, 307 insertions(+), 85 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index c36ad13741dd8..2209a1822deb4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.codegen.CacheGroupAffinityMessageSerializer; import org.apache.ignite.internal.codegen.CacheInvokeDirectResultSerializer; import org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer; +import org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer; import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer; import org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer; import org.apache.ignite.internal.codegen.ErrorMessageSerializer; @@ -102,6 +103,7 @@ import org.apache.ignite.internal.codegen.HandshakeMessageSerializer; import org.apache.ignite.internal.codegen.HandshakeWaitMessageSerializer; import org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer; +import org.apache.ignite.internal.codegen.IgniteDhtPartitionsToReloadMapSerializer; import org.apache.ignite.internal.codegen.IgniteTxKeySerializer; import org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer; import org.apache.ignite.internal.codegen.JobStealingRequestSerializer; @@ -112,6 +114,8 @@ import org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerializer; import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer; import org.apache.ignite.internal.codegen.NodeIdMessageSerializer; +import org.apache.ignite.internal.codegen.PartitionSizesMapSerializer; +import org.apache.ignite.internal.codegen.PartitionsToReloadSerializer; import org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer; import org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer; import org.apache.ignite.internal.codegen.ServiceDeploymentProcessIdSerializer; @@ -191,6 +195,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionsToReloadMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; @@ -200,6 +205,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsToReload; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; @@ -445,6 +453,11 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register(GridCacheOperationMessage.TYPE_CODE, GridCacheOperationMessage::new, new GridCacheOperationMessageSerializer()); factory.register(BinaryMetadataVersionInfo.TYPE_CODE, BinaryMetadataVersionInfo::new, new BinaryMetadataVersionInfoSerializer()); + factory.register(PartitionsToReload.TYPE_CODE, PartitionsToReload::new, new PartitionsToReloadSerializer()); + factory.register(CachePartitionsToReloadMap.TYPE_CODE, CachePartitionsToReloadMap::new, new CachePartitionsToReloadMapSerializer()); + factory.register(IgniteDhtPartitionsToReloadMap.TYPE_CODE, IgniteDhtPartitionsToReloadMap::new, + new IgniteDhtPartitionsToReloadMapSerializer()); + factory.register(PartitionSizesMap.TYPE_CODE, PartitionSizesMap::new, new PartitionSizesMapSerializer()); // [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this // [120..123] - DR diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 6bff39ae692d1..0ccb44ef6e5fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -90,6 +90,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask; @@ -1440,7 +1441,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( } if (!partsSizes.isEmpty()) - m.partitionSizes(cctx, partsSizes); + m.partitionSizes(F.viewReadOnly(partsSizes, PartitionSizesMap::new)); return m; } @@ -1771,7 +1772,7 @@ public void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMe boolean updated = false; - Map> partsSizes = msg.partitionSizes(cctx); + Map partsSizes = msg.partitionSizes(); for (Map.Entry entry : msg.partitions().entrySet()) { Integer grpId = entry.getKey(); @@ -1781,11 +1782,13 @@ public void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMe GridDhtPartitionTopology top = grp == null ? clientTops.get(grpId) : grp.topology(); if (top != null) { + PartitionSizesMap sizesMap = partsSizes.get(grpId); + updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), grpId), - partsSizes.getOrDefault(grpId, Collections.emptyMap()), + sizesMap != null ? sizesMap.partitionSizes() : Collections.emptyMap(), msg.topologyVersion(), null, null); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java new file mode 100644 index 0000000000000..81a4502caeb83 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** Partition reload map for cache. */ +public class CachePartitionsToReloadMap implements Message { + /** Type code. */ + public static final short TYPE_CODE = 507; + + /** Partition reload map for cache. */ + @Order(value = 0, method = "cachePartitions") + private Map map; + + /** + * @return Partition reload map for cache. + */ + public Map cachePartitions() { + return map; + } + + /** + * @param map Partition reload map for cache. + */ + public void cachePartitions(Map map) { + this.map = map; + } + + /** + * @param cacheId Cache id. + * @return Partitions to reload for this cache. + */ + public @Nullable PartitionsToReload get(int cacheId) { + if (map == null) + return null; + + return map.get(cacheId); + } + + /** + * @param cacheId Cache id. + * @param parts Partitions to reload. + */ + public void put(int cacheId, PartitionsToReload parts) { + if (map == null) + map = new HashMap<>(); + + map.put(cacheId, parts); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 35ac04f63851c..750d700876926 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -4654,7 +4654,7 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); try { - Map> partsSizes = msg.partitionSizes(cctx); + Map partsSizes = msg.partitionSizes(); doInParallel( parallelismLvl, @@ -4665,11 +4665,13 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa CacheGroupContext grp = cctx.cache().cacheGroup(grpId); if (grp != null) { + PartitionSizesMap sizesMap = partsSizes.get(grpId); + grp.topology().update(resTopVer, msg.partitions().get(grpId), cntrMap, msg.partsToReload(cctx.localNodeId(), grpId), - partsSizes.getOrDefault(grpId, Collections.emptyMap()), + sizesMap != null ? sizesMap.partitionSizes() : Collections.emptyMap(), null, this, msg.lostPartitions(grpId)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 926a77c76445a..fde6b9213c3ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -29,7 +29,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; @@ -91,14 +90,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** Partitions that must be cleared and re-loaded. */ @GridToStringInclude - @GridDirectTransient private IgniteDhtPartitionsToReloadMap partsToReload; - /** Serialized partitions that must be cleared and re-loaded. */ - private byte[] partsToReloadBytes; - - /** Serialized partitions sizes. */ - private byte[] partsSizesBytes; + /** Partition sizes. */ + private Map partsSizes; /** Topology version. */ private AffinityTopologyVersion topVer; @@ -188,8 +183,7 @@ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, cp.partHistSuppliers = partHistSuppliers; cp.partHistSuppliersBytes = partHistSuppliersBytes; cp.partsToReload = partsToReload; - cp.partsToReloadBytes = partsToReloadBytes; - cp.partsSizesBytes = partsSizesBytes; + cp.partsSizes = partsSizes; cp.topVer = topVer; cp.errs = errs; cp.errsBytes = errsBytes; @@ -361,7 +355,7 @@ public IgniteDhtPartitionHistorySuppliersMap partitionHistorySuppliers() { /** * */ - public Set partsToReload(UUID nodeId, int grpId) { + public Collection partsToReload(UUID nodeId, int grpId) { if (partsToReload == null) return Collections.emptySet(); @@ -371,41 +365,20 @@ public Set partsToReload(UUID nodeId, int grpId) { /** * Supplies partition sizes map for all cache groups. * - * @param ctx Cache context. * @param partsSizes Partitions sizes map. */ - public void partitionSizes(GridCacheSharedContext ctx, Map> partsSizes) { - try { - byte[] marshalled = U.marshal(ctx, partsSizes); - - if (compressed()) - marshalled = U.zip(marshalled, ctx.gridConfig().getNetworkCompressionLevel()); - - partsSizesBytes = marshalled; - } - catch (IgniteCheckedException ex) { - throw new IgniteException(ex); - } + public void partitionSizes(Map partsSizes) { + this.partsSizes = partsSizes; } /** - * Returns partition sizes map for all cache groups. - * - * @param ctx Cache context. * @return Partition sizes map (grpId, (partId, partSize)). */ - public Map> partitionSizes(GridCacheSharedContext ctx) { - if (partsSizesBytes == null) + public Map partitionSizes() { + if (partsSizes == null) return Collections.emptyMap(); - try { - return compressed() - ? U.unmarshalZip(ctx.marshaller(), partsSizesBytes, ctx.deploy().globalLoader()) - : U.unmarshal(ctx, partsSizesBytes, ctx.deploy().globalLoader()); - } - catch (IgniteCheckedException ex) { - throw new IgniteException(ex); - } + return partsSizes; } /** @@ -443,7 +416,6 @@ public void rebalanced(boolean rebalanced) { boolean marshal = (!F.isEmpty(parts) && partsBytes == null) || (partCntrs != null && !partCntrs.empty() && partCntrsBytes == null) || (partHistSuppliers != null && partHistSuppliersBytes == null) || - (partsToReload != null && partsToReloadBytes == null) || (!F.isEmpty(errs) && errsBytes == null); if (marshal) { @@ -461,9 +433,6 @@ public void rebalanced(boolean rebalanced) { if (partHistSuppliers != null && partHistSuppliersBytes == null) objectsToMarshall.add(partHistSuppliers); - if (partsToReload != null && partsToReloadBytes == null) - objectsToMarshall.add(partsToReload); - if (!F.isEmpty(errs) && errsBytes == null) objectsToMarshall.add(errs); @@ -493,9 +462,6 @@ public void rebalanced(boolean rebalanced) { if (partHistSuppliers != null && partHistSuppliersBytes == null) partHistSuppliersBytes = iter.next(); - if (partsToReload != null && partsToReloadBytes == null) - partsToReloadBytes = iter.next(); - if (!F.isEmpty(errs) && errsBytes == null) errsBytes = iter.next(); } @@ -535,9 +501,6 @@ public void topologyVersion(AffinityTopologyVersion topVer) { if (partHistSuppliersBytes != null && partHistSuppliers == null) objectsToUnmarshall.add(partHistSuppliersBytes); - if (partsToReloadBytes != null && partsToReload == null) - objectsToUnmarshall.add(partsToReloadBytes); - if (errsBytes != null && errs == null) objectsToUnmarshall.add(errsBytes); @@ -593,9 +556,6 @@ public void topologyVersion(AffinityTopologyVersion topVer) { if (partHistSuppliersBytes != null && partHistSuppliers == null) partHistSuppliers = (IgniteDhtPartitionHistorySuppliersMap)iter.next(); - if (partsToReloadBytes != null && partsToReload == null) - partsToReload = (IgniteDhtPartitionsToReloadMap)iter.next(); - if (errsBytes != null && errs == null) errs = (Map)iter.next(); @@ -685,13 +645,13 @@ public void topologyVersion(AffinityTopologyVersion topVer) { writer.incrementState(); case 15: - if (!writer.writeByteArray(partsSizesBytes)) + if (!writer.writeMap(partsSizes, MessageCollectionItemType.INT, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 16: - if (!writer.writeByteArray(partsToReloadBytes)) + if (!writer.writeMessage(partsToReload)) return false; writer.incrementState(); @@ -794,7 +754,7 @@ public void topologyVersion(AffinityTopologyVersion topVer) { reader.incrementState(); case 15: - partsSizesBytes = reader.readByteArray(); + partsSizes = reader.readMap(MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false); if (!reader.isLastRead()) return false; @@ -802,7 +762,7 @@ public void topologyVersion(AffinityTopologyVersion topVer) { reader.incrementState(); case 16: - partsToReloadBytes = reader.readByteArray(); + partsToReload = reader.readMessage(); if (!reader.isLastRead()) return false; @@ -880,8 +840,6 @@ public void cleanUp() { partCntrs = null; partCntrsBytes = null; partHistSuppliersBytes = null; - partsToReloadBytes = null; - partsSizesBytes = null; errsBytes = null; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java index 8515004c3fdf4..ec88b0f7b2593 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java @@ -18,45 +18,46 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; -import java.io.Serializable; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Partition reload map. */ -public class IgniteDhtPartitionsToReloadMap implements Serializable { - /** */ - private static final long serialVersionUID = 0L; +public class IgniteDhtPartitionsToReloadMap implements Message { + /** Type code. */ + public static final short TYPE_CODE = 508; /** */ - private Map>> map; + @Order(value = 0, method = "partitionsToReload") + private Map map; /** * @param nodeId Node ID. * @param cacheId Cache ID. * @return Collection of partitions to reload. */ - public synchronized Set get(UUID nodeId, int cacheId) { + public synchronized Collection get(UUID nodeId, int cacheId) { if (map == null) return Collections.emptySet(); - Map> nodeMap = map.get(nodeId); + CachePartitionsToReloadMap nodeMap = map.get(nodeId); if (nodeMap == null) return Collections.emptySet(); - Set parts = nodeMap.get(cacheId); + PartitionsToReload partsToReload = nodeMap.get(cacheId); - if (parts == null) + if (partsToReload == null) return Collections.emptySet(); - return parts; + return partsToReload.partitions(); } /** @@ -68,18 +69,12 @@ public synchronized void put(UUID nodeId, int cacheId, int partId) { if (map == null) map = new HashMap<>(); - Map> nodeMap = map.get(nodeId); - - if (nodeMap == null) { - nodeMap = new HashMap<>(); - - map.put(nodeId, nodeMap); - } + CachePartitionsToReloadMap nodeMap = map.computeIfAbsent(nodeId, k -> new CachePartitionsToReloadMap()); - Set parts = nodeMap.get(cacheId); + PartitionsToReload parts = nodeMap.get(cacheId); if (parts == null) { - parts = new HashSet<>(); + parts = new PartitionsToReload(); nodeMap.put(cacheId, parts); } @@ -98,4 +93,23 @@ public synchronized boolean isEmpty() { @Override public String toString() { return S.toString(IgniteDhtPartitionsToReloadMap.class, this); } + + /** + * @return Partition reload map. + */ + public Map partitionsToReload() { + return map; + } + + /** + * @param map Partition reload map. + */ + public void partitionsToReload(Map map) { + this.map = map; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java new file mode 100644 index 0000000000000..ae980365662e9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** Partition sizes map. */ +public class PartitionSizesMap implements Message { + /** Type code. */ + public static final short TYPE_CODE = 509; + + /** Partition sizes map. */ + @Order(value = 0, method = "partitionSizesMap") + private @Nullable Map partsSizes; + + /** Default constructor. */ + public PartitionSizesMap() { + // No-op. + } + + /** + * @param partsSizes Partition sizes map. + */ + public PartitionSizesMap(Map partsSizes) { + this.partsSizes = partsSizes; + } + + /** + * @return Partition sizes map. + */ + public Map partitionSizes() { + return partsSizes != null ? partsSizes : Collections.emptyMap(); + } + + /** + * Used only for serialization. + * Since the code generation framework currently does not support null values for primitive wrappers, + * we use {@link Long#MIN_VALUE} instead of null. + * + * @return Partition sizes map. + */ + public @Nullable Map partitionSizesMap() { + if (partsSizes == null) + return null; + + return partsSizes.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() != null ? e.getValue() : Long.MIN_VALUE)); + } + + /** + * Used only for deserialization. + * + * @param partsSizes Partition sizes map. + */ + public void partitionSizesMap(@Nullable Map partsSizes) { + if (partsSizes == null) + this.partsSizes = partsSizes; + else + this.partsSizes = partsSizes.entrySet().stream() + .collect(Collectors.groupingBy( + Map.Entry::getKey, + Collectors.mapping( + e -> e.getValue() != Long.MIN_VALUE ? e.getValue() : null, + Collectors.reducing(null, (v1, v2) -> v1) + ) + )); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java new file mode 100644 index 0000000000000..7c0eacef9ea36 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.util.Collection; +import java.util.HashSet; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** Partitions to reload. */ +public class PartitionsToReload implements Message { + /** Type code. */ + public static final short TYPE_CODE = 506; + + /** Collection of partitions to reload. */ + @Order(value = 0, method = "partitions") + private Collection parts; + + /** + * @return Collection of partitions to reload. + */ + public Collection partitions() { + return parts; + } + + /** + * @param parts Collection of partitions to reload. + */ + public void partitions(Collection parts) { + this.parts = parts; + } + + /** + * @param partId Partition ID. + */ + public void add(int partId) { + if (parts == null) + parts = new HashSet<>(); + + parts.add(partId); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java index 9ca26f636ae3a..675f891be690c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java @@ -747,7 +747,7 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap cntrMap, - Set partsToReload, + Collection partsToReload, @Nullable Map partSizes, @Nullable AffinityTopologyVersion msgTopVer, @Nullable GridDhtPartitionsExchangeFuture exchFut, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java index 0c2f415a7a1c1..7d6a079f0449b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.topology; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -297,7 +298,7 @@ public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, G * means full map received is not related to exchange * @param partMap Update partition map. * @param cntrMap Partition update counters. - * @param partsToReload Set of partitions that need to be reloaded. + * @param partsToReload Collection of partitions that need to be reloaded. * @param partSizes Global partition sizes. * @param msgTopVer Topology version from incoming message. This value is not null only for case message is not * related to exchange. Value should be not less than previous 'Topology version from exchange'. @@ -309,7 +310,7 @@ public boolean update( @Nullable AffinityTopologyVersion exchangeResVer, GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap cntrMap, - Set partsToReload, + Collection partsToReload, @Nullable Map partSizes, @Nullable AffinityTopologyVersion msgTopVer, @Nullable GridDhtPartitionsExchangeFuture exchFut, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java index b004053ca6f35..8227e16172814 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java @@ -1440,7 +1440,7 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap incomeCntrMap, - Set partsToReload, + Collection partsToReload, @Nullable Map partSizes, @Nullable AffinityTopologyVersion msgTopVer, @Nullable GridDhtPartitionsExchangeFuture exchFut, From 58f94289e5bac644bbe669c15a979d96ff47ab63 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Mon, 3 Nov 2025 20:43:34 +0500 Subject: [PATCH 2/8] IGNITE-26866 address review comments --- .../ignite/internal/util/lang/GridFunc.java | 7 +++++ .../communication/GridIoMessageFactory.java | 2 +- .../GridDhtPartitionsFullMessage.java | 5 +--- .../IgniteDhtPartitionsToReloadMap.java | 3 ++- .../dht/preloader/PartitionSizesMap.java | 27 +++++-------------- 5 files changed, 18 insertions(+), 26 deletions(-) diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 3af938392a4c8..8bf794062ef45 100755 --- a/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -2146,4 +2146,11 @@ public static int compareArrays(double[] a1, double[] a2) { public static Collection emptyIfNull(@Nullable Collection col) { return col == null ? Collections.emptySet() : col; } + + /** + * @param map Map. + */ + public static Map emptyIfNull(@Nullable Map map) { + return map == null ? Collections.emptyMap() : map; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 4d153015c11f3..902e0082cef9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -216,8 +216,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionReservationsMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionReservationsMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsToReload; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 27a9c50deed35..71ef2e64a343f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -365,10 +365,7 @@ public void partitionSizes(Map partsSizes) { * @return Partition sizes map (grpId, (partId, partSize)). */ public Map partitionSizes() { - if (partsSizes == null) - return Collections.emptyMap(); - - return partsSizes; + return F.emptyIfNull(partsSizes); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java index ec88b0f7b2593..18719aa889352 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; @@ -57,7 +58,7 @@ public synchronized Collection get(UUID nodeId, int cacheId) { if (partsToReload == null) return Collections.emptySet(); - return partsToReload.partitions(); + return F.emptyIfNull(partsToReload.partitions()); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java index ae980365662e9..52c228fe6a90e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java @@ -17,10 +17,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; -import java.util.Collections; import java.util.Map; -import java.util.stream.Collectors; import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; @@ -41,7 +40,7 @@ public PartitionSizesMap() { /** * @param partsSizes Partition sizes map. */ - public PartitionSizesMap(Map partsSizes) { + public PartitionSizesMap(@Nullable Map partsSizes) { this.partsSizes = partsSizes; } @@ -49,7 +48,7 @@ public PartitionSizesMap(Map partsSizes) { * @return Partition sizes map. */ public Map partitionSizes() { - return partsSizes != null ? partsSizes : Collections.emptyMap(); + return F.emptyIfNull(partsSizes); } /** @@ -60,30 +59,18 @@ public Map partitionSizes() { * @return Partition sizes map. */ public @Nullable Map partitionSizesMap() { - if (partsSizes == null) - return null; - - return partsSizes.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() != null ? e.getValue() : Long.MIN_VALUE)); + return partsSizes == null ? null : F.viewReadOnly(partsSizes, v -> v != null ? v : Long.MIN_VALUE); } /** * Used only for deserialization. + * Since the code generation framework currently does not support null values for primitive wrappers, + * we use {@link Long#MIN_VALUE} instead of null. * * @param partsSizes Partition sizes map. */ public void partitionSizesMap(@Nullable Map partsSizes) { - if (partsSizes == null) - this.partsSizes = partsSizes; - else - this.partsSizes = partsSizes.entrySet().stream() - .collect(Collectors.groupingBy( - Map.Entry::getKey, - Collectors.mapping( - e -> e.getValue() != Long.MIN_VALUE ? e.getValue() : null, - Collectors.reducing(null, (v1, v2) -> v1) - ) - )); + this.partsSizes = partsSizes == null ? null : F.viewReadOnly(partsSizes, v -> v != Long.MIN_VALUE ? v : null); } /** {@inheritDoc} */ From 477afebfb53333b27f80e4926e19f744371665d4 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Mon, 3 Nov 2025 21:04:53 +0500 Subject: [PATCH 3/8] IGNITE-26866 fix type code --- .../distributed/dht/preloader/CachePartitionsToReloadMap.java | 2 +- .../dht/preloader/IgniteDhtPartitionsToReloadMap.java | 2 +- .../cache/distributed/dht/preloader/PartitionSizesMap.java | 2 +- .../cache/distributed/dht/preloader/PartitionsToReload.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java index 81a4502caeb83..bcf6ab80c3a43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java @@ -26,7 +26,7 @@ /** Partition reload map for cache. */ public class CachePartitionsToReloadMap implements Message { /** Type code. */ - public static final short TYPE_CODE = 507; + public static final short TYPE_CODE = 512; /** Partition reload map for cache. */ @Order(value = 0, method = "cachePartitions") diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java index 18719aa889352..bc965a49926d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java @@ -33,7 +33,7 @@ */ public class IgniteDhtPartitionsToReloadMap implements Message { /** Type code. */ - public static final short TYPE_CODE = 508; + public static final short TYPE_CODE = 513; /** */ @Order(value = 0, method = "partitionsToReload") diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java index 52c228fe6a90e..da1c580d10df8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java @@ -26,7 +26,7 @@ /** Partition sizes map. */ public class PartitionSizesMap implements Message { /** Type code. */ - public static final short TYPE_CODE = 509; + public static final short TYPE_CODE = 514; /** Partition sizes map. */ @Order(value = 0, method = "partitionSizesMap") diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java index 7c0eacef9ea36..ae0148be2b39f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java @@ -26,7 +26,7 @@ /** Partitions to reload. */ public class PartitionsToReload implements Message { /** Type code. */ - public static final short TYPE_CODE = 506; + public static final short TYPE_CODE = 511; /** Collection of partitions to reload. */ @Order(value = 0, method = "partitions") From 0b3f482c58113efb6c3462ef869c7320919a0ddb Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 5 Nov 2025 11:27:42 +0500 Subject: [PATCH 4/8] IGNITE-26866 minor --- .../cache/distributed/dht/preloader/PartitionsToReload.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java index ae0148be2b39f..023b28ad03f1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java @@ -19,7 +19,6 @@ import java.util.Collection; import java.util.HashSet; - import org.apache.ignite.internal.Order; import org.apache.ignite.plugin.extensions.communication.Message; From 811e4ecadfcc9210be2929975d32e51796d9b3de Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 5 Nov 2025 15:03:20 +0500 Subject: [PATCH 5/8] IGNITE-26866 fix logic in PartitionSizesMap --- .../GridCachePartitionExchangeManager.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 2 +- .../GridDhtPartitionsFullMessage.java | 2 +- .../dht/preloader/PartitionSizesMap.java | 30 +++++++------------ 4 files changed, 13 insertions(+), 23 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 0ccb44ef6e5fe..ecd45277089a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1772,7 +1772,7 @@ public void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMe boolean updated = false; - Map partsSizes = msg.partitionSizes(); + Map partsSizes = F.emptyIfNull(msg.partitionSizes()); for (Map.Entry entry : msg.partitions().entrySet()) { Integer grpId = entry.getKey(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 3391a131e0aa3..720d52c406c4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -4654,7 +4654,7 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); try { - Map partsSizes = msg.partitionSizes(); + Map partsSizes = F.emptyIfNull(msg.partitionSizes()); doInParallel( parallelismLvl, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 71ef2e64a343f..15fef52e47319 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -365,7 +365,7 @@ public void partitionSizes(Map partsSizes) { * @return Partition sizes map (grpId, (partId, partSize)). */ public Map partitionSizes() { - return F.emptyIfNull(partsSizes); + return partsSizes; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java index da1c580d10df8..5b04c8ce9a52a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java @@ -17,9 +17,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; +import java.util.Collections; import java.util.Map; +import java.util.stream.Collectors; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; @@ -29,7 +30,7 @@ public class PartitionSizesMap implements Message { public static final short TYPE_CODE = 514; /** Partition sizes map. */ - @Order(value = 0, method = "partitionSizesMap") + @Order(value = 0, method = "partitionSizes") private @Nullable Map partsSizes; /** Default constructor. */ @@ -48,29 +49,18 @@ public PartitionSizesMap(@Nullable Map partsSizes) { * @return Partition sizes map. */ public Map partitionSizes() { - return F.emptyIfNull(partsSizes); + return partsSizes == null + ? Collections.emptyMap() + : partsSizes.entrySet().stream() + .filter(e -> e.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } /** - * Used only for serialization. - * Since the code generation framework currently does not support null values for primitive wrappers, - * we use {@link Long#MIN_VALUE} instead of null. - * - * @return Partition sizes map. - */ - public @Nullable Map partitionSizesMap() { - return partsSizes == null ? null : F.viewReadOnly(partsSizes, v -> v != null ? v : Long.MIN_VALUE); - } - - /** - * Used only for deserialization. - * Since the code generation framework currently does not support null values for primitive wrappers, - * we use {@link Long#MIN_VALUE} instead of null. - * * @param partsSizes Partition sizes map. */ - public void partitionSizesMap(@Nullable Map partsSizes) { - this.partsSizes = partsSizes == null ? null : F.viewReadOnly(partsSizes, v -> v != Long.MIN_VALUE ? v : null); + public void partitionSizes(Map partsSizes) { + this.partsSizes = partsSizes; } /** {@inheritDoc} */ From ab4a84fdaf04053569e7b76205b23d4bd888a703 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 5 Nov 2025 16:15:56 +0500 Subject: [PATCH 6/8] IGNITE-26866 fix logic in PartitionSizesMap --- .../cache/GridCachePartitionExchangeManager.java | 2 +- .../preloader/GridDhtPartitionsExchangeFuture.java | 12 +++++++++--- .../distributed/dht/preloader/PartitionSizesMap.java | 10 ++-------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index ecd45277089a2..9a6a8cfb04533 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1788,7 +1788,7 @@ public void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMe entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), grpId), - sizesMap != null ? sizesMap.partitionSizes() : Collections.emptyMap(), + sizesMap != null ? F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(), msg.topologyVersion(), null, null); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 720d52c406c4d..7b8806b81750a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -3311,12 +3311,18 @@ private void assignPartitionSizes(GridDhtPartitionTopology top) { if (partMap == null) continue; + Map partsSizes = singleMsg.partitionSizes(top.groupId()); + for (Map.Entry e0 : partMap.entrySet()) { int p = e0.getKey(); GridDhtPartitionState state = e0.getValue(); - if (state == GridDhtPartitionState.OWNING) - partSizes.put(p, singleMsg.partitionSizes(top.groupId()).get(p)); + if (state == GridDhtPartitionState.OWNING) { + Long size = partsSizes.get(p); + + if (size != null) + partSizes.put(p, size); + } } } @@ -4671,7 +4677,7 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa msg.partitions().get(grpId), cntrMap, msg.partsToReload(cctx.localNodeId(), grpId), - sizesMap != null ? sizesMap.partitionSizes() : Collections.emptyMap(), + sizesMap != null ? F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(), null, this, msg.lostPartitions(grpId)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java index 5b04c8ce9a52a..41a6b9956dcba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java @@ -17,9 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; -import java.util.Collections; import java.util.Map; -import java.util.stream.Collectors; import org.apache.ignite.internal.Order; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; @@ -48,12 +46,8 @@ public PartitionSizesMap(@Nullable Map partsSizes) { /** * @return Partition sizes map. */ - public Map partitionSizes() { - return partsSizes == null - ? Collections.emptyMap() - : partsSizes.entrySet().stream() - .filter(e -> e.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + public @Nullable Map partitionSizes() { + return partsSizes; } /** From c9b3713936d3dacbaaf19c35a6674ec5716ba476 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 5 Nov 2025 16:29:15 +0500 Subject: [PATCH 7/8] IGNITE-26866 rename --- .../dht/preloader/GridDhtPartitionsExchangeFuture.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 7b8806b81750a..e40f6234a904f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -3311,14 +3311,14 @@ private void assignPartitionSizes(GridDhtPartitionTopology top) { if (partMap == null) continue; - Map partsSizes = singleMsg.partitionSizes(top.groupId()); + Map grpPartSizes = singleMsg.partitionSizes(top.groupId()); for (Map.Entry e0 : partMap.entrySet()) { int p = e0.getKey(); GridDhtPartitionState state = e0.getValue(); if (state == GridDhtPartitionState.OWNING) { - Long size = partsSizes.get(p); + Long size = grpPartSizes.get(p); if (size != null) partSizes.put(p, size); From e8d88aa7093cdb509e5ba991c432ddbdab98529a Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 5 Nov 2025 16:34:01 +0500 Subject: [PATCH 8/8] IGNITE-26866 rename --- .../dht/preloader/PartitionSizesMap.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java index 41a6b9956dcba..987fa9fe604e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java @@ -29,7 +29,7 @@ public class PartitionSizesMap implements Message { /** Partition sizes map. */ @Order(value = 0, method = "partitionSizes") - private @Nullable Map partsSizes; + private @Nullable Map partSizes; /** Default constructor. */ public PartitionSizesMap() { @@ -37,24 +37,24 @@ public PartitionSizesMap() { } /** - * @param partsSizes Partition sizes map. + * @param partSizes Partition sizes map. */ - public PartitionSizesMap(@Nullable Map partsSizes) { - this.partsSizes = partsSizes; + public PartitionSizesMap(@Nullable Map partSizes) { + this.partSizes = partSizes; } /** * @return Partition sizes map. */ public @Nullable Map partitionSizes() { - return partsSizes; + return partSizes; } /** - * @param partsSizes Partition sizes map. + * @param partSizes Partition sizes map. */ - public void partitionSizes(Map partsSizes) { - this.partsSizes = partsSizes; + public void partitionSizes(Map partSizes) { + this.partSizes = partSizes; } /** {@inheritDoc} */