Skip to content

Commit a06488e

Browse files
committed
IGNITE-26866 Add Message interface to IgniteDhtPartitionsToReloadMap
1 parent 5901b7f commit a06488e

11 files changed

Lines changed: 307 additions & 85 deletions

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.ignite.internal.codegen.CacheGroupAffinityMessageSerializer;
3535
import org.apache.ignite.internal.codegen.CacheInvokeDirectResultSerializer;
3636
import org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer;
37+
import org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer;
3738
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
3839
import org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer;
3940
import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
@@ -102,6 +103,7 @@
102103
import org.apache.ignite.internal.codegen.HandshakeMessageSerializer;
103104
import org.apache.ignite.internal.codegen.HandshakeWaitMessageSerializer;
104105
import org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer;
106+
import org.apache.ignite.internal.codegen.IgniteDhtPartitionsToReloadMapSerializer;
105107
import org.apache.ignite.internal.codegen.IgniteTxKeySerializer;
106108
import org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer;
107109
import org.apache.ignite.internal.codegen.JobStealingRequestSerializer;
@@ -112,6 +114,8 @@
112114
import org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerializer;
113115
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
114116
import org.apache.ignite.internal.codegen.NodeIdMessageSerializer;
117+
import org.apache.ignite.internal.codegen.PartitionSizesMapSerializer;
118+
import org.apache.ignite.internal.codegen.PartitionsToReloadSerializer;
115119
import org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
116120
import org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
117121
import org.apache.ignite.internal.codegen.ServiceDeploymentProcessIdSerializer;
@@ -191,6 +195,7 @@
191195
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
192196
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
193197
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
198+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionsToReloadMap;
194199
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
195200
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
196201
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -200,6 +205,9 @@
200205
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
201206
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
202207
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
208+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
209+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap;
210+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsToReload;
203211
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
204212
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
205213
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -445,6 +453,11 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
445453
factory.register(GridCacheOperationMessage.TYPE_CODE, GridCacheOperationMessage::new, new GridCacheOperationMessageSerializer());
446454
factory.register(BinaryMetadataVersionInfo.TYPE_CODE, BinaryMetadataVersionInfo::new,
447455
new BinaryMetadataVersionInfoSerializer());
456+
factory.register(PartitionsToReload.TYPE_CODE, PartitionsToReload::new, new PartitionsToReloadSerializer());
457+
factory.register(CachePartitionsToReloadMap.TYPE_CODE, CachePartitionsToReloadMap::new, new CachePartitionsToReloadMapSerializer());
458+
factory.register(IgniteDhtPartitionsToReloadMap.TYPE_CODE, IgniteDhtPartitionsToReloadMap::new,
459+
new IgniteDhtPartitionsToReloadMapSerializer());
460+
factory.register(PartitionSizesMap.TYPE_CODE, PartitionSizesMap::new, new PartitionSizesMapSerializer());
448461

449462
// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
450463
// [120..123] - DR

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
9191
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
9292
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
93+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap;
9394
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
9495
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
9596
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
@@ -1440,7 +1441,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
14401441
}
14411442

14421443
if (!partsSizes.isEmpty())
1443-
m.partitionSizes(cctx, partsSizes);
1444+
m.partitionSizes(F.viewReadOnly(partsSizes, PartitionSizesMap::new));
14441445

14451446
return m;
14461447
}
@@ -1771,7 +1772,7 @@ public void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMe
17711772

17721773
boolean updated = false;
17731774

1774-
Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
1775+
Map<Integer, PartitionSizesMap> partsSizes = msg.partitionSizes();
17751776

17761777
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
17771778
Integer grpId = entry.getKey();
@@ -1781,11 +1782,13 @@ public void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMe
17811782
GridDhtPartitionTopology top = grp == null ? clientTops.get(grpId) : grp.topology();
17821783

