Skip to content

Commit a023e88

Browse files
committed
WIP
1 parent fc0ceba commit a023e88

File tree

7 files changed

+39
-146
lines changed

7 files changed

+39
-146
lines changed

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,6 @@
189189
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPairSerializer;
190190
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
191191
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMapSerializer;
192-
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
193-
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMapSerializer;
194192
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
195193
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessageSerializer;
196194
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
@@ -547,8 +545,6 @@ public GridIoMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
547545
factory.register(506, CachePartitionFullCountersMap::new,
548546
new CachePartitionFullCountersMapSerializer());
549547
factory.register(508, GroupPartitionIdPair::new, new GroupPartitionIdPairSerializer());
550-
factory.register(510, IgniteDhtPartitionHistorySuppliersMap::new,
551-
new IgniteDhtPartitionHistorySuppliersMapSerializer());
552548
factory.register(517, GridPartitionStateMap::new, new GridPartitionStateMapSerializer());
553549
factory.register(518, GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer());
554550
factory.register(519, GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer());

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@
8787
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
8888
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
8989
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
90-
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
90+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
9191
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
9292
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
9393
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
@@ -1348,7 +1348,7 @@ private void sendAllPartitions(
13481348
public GridDhtPartitionsFullMessage createPartitionsFullMessage(
13491349
@Nullable final GridDhtPartitionExchangeId exchId,
13501350
@Nullable GridCacheVersion lastVer,
1351-
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
1351+
@Nullable Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers,
13521352
@Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload
13531353
) {
13541354
Collection<CacheGroupContext> grps = cctx.cache().cacheGroups();
@@ -1369,7 +1369,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
13691369
public GridDhtPartitionsFullMessage createPartitionsFullMessage(
13701370
@Nullable final GridDhtPartitionExchangeId exchId,
13711371
@Nullable GridCacheVersion lastVer,
1372-
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
1372+
@Nullable Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers,
13731373
@Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload,
13741374
Collection<CacheGroupContext> grps
13751375
) {

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
324324

325325
/** */
326326
@GridToStringExclude
327-
private final IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
327+
private Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers = new HashMap<>();
328328

329329
/** Set of nodes that cannot be used for wal rebalancing due to some reason. */
330330
private final Set<UUID> exclusionsFromHistoricalRebalance = ConcurrentHashMap.newKeySet();
@@ -587,11 +587,23 @@ public void affinityChangeMessage(CacheAffinityChangeMessage affChangeMsg) {
587587
* @return List of IDs of history supplier nodes or empty list if these doesn't exist.
588588
*/
589589
public List<UUID> partitionHistorySupplier(int grpId, int partId, long cntrSince) {
590-
List<UUID> histSuppliers = partHistSuppliers.getSupplier(grpId, partId, cntrSince);
590+
if (partHistSuppliers == null)
591+
return Collections.emptyList();
591592

592-
histSuppliers.removeIf(exclusionsFromHistoricalRebalance::contains);
593+
synchronized (partHistSuppliers) {
594+
List<UUID> histSuppliers = new ArrayList<>();
593595

594-
return histSuppliers;
596+
for (Map.Entry<UUID, Map<GroupPartitionIdPair, Long>> e : partHistSuppliers.entrySet()) {
597+
Long historyCounter = e.getValue().get(new GroupPartitionIdPair(grpId, partId));
598+
599+
if (historyCounter != null && historyCounter <= cntrSince)
600+
histSuppliers.add(e.getKey());
601+
}
602+
603+
histSuppliers.removeIf(exclusionsFromHistoricalRebalance::contains);
604+
605+
return histSuppliers;
606+
}
595607
}
596608

597609
/**
@@ -2436,7 +2448,11 @@ private String exchangeTimingsLogMessage(String header, List<String> timings) {
24362448
// Create and destroy caches and cache proxies.
24372449
cctx.cache().onExchangeDone(this, err);
24382450

2439-
Map<GroupPartitionIdPair, Long> locReserved = partHistSuppliers.getReservations(cctx.localNodeId());
2451+
Map<GroupPartitionIdPair, Long> locReserved;
2452+
2453+
synchronized (partHistSuppliers) {
2454+
locReserved = partHistSuppliers == null ? null : partHistSuppliers.get(cctx.localNodeId());
2455+
}
24402456

24412457
if (locReserved != null) {
24422458
boolean success = cctx.database().reserveHistoryForPreloading(locReserved);
@@ -3544,7 +3560,11 @@ private void findCounterForReservation(
35443560
break;
35453561

35463562
if (preferWalRebalance || maxOwnerCntr - ceilingMinReserved < ownerSize) {
3547-
partHistSuppliers.put(ownerId, grpId, p, ceilingMinReserved);
3563+
synchronized (partHistSuppliers) {
3564+
Map<GroupPartitionIdPair, Long> nodeMap = partHistSuppliers.computeIfAbsent(ownerId, k -> new HashMap<>());
3565+
3566+
nodeMap.put(new GroupPartitionIdPair(grpId, p), ceilingMinReserved);
3567+
}
35483568

35493569
haveHistory.add(p);
35503570

@@ -4635,8 +4655,9 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa
46354655

46364656
assert partHistSuppliers.isEmpty();
46374657

4638-
partHistSuppliers.putAll(msg.partitionHistorySuppliers() != null ? msg.partitionHistorySuppliers() :
4639-
IgniteDhtPartitionHistorySuppliersMap.empty());
4658+
synchronized (partHistSuppliers) {
4659+
partHistSuppliers = msg.partitionHistorySuppliers() == null ? null : msg.partitionHistorySuppliers();
4660+
}
46404661

46414662
// Reserve at least 2 threads for system operations.
46424663
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
7676
@Order(3)
7777
@Compress
7878
@GridToStringInclude
79-
IgniteDhtPartitionHistorySuppliersMap partHistSuppliers;
79+
Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers;
8080

8181
/** Partitions that must be cleared and re-loaded. */
8282
@Order(4)
@@ -143,7 +143,7 @@ public GridDhtPartitionsFullMessage() {
143143
public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id,
144144
@Nullable GridCacheVersion lastVer,
145145
@NotNull AffinityTopologyVersion topVer,
146-
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
146+
@Nullable Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers,
147147
@Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload) {
148148
super(id, lastVer);
149149

@@ -326,7 +326,7 @@ public CachePartitionFullCountersMap partitionUpdateCounters(int grpId) {
326326
/**
327327
* @return Partitions history suppliers.
328328
*/
329-
public IgniteDhtPartitionHistorySuppliersMap partitionHistorySuppliers() {
329+
public Map<UUID, Map<GroupPartitionIdPair, Long>> partitionHistorySuppliers() {
330330
return partHistSuppliers;
331331
}
332332

@@ -444,9 +444,6 @@ public void topologyVersion(AffinityTopologyVersion topVer) {
444444
if (parts == null)
445445
parts = new HashMap<>();
446446

447-
if (partHistSuppliers == null)
448-
partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
449-
450447
errs = errMsgs == null ? null : F.viewReadOnly(errMsgs, e -> e.error());
451448
}
452449

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java

Lines changed: 0 additions & 119 deletions
This file was deleted.

modules/core/src/main/resources/META-INF/classnames.properties

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1192,7 +1192,6 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPre
11921192
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader$2
11931193
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments
11941194
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap
1195-
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap
11961195
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator
11971196
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIteratorException
11981197
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl

modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
3030
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
3131
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
32-
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
3332
import org.apache.ignite.internal.util.typedef.internal.U;
3433
import org.apache.ignite.plugin.extensions.communication.Message;
3534
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
@@ -107,13 +106,13 @@ public void testWriteReadHugeMessage() {
107106

108107
/** */
109108
private GridDhtPartitionsFullMessage fullMessage() {
110-
IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
109+
Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers = new HashMap<>();
111110
Map<UUID, Map<Integer, Set<Integer>>> partsToReload = new HashMap<>();
112111

113112
for (int i = 0; i < 500; i++) {
114113
UUID uuid = UUID.randomUUID();
115114

116-
partHistSuppliers.put(uuid, i, i + 1, i + 2);
115+
partHistSuppliers.put(uuid, Map.of(new GroupPartitionIdPair(i, i + 1), i + 2L));
117116
partsToReload.put(uuid, Map.of(i, Set.of(i + 1)));
118117
}
119118

@@ -122,8 +121,8 @@ private GridDhtPartitionsFullMessage fullMessage() {
122121

123122
/** */
124123
private void assertEqualsFullMsg(GridDhtPartitionsFullMessage expected, GridDhtPartitionsFullMessage actual) {
125-
Map<UUID, Map<GroupPartitionIdPair, Long>> expHistSuppliers = U.field(expected.partitionHistorySuppliers(), "map");
126-
Map<UUID, Map<GroupPartitionIdPair, Long>> actHistSuppliers = U.field(actual.partitionHistorySuppliers(), "map");
124+
Map<UUID, Map<GroupPartitionIdPair, Long>> expHistSuppliers = expected.partitionHistorySuppliers();
125+
Map<UUID, Map<GroupPartitionIdPair, Long>> actHistSuppliers = actual.partitionHistorySuppliers();
127126

128127
assertEquals(expHistSuppliers.size(), actHistSuppliers.size());
129128

0 commit comments

Comments
 (0)