Skip to content

Commit 12cc4d8

Browse files
IGNITE-26580 Add documentation for MDC affinity backup filter
1 parent d68c615 commit 12cc4d8

2 files changed

Lines changed: 96 additions & 32 deletions

File tree

modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilter.java

Lines changed: 69 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,39 +23,92 @@
2323
import org.apache.ignite.cluster.ClusterNode;
2424
import org.apache.ignite.lang.IgniteBiPredicate;
2525

26-
/** */
26+
/**
27+
* Multi-data center affinity backup filter that ensures each partition's data is distributed across multiple data centers,
28+
* providing high availability and fault tolerance. This implementation guarantees at least one copy of the data in each
29+
* data center and attempts to maintain the configured backup factor without discarding copies.
30+
* <p>
31+
 * The filter works by grouping nodes based on their data center identification attribute (see {@link ClusterNode#dataCenterId()})
32+
* and ensuring that for every partition, at least one node from each data center is included in the primary-backup set.
33+
* <p>
34+
* The filter will discard backup copies only if the number of available nodes in a given data center is less
35+
* than the number of copies assigned to that data center.
36+
* For example, if a partition has 4 copies (1 primary and 3 backups) and the cluster has 2 data centers,
37+
 * then 2 copies are assigned to each data center. The only scenario when just a single copy is assigned to a node in a data center is when
38+
* the number of nodes in that data center is one.
39+
* <p>
40+
* This class is constructed with a number of data centers the cluster spans and a number of backups of the cache this filter is applied to.
41+
* Implementation expects that all copies can be spread evenly across all data centers. In other words, (backups + 1) is divisible by
42+
* number of data centers without remainder. Uneven distributions of copies are not supported.
43+
* <p>
44+
* Warning: Ensure that all nodes have a consistent and valid data center identifier attribute. Missing or inconsistent values
45+
* may lead to unexpected placement of data.
46+
 *
47+
* <h2 class="header">Spring Example</h2>
48+
* Create a partitioned cache template where each data center has at least one copy of the data, and the backup count is maintained.
49+
* <pre name="code" class="xml">
50+
* &lt;property name="cacheConfiguration"&gt;
51+
* &lt;list&gt;
52+
* &lt;bean id="cache-template-bean" abstract="true" class="org.apache.ignite.configuration.CacheConfiguration"&gt;
53+
 * &lt;property name="name" value="myCacheTemplate*"/&gt;
54+
* &lt;property name="cacheMode" value="PARTITIONED" /&gt;
55+
* &lt;property name="backups" value="3" /&gt;
56+
* &lt;property name="affinity"&gt;
57+
* &lt;bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction"&gt;
58+
* &lt;property name="affinityBackupFilter"&gt;
59+
* &lt;bean class="org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilter"&gt;
60+
 * &lt;constructor-arg value="2"/&gt; &lt;!-- dcsNum --&gt;
61+
 * &lt;constructor-arg value="3"/&gt; &lt;!-- backups, the same as in the cache template --&gt;
62+
* &lt;/bean&gt;
63+
* &lt;/property&gt;
64+
* &lt;/bean&gt;
65+
* &lt;/property&gt;
66+
* &lt;/bean&gt;
67+
* &lt;/list&gt;
68+
* &lt;/property&gt;
69+
* </pre>
70+
* <p>
71+
* With more backups, additional replicas can be distributed across different data centers to further improve redundancy.
72+
*/
2773
public class MdcAffinityBackupFilter implements IgniteBiPredicate<ClusterNode, List<ClusterNode>> {
2874
/** */
2975
private static final long serialVersionUID = 1L;
3076

31-
/** */
32-
private final int dcsCount;
77+
/** Number of data centers. */
78+
private final int dcsNum;
3379

34-
/** */
80+
/** Number of copies of each partition, including primary. */
3581
private final int primaryAndBackups;
3682

37-
/** */
83+
/** Map is used to optimize the time it takes to perform a partition assignment procedure. */
3884
private final Map<String, Integer> partsDistrMap;
3985

4086
/**
41-
* @param dcsCount
42-
* @param backups
87+
* @param dcsNum Number of data centers.
88+
* @param backups Number of backups.
4389
*/
44-
public MdcAffinityBackupFilter(int dcsCount, int backups) {
45-
this.dcsCount = dcsCount;
46-
partsDistrMap = new HashMap<>(dcsCount + 1);
90+
public MdcAffinityBackupFilter(int dcsNum, int backups) {
91+
this.dcsNum = dcsNum;
92+
partsDistrMap = new HashMap<>(dcsNum + 1);
4793
primaryAndBackups = backups + 1;
4894
}
4995

50-
/** {@inheritDoc} */
51-
@Override public boolean apply(ClusterNode node, List<ClusterNode> list) {
52-
if (list.size() == 1) { //list contains only primary node, thus we started new assignment round.
96+
/**
97+
* Defines a predicate which returns {@code true} if a node is acceptable for a backup
98+
* or {@code false} otherwise.
99+
* An acceptable node is the one that belongs to a data center that has some additional copies of partition to assign to.
100+
* @param candidate A node that is a candidate for becoming a backup node for a partition.
101+
* @param previouslySelected A list of primary/backup nodes already chosen for a partition.
102+
* The primary is first.
103+
*/
104+
@Override public boolean apply(ClusterNode candidate, List<ClusterNode> previouslySelected) {
105+
if (previouslySelected.size() == 1) { //list contains only primary node, thus we started new assignment round.
53106
partsDistrMap.replaceAll((e, v) -> -1);
54107

55-
partsDistrMap.put(list.get(0).dataCenterId(), 1);
108+
partsDistrMap.put(previouslySelected.get(0).dataCenterId(), 1);
56109
}
57110

58-
String candidateDcId = node.dataCenterId();
111+
String candidateDcId = candidate.dataCenterId();
59112
Integer candDcPartsCopies = partsDistrMap.get(candidateDcId);
60113
boolean res = false;
61114

@@ -65,7 +118,7 @@ public MdcAffinityBackupFilter(int dcsCount, int backups) {
65118
res = true;
66119
}
67120
else {
68-
int partCopiesPerDc = primaryAndBackups / dcsCount;
121+
int partCopiesPerDc = primaryAndBackups / dcsNum;
69122

70123
if (candDcPartsCopies < partCopiesPerDc) {
71124
partsDistrMap.put(candidateDcId, candDcPartsCopies + 1);

modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@
3232
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
3333
import org.junit.Test;
3434

35-
/** */
35+
/**
36+
* Verifies behaviour of {@link MdcAffinityBackupFilter} - guarantees that each DC has at least one copy of every partition,
37+
 * and that copies are distributed uniformly across nodes.
38+
*/
3639
public class MdcAffinityBackupFilterSelfTest extends GridCommonAbstractTest {
3740
/** */
3841
private static final int PARTS_CNT = 1024;
@@ -83,21 +86,21 @@ public void test2DcDistribution() throws Exception {
8386

8487
IgniteEx srv = startClusterAcrossDataCenters(dcIds, nodesPerDc);
8588

86-
checkPartitionsDistribution(srv, dcIds, nodesPerDc, backups);
89+
verifyDistributionProperties(srv, dcIds, nodesPerDc, backups);
8790

8891
//stopping one node in DC_1 should not compromise distribution as there are additional nodes in the same DC
8992
stopGrid(5);
9093

9194
awaitPartitionMapExchange();
9295

93-
checkPartitionsDistribution(srv, dcIds, nodesPerDc, backups);
96+
verifyDistributionProperties(srv, dcIds, nodesPerDc, backups);
9497

9598
//stopping another node in DC_1 should not compromise distribution as well
9699
stopGrid(6);
97100

98101
awaitPartitionMapExchange();
99102

100-
checkPartitionsDistribution(srv, dcIds, nodesPerDc, backups);
103+
verifyDistributionProperties(srv, dcIds, nodesPerDc, backups);
101104
}
102105

103106
/**
@@ -113,7 +116,7 @@ public void test3DcDistribution() throws Exception {
113116

114117
IgniteEx srv = startClusterAcrossDataCenters(dcIds, 2);
115118

116-
checkPartitionsDistribution(srv, dcIds, nodesPerDc, backups);
119+
verifyDistributionProperties(srv, dcIds, nodesPerDc, backups);
117120
}
118121

119122
/** Starts specified number of nodes in each DC. */
@@ -132,10 +135,9 @@ private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) t
132135
}
133136

134137
/**
135-
* Checks that copies of each partition are distributed evenly across data centers.
136-
* If checkUniformity is true, then also checks that number of copies per node is close enough to ideal value (less than 10% variation).
138+
* Checks that copies of each partition are distributed evenly across data centers and copies are spread evenly across nodes.
137139
*/
138-
private void checkPartitionsDistribution(
140+
private void verifyDistributionProperties(
139141
IgniteEx srv,
140142
String[] dcIds,
141143
int nodesPerDc,
@@ -150,10 +152,10 @@ private void checkPartitionsDistribution(
150152
Map<ClusterNode, Integer> overallCopiesPerNode = new HashMap<>();
151153
int[] copiesPerNode = new int[dcIds.length * nodesPerDc];
152154

153-
for (int i = 0; i < partCnt; i++) {
155+
for (int partId = 0; partId < partCnt; partId++) {
154156
int[] partCopiesPerDc = new int[dcIds.length];
155157

156-
Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(i);
158+
Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(partId);
157159

158160
//calculate actual number of copies in each data center
159161
//aggregate copies per each node
@@ -175,14 +177,23 @@ private void checkPartitionsDistribution(
175177
}
176178
}
177179

178-
//check that each data center has expected number of copies
179-
for (int dcIdx = 0; dcIdx < dcIds.length; dcIdx++) {
180-
assertEquals(String.format("Unexpected number of copies of partition %d in data center %s", i, dcIds[dcIdx]),
181-
expectedCopiesPerNode,
182-
partCopiesPerDc[dcIdx]);
183-
}
180+
verifyCopyInEachDcGuarantee(partId, expectedCopiesPerNode, partCopiesPerDc);
184181
}
185182

183+
verifyDistributionUniformity(dcIds, overallCopiesPerNode);
184+
}
185+
186+
/** */
187+
private void verifyCopyInEachDcGuarantee(int partId, int expectedCopiesPerNode, int[] partCopiesPerDc) {
188+
for (int dcIdx = 0; dcIdx < dcIds.length; dcIdx++) {
189+
assertEquals(String.format("Unexpected number of copies of partition %d in data center %s", partId, dcIds[dcIdx]),
190+
expectedCopiesPerNode,
191+
partCopiesPerDc[dcIdx]);
192+
}
193+
}
194+
195+
/** */
196+
private void verifyDistributionUniformity(String[] dcIds, Map<ClusterNode, Integer> overallCopiesPerNode) {
186197
for (String dcId : dcIds) {
187198
long nodesInDc = overallCopiesPerNode.entrySet().stream().filter(e -> e.getKey().dataCenterId().equals(dcId)).count();
188199

0 commit comments

Comments
 (0)