Skip to content

Commit e8e4cc4

Browse files
committed
IGNITE-26866 Add Message interface to IgniteDhtPartitionsToReloadMap
1 parent 7bc011c commit e8e4cc4

8 files changed

Lines changed: 193 additions & 47 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.ignite.internal.codegen.CacheGroupAffinityMessageSerializer;
3535
import org.apache.ignite.internal.codegen.CacheInvokeDirectResultSerializer;
3636
import org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer;
37+
import org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer;
3738
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
3839
import org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer;
3940
import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
@@ -102,6 +103,7 @@
102103
import org.apache.ignite.internal.codegen.HandshakeMessageSerializer;
103104
import org.apache.ignite.internal.codegen.HandshakeWaitMessageSerializer;
104105
import org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer;
106+
import org.apache.ignite.internal.codegen.IgniteDhtPartitionsToReloadMapSerializer;
105107
import org.apache.ignite.internal.codegen.IgniteTxKeySerializer;
106108
import org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer;
107109
import org.apache.ignite.internal.codegen.JobStealingRequestSerializer;
@@ -112,6 +114,7 @@
112114
import org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerializer;
113115
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
114116
import org.apache.ignite.internal.codegen.NodeIdMessageSerializer;
117+
import org.apache.ignite.internal.codegen.PartitionsToReloadSerializer;
115118
import org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
116119
import org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
117120
import org.apache.ignite.internal.codegen.ServiceDeploymentProcessIdSerializer;
@@ -191,6 +194,7 @@
191194
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
192195
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
193196
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
197+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionsToReloadMap;
194198
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
195199
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
196200
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -200,6 +204,8 @@
200204
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
201205
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
202206
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
207+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
208+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsToReload;
203209
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
204210
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
205211
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -445,6 +451,10 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
445451
factory.register(GridCacheOperationMessage.TYPE_CODE, GridCacheOperationMessage::new, new GridCacheOperationMessageSerializer());
446452
factory.register(BinaryMetadataVersionInfo.TYPE_CODE, BinaryMetadataVersionInfo::new,
447453
new BinaryMetadataVersionInfoSerializer());
454+
factory.register(PartitionsToReload.TYPE_CODE, PartitionsToReload::new, new PartitionsToReloadSerializer());
455+
factory.register(CachePartitionsToReloadMap.TYPE_CODE, CachePartitionsToReloadMap::new, new CachePartitionsToReloadMapSerializer());
456+
factory.register(IgniteDhtPartitionsToReloadMap.TYPE_CODE, IgniteDhtPartitionsToReloadMap::new,
457+
new IgniteDhtPartitionsToReloadMapSerializer());
448458

