Skip to content

Commit d68c615

Browse files
IGNITE-26580 Finish test, fix bug in an MDC filter
1 parent a19689b commit d68c615

2 files changed

Lines changed: 152 additions & 27 deletions

File tree

modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilter.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.HashMap;
2121
import java.util.List;
2222
import java.util.Map;
23-
import java.util.Optional;
2423
import org.apache.ignite.cluster.ClusterNode;
2524
import org.apache.ignite.lang.IgniteBiPredicate;
2625

@@ -50,8 +49,11 @@ public MdcAffinityBackupFilter(int dcsCount, int backups) {
5049

5150
/** {@inheritDoc} */
5251
@Override public boolean apply(ClusterNode node, List<ClusterNode> list) {
53-
if (list.size() == 1) //account for primary node which is assigned beforehand
52+
if (list.size() == 1) { //list contains only primary node, thus we started new assignment round.
53+
partsDistrMap.replaceAll((e, v) -> -1);
54+
5455
partsDistrMap.put(list.get(0).dataCenterId(), 1);
56+
}
5557

5658
String candidateDcId = node.dataCenterId();
5759
Integer candDcPartsCopies = partsDistrMap.get(candidateDcId);
@@ -72,11 +74,6 @@ public MdcAffinityBackupFilter(int dcsCount, int backups) {
7274
}
7375
}
7476

75-
Optional<Integer> sum = partsDistrMap.values().stream().reduce(Integer::sum);
76-
77-
if (sum.isPresent() && sum.get() == primaryAndBackups)
78-
partsDistrMap.replaceAll((e, v) -> -1);
79-
8077
return res;
8178
}
8279
}

modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java

Lines changed: 148 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
package org.apache.ignite.cache.affinity.rendezvous;
1919

2020
import java.util.Collection;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.stream.Collectors;
2125
import org.apache.ignite.IgniteCache;
2226
import org.apache.ignite.IgniteCheckedException;
2327
import org.apache.ignite.IgniteSystemProperties;
2428
import org.apache.ignite.cache.affinity.Affinity;
25-
import org.apache.ignite.cache.affinity.AffinityFunction;
2629
import org.apache.ignite.cluster.ClusterNode;
27-
import org.apache.ignite.configuration.CacheConfiguration;
2830
import org.apache.ignite.configuration.IgniteConfiguration;
2931
import org.apache.ignite.internal.IgniteEx;
3032
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -33,48 +35,174 @@
3335
/** */
3436
public class MdcAffinityBackupFilterSelfTest extends GridCommonAbstractTest {
3537
/** */
36-
private static final String DC_0_ID = "DC_0";
38+
private static final int PARTS_CNT = 1024;
3739

3840
/** */
39-
private static final String DC_1_ID = "DC_1";
41+
private int backups;
42+
43+
/** */
44+
private String[] dcIds;
45+
46+
/** {@inheritDoc} */
47+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
48+
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
49+
50+
cfg.setCacheConfiguration(defaultCacheConfiguration()
51+
.setBackups(backups)
52+
.setAffinity(
53+
new RendezvousAffinityFunction(false, PARTS_CNT)
54+
.setAffinityBackupFilter(new MdcAffinityBackupFilter(dcIds.length, backups))));
55+
56+
return cfg;
57+
}
4058

4159
/** {@inheritDoc} */
4260
@Override protected IgniteConfiguration optimize(IgniteConfiguration cfg) throws IgniteCheckedException {
4361
return super.optimize(cfg).setIncludeProperties((String[])null);
4462
}
4563

64+
/** {@inheritDoc} */
65+
@Override protected void afterTest() throws Exception {
66+
super.afterTest();
67+
68+
stopAllGrids();
69+
}
70+
4671
/**
47-
* Verifies that partition copies are assigned evenly to all data centers.
72+
* Verifies that partition copies are assigned evenly across a cluster in two data centers.
73+
* <p/>
74+
* When a node from one data center is stopped, partition distribution is that data center should stay uniform.
4875
*
4976
* @throws Exception If failed.
5077
*/
5178
@Test
52-
public void testBasicDistribution() throws Exception {
53-
System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, DC_0_ID);
54-
startGrids(2);
79+
public void test2DcDistribution() throws Exception {
80+
dcIds = new String[] {"DC_0", "DC_1"};
81+
int nodesPerDc = 4;
82+
backups = 3;
5583

56-
System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, DC_1_ID);
57-
startGrid(2);
58-
IgniteEx srv = startGrid(3);
84+
IgniteEx srv = startClusterAcrossDataCenters(dcIds, nodesPerDc);
5985

60-
CacheConfiguration ccfg = defaultCacheConfiguration()
61-
.setBackups(3)
62-
.setAffinity(
63-
new RendezvousAffinityFunction(false, 4)
64-
.setAffinityBackupFilter(new MdcAffinityBackupFilter(2, 3)));
86+
checkPartitionsDistribution(srv, dcIds, nodesPerDc, backups);
87+
88+
//stopping one node in DC_1 should not compromise distribution as there are additional nodes in the same DC
89+
stopGrid(5);
90+
91+
awaitPartitionMapExchange();
6592

