Skip to content

Commit 4facea7

Browse files
CyrillKirill Sizov
andauthored
IGNITE-27162 Adapt MG services to use timeout (#7813)
Co-authored-by: Kirill Sizov <sizov.kirill.y@gmail.com>
1 parent 7894034 commit 4facea7

32 files changed

Lines changed: 599 additions & 407 deletions

File tree

modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
114114
import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
115115
import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
116+
import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory;
116117
import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
117118
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
118119
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
@@ -1351,14 +1352,20 @@ private class Node {
13511352
RaftGroupOptionsConfigurer msRaftConfigurer =
13521353
RaftGroupOptionsConfigHelper.configureProperties(msLogStorageManager, metastorageWorkDir.metaPath());
13531354

1355+
var msRaftServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory(
1356+
clusterService,
1357+
raftGroupEventsClientListener,
1358+
failureManager
1359+
);
1360+
13541361
metaStorageManager = new MetaStorageManagerImpl(
13551362
clusterService.staticLocalNode(),
13561363
cmgManager,
13571364
logicalTopologyService,
13581365
raftManager,
13591366
keyValueStorage,
13601367
hybridClock,
1361-
topologyAwareRaftGroupServiceFactory,
1368+
msRaftServiceFactory,
13621369
metricManager,
13631370
systemDistributedConfiguration,
13641371
msRaftConfigurer,

modules/metastorage/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ dependencies {
8585
testFixturesImplementation project(':ignite-core')
8686
testFixturesImplementation project(':ignite-configuration-system')
8787
testFixturesImplementation project(':ignite-raft-api')
88+
testFixturesImplementation project(':ignite-raft')
8889
testFixturesImplementation project(':ignite-replicator')
8990
testFixturesImplementation project(':ignite-rocksdb-common')
9091
testFixturesImplementation project(':ignite-vault')

modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.ignite.internal.close.ManuallyCloseable;
6464
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
6565
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
66+
import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory;
6667
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
6768
import org.apache.ignite.internal.configuration.ComponentWorkingDir;
6869
import org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper;
@@ -105,7 +106,6 @@
105106
import org.apache.ignite.internal.raft.PeersAndLearners;
106107
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
107108
import org.apache.ignite.internal.raft.TestLozaFactory;
108-
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
109109
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
110110
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
111111
import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -228,11 +228,10 @@ private static class Node implements ManuallyCloseable {
228228

229229
when(logicalTopologyService.validatedNodesOnLeader()).thenReturn(emptySetCompletedFuture());
230230

231-
var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
231+
var topologyAwareRaftGroupServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory(
232232
clusterService,
233-
logicalTopologyService,
234-
Loza.FACTORY,
235-
raftGroupEventsClientListener
233+
raftGroupEventsClientListener,
234+
new NoOpFailureManager()
236235
);
237236

238237
cmgManager = mock(ClusterManagementGroupManager.class);

modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.stream.Stream;
5959
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
6060
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
61+
import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory;
6162
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
6263
import org.apache.ignite.internal.configuration.ComponentWorkingDir;
6364
import org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper;
@@ -91,8 +92,9 @@
9192
import org.apache.ignite.internal.raft.Loza;
9293
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
9394
import org.apache.ignite.internal.raft.TestLozaFactory;
94-
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
95+
import org.apache.ignite.internal.raft.TimeAwareRaftGroupServiceFactory;
9596
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
97+
import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
9698
import org.apache.ignite.internal.raft.storage.LogStorageManager;
9799
import org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils;
98100
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
@@ -182,11 +184,10 @@ void setUp(
182184

183185
when(logicalTopologyService.validatedNodesOnLeader()).thenReturn(emptySetCompletedFuture());
184186

185-
var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
187+
var topologyAwareRaftGroupServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory(
186188
clusterService,
187-
logicalTopologyService,
188-
Loza.FACTORY,
189-
raftGroupEventsClientListener
189+
raftGroupEventsClientListener,
190+
new NoOpFailureManager()
190191
);
191192

192193
ClusterManagementGroupManager cmgManager = mock(ClusterManagementGroupManager.class);
@@ -284,7 +285,7 @@ void testMetaStorageStopClosesRaftService() {
284285

285286
assertThat(metaStorageManager.stopAsync(new ComponentContext()), willCompleteSuccessfully());
286287

287-
CompletableFuture<Entry> fut = svc.get(FOO_KEY);
288+
CompletableFuture<Entry> fut = svc.get(FOO_KEY, TimeAwareRaftGroupService.NO_TIMEOUT);
288289

289290
assertThat(fut, willThrowFast(NodeStoppingException.class));
290291
}
@@ -308,7 +309,7 @@ void testMetaStorageStopBeforeRaftServiceStarted() {
308309
raftManager,
309310
storage,
310311
clock,
311-
mock(TopologyAwareRaftGroupServiceFactory.class),
312+
mock(TimeAwareRaftGroupServiceFactory.class),
312313
new NoOpMetricManager(),
313314
mock(MetastorageRepairStorage.class),
314315
mock(MetastorageRepair.class),

modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
3838
import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
3939
import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
40+
import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory;
4041
import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
4142
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
4243
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
@@ -65,8 +66,8 @@
6566
import org.apache.ignite.internal.raft.Peer;
6667
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
6768
import org.apache.ignite.internal.raft.TestLozaFactory;
68-
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
6969
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
70+
import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
7071
import org.apache.ignite.internal.raft.storage.LogStorageManager;
7172
import org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils;
7273
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
@@ -203,11 +204,10 @@ class Node {
203204

204205
var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);
205206

206-
var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
207+
var topologyAwareRaftGroupServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory(
207208
clusterService,
208-
logicalTopologyService,
209-
Loza.FACTORY,
210-
raftGroupEventsClientListener
209+
raftGroupEventsClientListener,
210+
failureManager
211211
);
212212

213213
ComponentWorkingDir metastorageWorkDir = new ComponentWorkingDir(basePath.resolve("metastorage"));
@@ -287,7 +287,8 @@ CompletableFuture<Set<String>> getMetaStorageLearners() {
287287
return metaStorageManager
288288
.metaStorageService()
289289
.thenApply(MetaStorageServiceImpl::raftGroupService)
290-
.thenCompose(service -> service.refreshMembers(false).thenApply(v -> service.learners()))
290+
.thenCompose(service ->
291+
service.refreshMembers(false, TimeAwareRaftGroupService.NO_TIMEOUT).thenApply(v -> service.learners()))
291292
.thenApply(learners -> learners.stream().map(Peer::consistentId).collect(toSet()));
292293
}
293294
}

modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesVsStorageTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
5656
import org.apache.ignite.internal.network.InternalClusterNode;
5757
import org.apache.ignite.internal.raft.Peer;
58-
import org.apache.ignite.internal.raft.service.RaftGroupService;
58+
import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
5959
import org.junit.jupiter.api.Test;
6060
import org.junit.jupiter.api.extension.ExtendWith;
6161
import org.junit.jupiter.params.ParameterizedTest;
@@ -394,9 +394,9 @@ void testIdleSafeTimePropagationLeaderTransferred() throws Exception {
394394
}
395395

396396
private Node transferLeadership(Node firstNode, Node secondNode) {
397-
RaftGroupService svc = getMetastorageService(firstNode);
397+
TimeAwareRaftGroupService svc = getMetastorageService(firstNode);
398398

399-
CompletableFuture<Node> future = svc.refreshLeader()
399+
CompletableFuture<Node> future = svc.refreshLeader(TimeAwareRaftGroupService.NO_TIMEOUT)
400400
.thenCompose(v -> {
401401
Peer leader = svc.leader();
402402

@@ -409,16 +409,16 @@ private Node transferLeadership(Node firstNode, Node secondNode) {
409409

410410
Node newLeaderNode = newLeader.consistentId().equals(firstNode.name()) ? firstNode : secondNode;
411411

412-
return svc.transferLeadership(newLeader).thenApply(unused -> newLeaderNode);
412+
return svc.transferLeadership(newLeader, TimeAwareRaftGroupService.NO_TIMEOUT).thenApply(unused -> newLeaderNode);
413413
});
414414

415415
assertThat(future, willCompleteSuccessfully());
416416

417417
return future.join();
418418
}
419419

420-
private RaftGroupService getMetastorageService(Node node) {
421-
CompletableFuture<RaftGroupService> future = node.metaStorageManager.metaStorageService()
420+
private TimeAwareRaftGroupService getMetastorageService(Node node) {
421+
CompletableFuture<TimeAwareRaftGroupService> future = node.metaStorageManager.metaStorageService()
422422
.thenApply(MetaStorageServiceImpl::raftGroupService);
423423

424424
assertThat(future, willCompleteSuccessfully());

modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919

2020
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
2121
import static org.hamcrest.MatcherAssert.assertThat;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.ArgumentMatchers.anyLong;
24+
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.when;
2226

2327
import java.nio.charset.StandardCharsets;
2428
import java.nio.file.Path;
@@ -45,6 +49,7 @@
4549
import org.apache.ignite.internal.raft.service.ItAbstractListenerSnapshotTest;
4650
import org.apache.ignite.internal.raft.service.RaftGroupListener;
4751
import org.apache.ignite.internal.raft.service.RaftGroupService;
52+
import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
4853
import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
4954
import org.apache.ignite.internal.replicator.TestReplicationGroupId;
5055
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
@@ -89,13 +94,13 @@ public void beforeFollowerStop(RaftGroupService service, RaftServer server) {
8994

9095
metaStorage = new MetaStorageServiceImpl(
9196
followerNode,
92-
service,
97+
wrapAsTimeAware(service),
9398
new IgniteSpinBusyLock(),
9499
server.options().getClock()
95100
);
96101

97102
// Put some data in the metastorage
98-
assertThat(metaStorage.put(FIRST_KEY, FIRST_VALUE), willCompleteSuccessfully());
103+
assertThat(metaStorage.put(FIRST_KEY, FIRST_VALUE, TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully());
99104

100105
// Check that data has been written successfully
101106
checkEntry(FIRST_KEY.bytes(), FIRST_VALUE, 1);
@@ -112,21 +117,21 @@ public void afterFollowerStop(RaftGroupService service, RaftServer server, int s
112117
}
113118

114119
// Remove the first key from the metastorage
115-
assertThat(metaStorage.remove(FIRST_KEY), willCompleteSuccessfully());
120+
assertThat(metaStorage.remove(FIRST_KEY, TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully());
116121

117122
// Check that data has been removed
118123
checkEntry(FIRST_KEY.bytes(), null, 2);
119124

120125
// Put same data again
121-
assertThat(metaStorage.put(FIRST_KEY, FIRST_VALUE), willCompleteSuccessfully());
126+
assertThat(metaStorage.put(FIRST_KEY, FIRST_VALUE, TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully());
122127

123128
// Check that it has been written
124129
checkEntry(FIRST_KEY.bytes(), FIRST_VALUE, 3);
125130
}
126131

127132
@Override
128133
public void afterSnapshot(RaftGroupService service) {
129-
assertThat(metaStorage.put(SECOND_KEY, SECOND_VALUE), willCompleteSuccessfully());
134+
assertThat(metaStorage.put(SECOND_KEY, SECOND_VALUE, TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully());
130135
}
131136

132137
@Override
@@ -178,13 +183,21 @@ protected Marshaller commandsMarshaller(ClusterService clusterService) {
178183
}
179184

180185
private void checkEntry(byte[] expKey, byte @Nullable [] expValue, long expRevision) {
181-
CompletableFuture<Entry> future = metaStorage.get(new ByteArray(expKey));
186+
CompletableFuture<Entry> future = metaStorage.get(new ByteArray(expKey), TimeAwareRaftGroupService.NO_TIMEOUT);
182187

183188
assertThat(future, willCompleteSuccessfully());
184189

185190
TestMetasStorageUtils.checkEntry(future.join(), expKey, expValue, expRevision);
186191
}
187192

193+
private static TimeAwareRaftGroupService wrapAsTimeAware(RaftGroupService service) {
194+
TimeAwareRaftGroupService timeAwareService = mock(TimeAwareRaftGroupService.class);
195+
when(timeAwareService.run(any(), anyLong())).thenAnswer(
196+
invocation -> service.run(invocation.getArgument(0), invocation.getArgument(1)));
197+
when(timeAwareService.clusterService()).thenReturn(service.clusterService());
198+
return timeAwareService;
199+
}
200+
188201
private static InternalClusterNode getNode(RaftServer server) {
189202
return server.clusterService().staticLocalNode();
190203
}

0 commit comments

Comments
 (0)