449459
// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
450460
// [120..123] - DR
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
20+
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
import org.apache.ignite.internal.Order;
24+
import org.apache.ignite.plugin.extensions.communication.Message;
25+
import org.jetbrains.annotations.Nullable;
26+
27+
/** Partition reload map for cache. */
28+
public class CachePartitionsToReloadMap implements Message {
29+
/** Type code. */
30+
public static final short TYPE_CODE = 507;
31+
32+
/** Partition reload map for cache. */
33+
@Order(value = 0, method = "cachePartitions")
34+
private Map<Integer, PartitionsToReload> map;
35+
36+
/**
37+
* @return Partition reload map for cache.
38+
*/
39+
public Map<Integer, PartitionsToReload> cachePartitions() {
40+
return map;
41+
}
42+
43+
/**
44+
* @param map Partition reload map for cache.
45+
*/
46+
public void cachePartitions(Map<Integer, PartitionsToReload> map) {
47+
this.map = map;
48+
}
49+
50+
/**
51+
* @param cacheId Cache id.
52+
* @return Partitions to reload for this cache.
53+
*/
54+
public @Nullable PartitionsToReload get(int cacheId) {
55+
if (map == null)
56+
return null;
57+
58+
return map.get(cacheId);
59+
}
60+
61+
/**
62+
* @param cacheId Cache id.
63+
* @param parts Partitions to reload.
64+
*/
65+
public void put(int cacheId, PartitionsToReload parts) {
66+
if (map == null)
67+
map = new HashMap<>();
68+
69+
map.put(cacheId, parts);
70+
}
71+
72+
/** {@inheritDoc} */
73+
@Override public short directType() {
74+
return TYPE_CODE;
75+
}
76+
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
9191

9292
/** Partitions that must be cleared and re-loaded. */
9393
@GridToStringInclude
94-
@GridDirectTransient
9594
private IgniteDhtPartitionsToReloadMap partsToReload;
9695

97-
/** Serialized partitions that must be cleared and re-loaded. */
98-
private byte[] partsToReloadBytes;
99-
10096
/** Serialized partitions sizes. */
10197
private byte[] partsSizesBytes;
10298

@@ -188,7 +184,6 @@ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id,
188184
cp.partHistSuppliers = partHistSuppliers;
189185
cp.partHistSuppliersBytes = partHistSuppliersBytes;
190186
cp.partsToReload = partsToReload;
191-
cp.partsToReloadBytes = partsToReloadBytes;
192187
cp.partsSizesBytes = partsSizesBytes;
193188
cp.topVer = topVer;
194189
cp.errs = errs;
@@ -361,7 +356,7 @@ public IgniteDhtPartitionHistorySuppliersMap partitionHistorySuppliers() {
361356
/**
362357
*
363358
*/
364-
public Set<Integer> partsToReload(UUID nodeId, int grpId) {
359+
public Collection<Integer> partsToReload(UUID nodeId, int grpId) {
365360
if (partsToReload == null)
366361
return Collections.emptySet();
367362

@@ -443,7 +438,6 @@ public void rebalanced(boolean rebalanced) {
443438
boolean marshal = (!F.isEmpty(parts) && partsBytes == null) ||
444439
(partCntrs != null && !partCntrs.empty() && partCntrsBytes == null) ||
445440
(partHistSuppliers != null && partHistSuppliersBytes == null) ||
446-
(partsToReload != null && partsToReloadBytes == null) ||
447441
(!F.isEmpty(errs) && errsBytes == null);
448442

449443
if (marshal) {
@@ -461,9 +455,6 @@ public void rebalanced(boolean rebalanced) {
461455
if (partHistSuppliers != null && partHistSuppliersBytes == null)
462456
objectsToMarshall.add(partHistSuppliers);
463457

464-
if (partsToReload != null && partsToReloadBytes == null)
465-
objectsToMarshall.add(partsToReload);
466-
467458
if (!F.isEmpty(errs) && errsBytes == null)
468459
objectsToMarshall.add(errs);
469460

@@ -493,9 +484,6 @@ public void rebalanced(boolean rebalanced) {
493484
if (partHistSuppliers != null && partHistSuppliersBytes == null)
494485
partHistSuppliersBytes = iter.next();
495486

496-
if (partsToReload != null && partsToReloadBytes == null)
497-
partsToReloadBytes = iter.next();
498-
499487
if (!F.isEmpty(errs) && errsBytes == null)
500488
errsBytes = iter.next();
501489
}
@@ -535,9 +523,6 @@ public void topologyVersion(AffinityTopologyVersion topVer) {
535523
if (partHistSuppliersBytes != null && partHistSuppliers == null)
536524
objectsToUnmarshall.add(partHistSuppliersBytes);
537525

538-
if (partsToReloadBytes != null && partsToReload == null)
539-
objectsToUnmarshall.add(partsToReloadBytes);
540-
541526
if (errsBytes != null && errs == null)
542527
objectsToUnmarshall.add(errsBytes);
543528

@@ -593,9 +578,6 @@ public void topologyVersion(AffinityTopologyVersion topVer) {
593578
if (partHistSuppliersBytes != null && partHistSuppliers == null)
594579
partHistSuppliers = (IgniteDhtPartitionHistorySuppliersMap)iter.next();
595580

596-
if (partsToReloadBytes != null && partsToReload == null)
597-
partsToReload = (IgniteDhtPartitionsToReloadMap)iter.next();
598-
599581
if (errsBytes != null && errs == null)
600582
errs = (Map<UUID, Exception>)iter.next();
601583

@@ -691,7 +673,7 @@ public void topologyVersion(AffinityTopologyVersion topVer) {
691673
writer.incrementState();
692674

693675
case 16:
694-
if (!writer.writeByteArray(partsToReloadBytes))
676+
if (!writer.writeMessage(partsToReload))
695677
return false;
696678

697679
writer.incrementState();
@@ -802,7 +784,7 @@ public void topologyVersion(AffinityTopologyVersion topVer) {
802784
reader.incrementState();
803785

804786
case 16:
805-
partsToReloadBytes = reader.readByteArray();
787+
partsToReload = reader.readMessage();
806788

807789
if (!reader.isLastRead())
808790
return false;
@@ -880,7 +862,6 @@ public void cleanUp() {
880862
partCntrs = null;
881863
partCntrsBytes = null;
882864
partHistSuppliersBytes = null;
883-
partsToReloadBytes = null;
884865
partsSizesBytes = null;
885866
errsBytes = null;
886867
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,45 +18,46 @@
1818

1919
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
2020

21-
import java.io.Serializable;
21+
import java.util.Collection;
2222
import java.util.Collections;
2323
import java.util.HashMap;
24-
import java.util.HashSet;
2524
import java.util.Map;
26-
import java.util.Set;
2725
import java.util.UUID;
26+
import org.apache.ignite.internal.Order;
2827
import org.apache.ignite.internal.util.typedef.internal.S;
28+
import org.apache.ignite.plugin.extensions.communication.Message;
2929

3030
/**
3131
* Partition reload map.
3232
*/
33-
public class IgniteDhtPartitionsToReloadMap implements Serializable {
34-
/** */
35-
private static final long serialVersionUID = 0L;
33+
public class IgniteDhtPartitionsToReloadMap implements Message {
34+
/** Type code. */
35+
public static final short TYPE_CODE = 508;
3636

3737
/** */
38-
private Map<UUID, Map<Integer, Set<Integer>>> map;
38+
@Order(value = 0, method = "partitionsToReload")
39+
private Map<UUID, CachePartitionsToReloadMap> map;
3940

4041
/**
4142
* @param nodeId Node ID.
4243
* @param cacheId Cache ID.
4344
* @return Collection of partitions to reload.
4445
*/
45-
public synchronized Set<Integer> get(UUID nodeId, int cacheId) {
46+
public synchronized Collection<Integer> get(UUID nodeId, int cacheId) {
4647
if (map == null)
4748
return Collections.emptySet();
4849

49-
Map<Integer, Set<Integer>> nodeMap = map.get(nodeId);
50+
CachePartitionsToReloadMap nodeMap = map.get(nodeId);
5051

5152
if (nodeMap == null)
5253
return Collections.emptySet();
5354

54-
Set<Integer> parts = nodeMap.get(cacheId);
55+
PartitionsToReload partsToReload = nodeMap.get(cacheId);
5556

56-
if (parts == null)
57+
if (partsToReload == null)
5758
return Collections.emptySet();
5859

59-
return parts;
60+
return partsToReload.partitions();
6061
}
6162

6263
/**
@@ -68,18 +69,12 @@ public synchronized void put(UUID nodeId, int cacheId, int partId) {
6869
if (map == null)
6970
map = new HashMap<>();
7071

71-
Map<Integer, Set<Integer>> nodeMap = map.get(nodeId);
72-
73-
if (nodeMap == null) {
74-
nodeMap = new HashMap<>();
75-
76-
map.put(nodeId, nodeMap);
77-
}
72+
CachePartitionsToReloadMap nodeMap = map.computeIfAbsent(nodeId, k -> new CachePartitionsToReloadMap());
7873

79-
Set<Integer> parts = nodeMap.get(cacheId);
74+
PartitionsToReload parts = nodeMap.get(cacheId);
8075

8176
if (parts == null) {
82-
parts = new HashSet<>();
77+
parts = new PartitionsToReload();
8378

8479
nodeMap.put(cacheId, parts);
8580
}
@@ -98,4 +93,23 @@ public synchronized boolean isEmpty() {
9893
@Override public String toString() {
9994
return S.toString(IgniteDhtPartitionsToReloadMap.class, this);
10095
}
96+
97+
/**
98+
* @return Partition reload map.
99+
*/
100+
public Map<UUID, CachePartitionsToReloadMap> partitionsToReload() {
101+
return map;
102+
}
103+
104+
/**
105+
* @param map Partition reload map.
106+
*/
107+
public void partitionsToReload(Map<UUID, CachePartitionsToReloadMap> map) {
108+
this.map = map;
109+
}
110+
111+
/** {@inheritDoc} */
112+
@Override public short directType() {
113+
return TYPE_CODE;
114+
}
101115
}

0 commit comments

Comments
 (0)