Skip to content

Commit 3cb1342

Browse files
authored
IGNITE-28404 Fix the stopping of lease prolongation during lease negotiation (#7931)
1 parent c0a5d0a commit 3cb1342

File tree

7 files changed

+108
-9
lines changed

7 files changed

+108
-9
lines changed

modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ private CompletableFuture<HybridTimestamp> denyLease(ReplicationGroupId grpId, L
261261

262262
leaseNegotiator.cancelAgreement(grpId);
263263

264-
Leases leasesCurrent = leaseTracker.leasesLatest();
264+
Leases leasesCurrent = leaseTracker.latestLeases();
265265

266266
Collection<Lease> currentLeases = leasesCurrent.leaseByGroupId().values();
267267

@@ -440,7 +440,7 @@ private void waitForInflight() {
440440

441441
leases = new Leases(newLeasesMap, entry.value());
442442
} else {
443-
leases = leaseTracker.leasesLatest();
443+
leases = leaseTracker.latestLeases();
444444
}
445445
}
446446

@@ -815,6 +815,21 @@ private void processMessageInternal(String sender, PlacementDriverActorMessage m
815815
clusterService.messagingService().respond(sender, response, correlationId);
816816
}
817817
});
818+
} else {
819+
// Return non null value to prevent retries from non-leaseholder.
820+
long time = clockService.currentLong();
821+
822+
LOG.info("Stop lease prolongation message was received from non-leaseholder "
823+
+ "[groupId={}, sender={}, leaseholder={}, time={}]", grpId, sender, lease.getLeaseholder(), time);
824+
825+
if (correlationId != null) {
826+
StopLeaseProlongationMessageResponse response = PLACEMENT_DRIVER_MESSAGES_FACTORY
827+
.stopLeaseProlongationMessageResponse()
828+
.deniedLeaseExpirationTimeLong(time)
829+
.build();
830+
831+
clusterService.messagingService().respond(sender, response, correlationId);
832+
}
818833
}
819834
} else {
820835
LOG.warn("Unknown message type [msg={}]", msg.getClass().getSimpleName());

modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public Lease getLease(ReplicationGroupId grpId) {
181181
}
182182

183183
/** Returns collection of latest leases, ordered by replication group. Shows all latest leases including expired ones. */
184-
public Leases leasesLatest() {
184+
public Leases latestLeases() {
185185
return leases;
186186
}
187187

modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/metrics/PlacementDriverMetricSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private int numberOfLeases(boolean accepted) {
9797
int count = 0;
9898
HybridTimestamp now = clockService.current();
9999

100-
for (Lease lease : leaseTracker.leasesLatest().leaseByGroupId().values()) {
100+
for (Lease lease : leaseTracker.latestLeases().leaseByGroupId().values()) {
101101
// Expired leases can be ignored.
102102
if (lease != null && accepted == lease.isAccepted() && clockService.before(lease.getExpirationTime(), now)) {
103103
count++;

modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ void setUp(
162162

163163
when(clusterService.messagingService()).thenReturn(messagingService);
164164

165-
lenient().when(leaseTracker.leasesLatest()).thenReturn(leases);
165+
lenient().when(leaseTracker.latestLeases()).thenReturn(leases);
166166
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).then(i -> Lease.emptyLease(i.getArgument(0)));
167167

168168
when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(new Revisions(1, -1)));
@@ -410,7 +410,7 @@ public void testStaleLeaseholderIdCanCoexistWithCurrentNodeIdsInBatch() throws E
410410

411411
Leases currentLeases = new Leases(staleLeasesByGroup, BYTE_EMPTY_ARRAY);
412412

413-
lenient().when(leaseTracker.leasesLatest()).thenReturn(currentLeases);
413+
lenient().when(leaseTracker.latestLeases()).thenReturn(currentLeases);
414414
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenAnswer(invocation ->
415415
currentLeases.leaseByGroupId().getOrDefault(invocation.getArgument(0), Lease.emptyLease(invocation.getArgument(0))));
416416

@@ -501,7 +501,7 @@ public void testNonExpiredAcceptedLeasesKeepLeaseholderIdentity() throws Excepti
501501

502502
Leases currentLeases = new Leases(leasesByGroup, BYTE_EMPTY_ARRAY);
503503

504-
lenient().when(leaseTracker.leasesLatest()).thenReturn(currentLeases);
504+
lenient().when(leaseTracker.latestLeases()).thenReturn(currentLeases);
505505
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenAnswer(invocation ->
506506
currentLeases.leaseByGroupId().getOrDefault(invocation.getArgument(0), Lease.emptyLease(invocation.getArgument(0))));
507507

modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,8 @@ CompletableFuture<Void> weakStopReplica(
283283
// If is primary, turning off the primary first.
284284
context.replicaState = ReplicaState.RESTART_PLANNED;
285285

286+
LOG.info("Stopping lease prolongation due to partition restart [groupId={}].", groupId);
287+
286288
return replicaManager.stopLeaseProlongation(groupId, null)
287289
.thenCompose(unused -> planDeferredReplicaStop(groupId, context, stopOperation));
288290
} else {
@@ -319,6 +321,8 @@ private CompletableFuture<Void> stopReplica(
319321
// These is some probability that the replica would be reserved after the previous lease is expired and before this method
320322
// is called, so reservation state needs to be checked again.
321323
if (context.reservedForPrimary) {
324+
LOG.info("Stopping lease prolongation due to replica stop [groupId={}].", groupId);
325+
322326
return replicaManager.stopLeaseProlongation(groupId, null)
323327
.thenCompose(unused -> planDeferredReplicaStop(groupId, context, stopOperation));
324328
}

modules/rest/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ dependencies {
9797
integrationTestImplementation project(':ignite-table')
9898
integrationTestImplementation project(':ignite-transactions')
9999
integrationTestImplementation project(':ignite-eventlog')
100+
integrationTestImplementation project(':ignite-placement-driver-api')
100101
integrationTestImplementation testFixtures(project(':ignite-core'))
101102
integrationTestImplementation testFixtures(project(':ignite-runner'))
102103
integrationTestImplementation testFixtures(project(':ignite-cluster-management'))

modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
import static io.micronaut.http.HttpStatus.BAD_REQUEST;
2121
import static io.micronaut.http.HttpStatus.OK;
22+
import static java.util.concurrent.TimeUnit.SECONDS;
2223
import static java.util.stream.Collectors.toSet;
2324
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
2425
import static org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.assertThrowsProblem;
2526
import static org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.hasStatus;
2627
import static org.apache.ignite.internal.rest.matcher.ProblemMatcher.isProblem;
28+
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
2729
import static org.hamcrest.MatcherAssert.assertThat;
2830
import static org.hamcrest.Matchers.allOf;
2931

@@ -36,20 +38,33 @@
3638
import java.util.Collection;
3739
import java.util.List;
3840
import java.util.Set;
41+
import java.util.concurrent.CompletableFuture;
3942
import java.util.stream.Collectors;
4043
import org.apache.ignite.Ignite;
4144
import org.apache.ignite.internal.ClusterConfiguration;
4245
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
46+
import org.apache.ignite.internal.app.IgniteImpl;
47+
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
48+
import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
49+
import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
50+
import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
51+
import org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
52+
import org.apache.ignite.internal.replicator.ZonePartitionId;
4353
import org.apache.ignite.internal.rest.api.recovery.RestartZonePartitionsRequest;
54+
import org.apache.ignite.internal.table.TableImpl;
55+
import org.apache.ignite.internal.wrapper.Wrappers;
4456
import org.hamcrest.Matcher;
4557
import org.hamcrest.Matchers;
4658
import org.junit.jupiter.api.BeforeAll;
59+
import org.junit.jupiter.api.BeforeEach;
4760
import org.junit.jupiter.api.Disabled;
4861
import org.junit.jupiter.api.Test;
4962

5063
/** Test for disaster recovery restart partitions command. */
5164
@MicronautTest
5265
public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPerClassIntegrationTest {
66+
private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
67+
5368
private static final String NODE_URL = "http://localhost:" + ClusterConfiguration.DEFAULT_BASE_HTTP_PORT;
5469

5570
private static final String FIRST_ZONE = "first_ZONE";
@@ -65,10 +80,17 @@ public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
6580
@BeforeAll
6681
public void setUp() {
6782
sql(String.format("CREATE ZONE \"%s\" storage profiles ['%s']", FIRST_ZONE, DEFAULT_AIPERSIST_PROFILE_NAME));
68-
sql(String.format("CREATE TABLE PUBLIC.\"%s\" (id INT PRIMARY KEY, val INT) ZONE \"%s\"", TABLE_NAME,
83+
sql(String.format("CREATE TABLE %s (id INT PRIMARY KEY, val INT) ZONE \"%s\"", TABLE_NAME,
6984
FIRST_ZONE));
7085
}
7186

87+
@BeforeEach
88+
public void beforeEach() {
89+
for (IgniteImpl node : runningNodesList()) {
90+
node.stopDroppingMessages();
91+
}
92+
}
93+
7294
@Test
7395
public void testRestartPartitionZoneNotFound() {
7496
String unknownZone = "unknown_zone";
@@ -135,7 +157,6 @@ public void testRestartAllPartitions() {
135157
}
136158

137159
@Test
138-
@Disabled("https://issues.apache.org/jira/browse/IGNITE-26377")
139160
public void testRestartSpecifiedPartitions() {
140161
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), FIRST_ZONE, Set.of(0, 1));
141162

@@ -151,6 +172,64 @@ public void testRestartPartitionsByNodes() {
151172
assertThat(client.toBlocking().exchange(post), hasStatus(OK));
152173
}
153174

175+
@Test
176+
public void testRestartPartitionDuringLeaseNegotiation() {
177+
IgniteImpl node = anyNode();
178+
179+
int zoneId = Wrappers.unwrap(node.tables().table(TABLE_NAME), TableImpl.class).zoneId();
180+
ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
181+
182+
CompletableFuture<ReplicaMeta> primaryReplicaFut = anyNode().placementDriver().awaitPrimaryReplica(
183+
partId,
184+
node.clock().now(),
185+
10,
186+
SECONDS
187+
);
188+
189+
assertThat(primaryReplicaFut, willCompleteSuccessfully());
190+
191+
log.info("Test: primary replica [groupId={}, leaseholder={}]", partId, primaryReplicaFut.join().getLeaseholder());
192+
193+
CompletableFuture<?> newNegotiationFuture = new CompletableFuture<>();
194+
195+
for (IgniteImpl n : runningNodesList()) {
196+
n.dropMessages((recp, msg) -> {
197+
if (msg instanceof LeaseGrantedMessage) {
198+
LeaseGrantedMessage lgm = (LeaseGrantedMessage) msg;
199+
if (lgm.groupId().equals(partId)) {
200+
log.info("Test: new negotiation begins [groupId={}, leaseholder={}]", lgm.groupId(), recp);
201+
newNegotiationFuture.complete(null);
202+
}
203+
}
204+
205+
if (msg instanceof LeaseGrantedMessageResponse) {
206+
log.info("Test: lease negotiation tries to finish [accepted={}]", ((LeaseGrantedMessageResponse) msg).accepted());
207+
return true;
208+
}
209+
210+
return false;
211+
});
212+
}
213+
214+
StopLeaseProlongationMessage stopMsg = PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
215+
.groupId(partId)
216+
.build();
217+
218+
for (IgniteImpl n : runningNodesList()) {
219+
for (IgniteImpl recp : runningNodesList()) {
220+
n.clusterService().messagingService().invoke(recp.clusterService().topologyService().localMember(), stopMsg, 3000);
221+
}
222+
}
223+
224+
assertThat(newNegotiationFuture, willCompleteSuccessfully());
225+
226+
log.info("Test: partition restart");
227+
228+
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), FIRST_ZONE, Set.of(0));
229+
230+
assertThat(client.toBlocking().exchange(post), hasStatus(OK));
231+
}
232+
154233
private static Set<String> nodeNames(int count) {
155234
return CLUSTER.runningNodes()
156235
.map(Ignite::name)

0 commit comments

Comments
 (0)