17831784
if (top != null) {
1785+
PartitionSizesMap sizesMap = partsSizes.get(grpId);
1786+
17841787
updated |= top.update(null,
17851788
entry.getValue(),
17861789
null,
17871790
msg.partsToReload(cctx.localNodeId(), grpId),
1788-
partsSizes.getOrDefault(grpId, Collections.emptyMap()),
1791+
sizesMap != null ? sizesMap.partitionSizes() : Collections.emptyMap(),
17891792
msg.topologyVersion(),
17901793
null,
17911794
null);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
19+
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
import org.apache.ignite.internal.Order;
23+
import org.apache.ignite.plugin.extensions.communication.Message;
24+
import org.jetbrains.annotations.Nullable;
25+
26+
/** Partition reload map for cache. */
27+
public class CachePartitionsToReloadMap implements Message {
28+
/** Type code. */
29+
public static final short TYPE_CODE = 507;
30+
31+
/** Partition reload map for cache. */
32+
@Order(value = 0, method = "cachePartitions")
33+
private Map<Integer, PartitionsToReload> map;
34+
35+
/**
36+
* @return Partition reload map for cache.
37+
*/
38+
public Map<Integer, PartitionsToReload> cachePartitions() {
39+
return map;
40+
}
41+
42+
/**
43+
* @param map Partition reload map for cache.
44+
*/
45+
public void cachePartitions(Map<Integer, PartitionsToReload> map) {
46+
this.map = map;
47+
}
48+
49+
/**
50+
* @param cacheId Cache id.
51+
* @return Partitions to reload for this cache.
52+
*/
53+
public @Nullable PartitionsToReload get(int cacheId) {
54+
if (map == null)
55+
return null;
56+
57+
return map.get(cacheId);
58+
}
59+
60+
/**
61+
* @param cacheId Cache id.
62+
* @param parts Partitions to reload.
63+
*/
64+
public void put(int cacheId, PartitionsToReload parts) {
65+
if (map == null)
66+
map = new HashMap<>();
67+
68+
map.put(cacheId, parts);
69+
}
70+
71+
/** {@inheritDoc} */
72+
@Override public short directType() {
73+
return TYPE_CODE;
74+
}
75+
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4654,7 +4654,7 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa
46544654
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
46554655

46564656
try {
4657-
Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
4657+
Map<Integer, PartitionSizesMap> partsSizes = msg.partitionSizes();
46584658

46594659
doInParallel(
46604660
parallelismLvl,
@@ -4665,11 +4665,13 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa
46654665
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
46664666

46674667
if (grp != null) {
4668+
PartitionSizesMap sizesMap = partsSizes.get(grpId);
4669+
46684670
grp.topology().update(resTopVer,
46694671
msg.partitions().get(grpId),
46704672
cntrMap,
46714673
msg.partsToReload(cctx.localNodeId(), grpId),
4672-
partsSizes.getOrDefault(grpId, Collections.emptyMap()),
4674+
sizesMap != null ? sizesMap.partitionSizes() : Collections.emptyMap(),
46734675
null,
46744676
this,
46754677
msg.lostPartitions(grpId));

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java

Lines changed: 13 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.stream.Collectors;
3030
import java.util.stream.IntStream;
3131
import org.apache.ignite.IgniteCheckedException;
32-
import org.apache.ignite.IgniteException;
3332
import org.apache.ignite.cluster.ClusterNode;
3433
import org.apache.ignite.internal.GridDirectMap;
3534
import org.apache.ignite.internal.GridDirectTransient;
@@ -91,14 +90,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
9190

9291
/** Partitions that must be cleared and re-loaded. */
9392
@GridToStringInclude
94-
@GridDirectTransient
9593
private IgniteDhtPartitionsToReloadMap partsToReload;
9694

97-
/** Serialized partitions that must be cleared and re-loaded. */
98-
private byte[] partsToReloadBytes;
99-
100-
/** Serialized partitions sizes. */
101-
private byte[] partsSizesBytes;
95+
/** Partition sizes. */
96+
private Map<Integer, PartitionSizesMap> partsSizes;
10297

10398
/** Topology version. */
10499
private AffinityTopologyVersion topVer;
@@ -188,8 +183,7 @@ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id,
188183
cp.partHistSuppliers = partHistSuppliers;
189184
cp.partHistSuppliersBytes = partHistSuppliersBytes;
190185
cp.partsToReload = partsToReload;
191-
cp.partsToReloadBytes = partsToReloadBytes;
192-
cp.partsSizesBytes = partsSizesBytes;
186+
cp.partsSizes = partsSizes;
193187
cp.topVer = topVer;
194188
cp.errs = errs;
195189
cp.errsBytes = errsBytes;
@@ -361,7 +355,7 @@ public IgniteDhtPartitionHistorySuppliersMap partitionHistorySuppliers() {
361355
/**
362356
*
363357
*/
364-
public Set<Integer> partsToReload(UUID nodeId, int grpId) {
358+
public Collection<Integer> partsToReload(UUID nodeId, int grpId) {
365359
if (partsToReload == null)
366360
return Collections.emptySet();
367361

@@ -371,41 +365,20 @@ public Set<Integer> partsToReload(UUID nodeId, int grpId) {
371365
/**
372366
* Supplies partition sizes map for all cache groups.
373367
*
374-
* @param ctx Cache context.
375368
* @param partsSizes Partitions sizes map.
376369
*/
377-
public void partitionSizes(GridCacheSharedContext ctx, Map<Integer, Map<Integer, Long>> partsSizes) {
378-
try {
379-
byte[] marshalled = U.marshal(ctx, partsSizes);
380-
381-
if (compressed())
382-
marshalled = U.zip(marshalled, ctx.gridConfig().getNetworkCompressionLevel());
383-
384-
partsSizesBytes = marshalled;
385-
}
386-
catch (IgniteCheckedException ex) {
387-
throw new IgniteException(ex);
388-
}
370+
public void partitionSizes(Map<Integer, PartitionSizesMap> partsSizes) {
371+
this.partsSizes = partsSizes;
389372
}
390373

391374
/**
392-
* Returns partition sizes map for all cache groups.
393-
*
394-
* @param ctx Cache context.
395375
* @return Partition sizes map (grpId, (partId, partSize)).
396376
*/
397-
public Map<Integer, Map<Integer, Long>> partitionSizes(GridCacheSharedContext ctx) {
398-
if (partsSizesBytes == null)
377+
public Map<Integer, PartitionSizesMap> partitionSizes() {
378+
if (partsSizes == null)
399379
return Collections.emptyMap();
400380

401-
try {
402-
return compressed()
403-
? U.unmarshalZip(ctx.marshaller(), partsSizesBytes, ctx.deploy().globalLoader())
404-
: U.unmarshal(ctx, partsSizesBytes, ctx.deploy().globalLoader());
405-
}
406-
catch (IgniteCheckedException ex) {
407-
throw new IgniteException(ex);
408-
}
381+
return partsSizes;
409382
}
410383

411384
/**
@@ -443,7 +416,6 @@ public void rebalanced(boolean rebalanced) {
443416
boolean marshal = (!F.isEmpty(parts) && partsBytes == null) ||
444417
(partCntrs != null && !partCntrs.empty() && partCntrsBytes == null) ||
445418
(partHistSuppliers != null && partHistSuppliersBytes == null) ||
446-
(partsToReload != null && partsToReloadBytes == null) ||
447419
(!F.isEmpty(errs) && errsBytes == null);
448420

449421
if (marshal) {
@@ -461,9 +433,6 @@ public void rebalanced(boolean rebalanced) {
461433
if (partHistSuppliers != null && partHistSuppliersBytes == null)
462434
objectsToMarshall.add(partHistSuppliers);
463435

464-
if (partsToReload != null && partsToReloadBytes == null)
465-
objectsToMarshall.add(partsToReload);
466-
467436
if (!F.isEmpty(errs) && errsBytes == null)
468437
objectsToMarshall.add(errs);
469438

@@ -493,9 +462,6 @@ public void rebalanced(boolean rebalanced) {
493462
if (partHistSuppliers != null && partHistSuppliersBytes == null)
494463
partHistSuppliersBytes = iter.next();
495464

496-
if (partsToReload != null && partsToReloadBytes == null)
497-
partsToReloadBytes = iter.next();
498-
499465
if (!F.isEmpty(errs) && errsBytes == null)
500466
errsBytes = iter.next();
501467
}
@@ -535,9 +501,6 @@ public void topologyVersion(AffinityTopologyVersion topVer) {
535501
if (partHistSuppliersBytes != null && partHistSuppliers == null)
536502
objectsToUnmarshall.add(partHistSuppliersBytes);
537503

538-
if (partsToReloadBytes != null && partsToReload == null)
539-
objectsToUnmarshall.add(partsToReloadBytes);
540-
541504
if (errsBytes != null && errs == null)
542505
objectsToUnmarshall.add(errsBytes);
543506

@@ -593,9 +556,6 @@ public void topologyVersion(AffinityTopologyVersion topVer) {
593556
if (partHistSuppliersBytes != null && partHistSuppliers == null)
594557
partHistSuppliers = (IgniteDhtPartitionHistorySuppliersMap)iter.next();
595558

596-
if (partsToReloadBytes != null && partsToReload == null)
597-
partsToReload = (IgniteDhtPartitionsToReloadMap)iter.next();
598-
599559
if (errsBytes != null && errs == null)
600560
errs = (Map<UUID, Exception>)iter.next();
601561

@@ -685,13 +645,13 @@ public void topologyVersion(AffinityTopologyVersion topVer) {
685645
writer.incrementState();
686646

687647
case 15:
688-
if (!writer.writeByteArray(partsSizesBytes))
648+
if (!writer.writeMap(partsSizes, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
689649
return false;
690650

691651
writer.incrementState();
692652

693653
case 16:
694-
if (!writer.writeByteArray(partsToReloadBytes))
654+
if (!writer.writeMessage(partsToReload))
695655
return false;
696656

697657
writer.incrementState();
@@ -794,15 +754,15 @@ public void topologyVersion(AffinityTopologyVersion topVer) {
794754
reader.incrementState();
795755

796756
case 15:
797-
partsSizesBytes = reader.readByteArray();
757+
partsSizes = reader.readMap(MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
798758

799759
if (!reader.isLastRead())
800760
return false;
801761

802762
reader.incrementState();
803763

804764
case 16:
805-
partsToReloadBytes = reader.readByteArray();
765+
partsToReload = reader.readMessage();
806766

807767
if (!reader.isLastRead())
808768
return false;
@@ -880,8 +840,6 @@ public void cleanUp() {
880840
partCntrs = null;
881841
partCntrsBytes = null;
882842
partHistSuppliersBytes = null;
883-
partsToReloadBytes = null;
884-
partsSizesBytes = null;
885843
errsBytes = null;
886844
}
887845
}

0 commit comments

Comments
 (0)