66-
IgniteCache cache = srv.getOrCreateCache(ccfg);
93+
checkPartitionsDistribution(srv, dcIds, nodesPerDc, backups);
6794

68-
checkPartitions(srv, cache, 3);
95+
//stopping another node in DC_1 should not compromise distribution as well
96+
stopGrid(6);
97+
98+
awaitPartitionMapExchange();
99+
100+
checkPartitionsDistribution(srv, dcIds, nodesPerDc, backups);
69101
}
70102

71-
/** */
72-
private void checkPartitions(IgniteEx srv, IgniteCache cache, int backups) throws IgniteCheckedException {) {
103+
/**
104+
* Verifies that partition copies are assigned evenly across a cluster in three data centers.
105+
*
106+
* @throws Exception If failed.
107+
*/
108+
@Test
109+
public void test3DcDistribution() throws Exception {
110+
dcIds = new String[] {"DC_0", "DC_1", "DC_2"};
111+
int nodesPerDc = 2;
112+
backups = 5;
113+
114+
IgniteEx srv = startClusterAcrossDataCenters(dcIds, 2);
115+
116+
checkPartitionsDistribution(srv, dcIds, nodesPerDc, backups);
117+
}
118+
119+
/** Starts specified number of nodes in each DC. */
120+
private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) throws Exception {
121+
int nodeIdx = 0;
122+
IgniteEx lastNode = null;
123+
124+
for (String dcId : dcIds) {
125+
System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, dcId);
126+
127+
for (int i = 0; i < nodesPerDc; i++)
128+
lastNode = startGrid(nodeIdx++);
129+
}
130+
131+
return lastNode;
132+
}
133+
134+
/**
135+
* Checks that copies of each partition are distributed evenly across data centers.
136+
* If checkUniformity is true, then also checks that number of copies per node is close enough to ideal value (less than 10% variation).
137+
*/
138+
private void checkPartitionsDistribution(
139+
IgniteEx srv,
140+
String[] dcIds,
141+
int nodesPerDc,
142+
int backups
143+
) {
144+
IgniteCache cache = srv.getOrCreateCache(DEFAULT_CACHE_NAME);
145+
73146
int partCnt = cacheConfiguration(srv.configuration(), cache.getName()).getAffinity().partitions();
74147
Affinity aff = affinity(cache);
148+
int expectedCopiesPerNode = (backups + 1) / dcIds.length;
149+
150+
Map<ClusterNode, Integer> overallCopiesPerNode = new HashMap<>();
151+
int[] copiesPerNode = new int[dcIds.length * nodesPerDc];
75152

76153
for (int i = 0; i < partCnt; i++) {
154+
int[] partCopiesPerDc = new int[dcIds.length];
155+
77156
Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(i);
157+
158+
//calculate actual number of copies in each data center
159+
//aggregate copies per each node
160+
for (ClusterNode node : nodes) {
161+
copiesPerNode[(int)(node.order() - 1)]++;
162+
163+
overallCopiesPerNode.compute(node, (k, v) -> {
164+
if (v == null)
165+
return 1;
166+
else
167+
return v + 1;
168+
});
169+
170+
for (int j = 0; j < dcIds.length; j++) {
171+
if (node.dataCenterId().equals(dcIds[j])) {
172+
partCopiesPerDc[j]++;
173+
break;
174+
}
175+
}
176+
}
177+
178+
//check that each data center has expected number of copies
179+
for (int dcIdx = 0; dcIdx < dcIds.length; dcIdx++) {
180+
assertEquals(String.format("Unexpected number of copies of partition %d in data center %s", i, dcIds[dcIdx]),
181+
expectedCopiesPerNode,
182+
partCopiesPerDc[dcIdx]);
183+
}
184+
}
185+
186+
for (String dcId : dcIds) {
187+
long nodesInDc = overallCopiesPerNode.entrySet().stream().filter(e -> e.getKey().dataCenterId().equals(dcId)).count();
188+
189+
double idealCopiesPerNode = (double)((PARTS_CNT * (backups + 1)) / (dcIds.length * nodesInDc));
190+
191+
List<Integer> numOfCopiesPerNode = overallCopiesPerNode.entrySet().stream()
192+
.filter(e -> e.getKey().dataCenterId().equals(dcId)).map(Map.Entry::getValue).collect(Collectors.toList());
193+
194+
for (int copiesOnNode : numOfCopiesPerNode) {
195+
double deviation = (Math.abs(copiesOnNode - idealCopiesPerNode) / idealCopiesPerNode);
196+
197+
assertTrue(
198+
String.format("Too big deviation from ideal distribution: partitions assigned = %d, " +
199+
"ideal partitions assigned = %d, deviation = %d",
200+
copiesOnNode,
201+
(int)idealCopiesPerNode,
202+
(int)(deviation * 100)
203+
),
204+
deviation < 0.1);
205+
}
78206
}
79207
}
80208
}

0 commit comments

Comments
 (0)