3939import static org .mockito .Mockito .when ;
4040
4141import java .util .List ;
42+ import java .util .LinkedHashMap ;
43+ import java .util .Map ;
4244import java .util .Random ;
4345import java .util .Set ;
46+ import java .util .UUID ;
4447import java .util .concurrent .BrokenBarrierException ;
4548import java .util .concurrent .CyclicBarrier ;
4649import java .util .concurrent .TimeUnit ;
4952import java .util .concurrent .atomic .AtomicReference ;
5053import java .util .function .Consumer ;
5154import java .util .stream .Collectors ;
55+ import java .util .stream .IntStream ;
5256import java .util .stream .Stream ;
5357import org .apache .ignite .internal .cluster .management .topology .api .LogicalNode ;
5458import org .apache .ignite .internal .cluster .management .topology .api .LogicalTopologyService ;
@@ -119,6 +123,9 @@ public class LeaseUpdaterTest extends BaseIgniteAbstractTest {
119123 @ Mock
120124 MetaStorageManager metaStorageManager ;
121125
126+ @ Mock
127+ private LeaseTracker leaseTracker ;
128+
122129 /** Lease updater for tests. */
123130 private LeaseUpdater leaseUpdater ;
124131
@@ -127,6 +134,9 @@ public class LeaseUpdaterTest extends BaseIgniteAbstractTest {
127134 /** Closure to get a lease that is passed in Meta storage. */
128135 private volatile Consumer <Lease > renewLeaseConsumer = null ;
129136
137+ /** Closure to get a lease batch that is passed in Meta storage. */
138+ private volatile Consumer <LeaseBatch > renewLeaseBatchConsumer = null ;
139+
130140 private static ZonePartitionId replicationGroupId (int objectId , int partId ) {
131141 return new ZonePartitionId (objectId , partId );
132142 }
@@ -142,7 +152,6 @@ private static ByteArray pendingAssignmentsQueueKey(ZonePartitionId groupId) {
142152 @ BeforeEach
143153 void setUp (
144154 @ Mock ClusterService clusterService ,
145- @ Mock LeaseTracker leaseTracker ,
146155 @ Mock MessagingService messagingService
147156 ) {
148157 mockStableAssignments (Set .of (Assignment .forPeer (stableNode .name ())));
@@ -163,6 +172,7 @@ void setUp(
163172 lenient ().when (metaStorageManager .invoke (any (Condition .class ), any (Operation .class ), any (Operation .class )))
164173 .thenAnswer (invocation -> {
165174 Consumer <Lease > leaseConsumer = renewLeaseConsumer ;
175+ Consumer <LeaseBatch > leaseBatchConsumer = renewLeaseBatchConsumer ;
166176
167177 if (leaseConsumer != null ) {
168178 OperationImpl op = invocation .getArgument (1 );
@@ -173,6 +183,14 @@ void setUp(
173183 leaseConsumer .accept (lease );
174184 }
175185
186+ if (leaseBatchConsumer != null ) {
187+ OperationImpl op = invocation .getArgument (1 );
188+
189+ LeaseBatch batch = LeaseBatch .fromBytes (toByteArray (op .value ()));
190+
191+ leaseBatchConsumer .accept (batch );
192+ }
193+
176194 return trueCompletedFuture ();
177195 });
178196
@@ -337,6 +355,80 @@ public void testLeaseRenew() throws Exception {
337355 assertTrue (lease .getExpirationTime ().compareTo (renewedLease .getExpirationTime ()) < 0 );
338356 assertEquals (lease .getLeaseholder (), renewedLease .getLeaseholder ());
339357 }
358+
359+ @ Test
360+ public void testStaleLeaseholderIdDoesNotMixOldAndCurrentNodeIdsInBatch () throws Exception {
361+ List <LogicalNode > currentTopologyNodes = IntStream .range (0 , 8 )
362+ .mapToObj (i -> new LogicalNode (
363+ randomUUID (),
364+ "node-" + i ,
365+ NetworkAddress .from ("127.0.0.1:" + (11_000 + i ))
366+ ))
367+ .collect (Collectors .toList ());
368+
369+ when (topologyService .logicalTopologyOnLeader ()).thenReturn (completedFuture (new LogicalTopologySnapshot (1 , currentTopologyNodes )));
370+
371+ Map <ZonePartitionId , Set <Assignment >> stableAssignmentsByGroup = new LinkedHashMap <>();
372+ Map <ReplicationGroupId , Lease > staleLeasesByGroup = new LinkedHashMap <>();
373+
374+ List <LogicalNode > leaseholdersByGroup = List .of (0 , 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ).stream ()
375+ .map (currentTopologyNodes ::get )
376+ .collect (Collectors .toList ());
377+
378+ long now = System .currentTimeMillis ();
379+ UUID staleNode0Id = randomUUID ();
380+
381+ for (int i = 0 ; i < leaseholdersByGroup .size (); i ++) {
382+ ZonePartitionId grpId = replicationGroupId (1 , i );
383+ LogicalNode logicalNode = leaseholdersByGroup .get (i );
384+
385+ stableAssignmentsByGroup .put (grpId , Set .of (Assignment .forPeer (logicalNode .name ())));
386+
387+ boolean expiredLease = i == 0 ;
388+ UUID leaseholderId = i == 1
389+ ? staleNode0Id
390+ : logicalNode .id ();
391+ HybridTimestamp expirationTime = new HybridTimestamp (expiredLease ? now - 1_000 : now + 60_000 , 0 );
392+
393+ staleLeasesByGroup .put (
394+ grpId ,
395+ new Lease (
396+ logicalNode .name (),
397+ leaseholderId ,
398+ new HybridTimestamp (now - 10_000 , 0 ),
399+ expirationTime ,
400+ true ,
401+ true ,
402+ null ,
403+ grpId
404+ )
405+ );
406+ }
407+
408+ mockStableAssignments (stableAssignmentsByGroup );
409+ mockPendingAssignments (Map .of ());
410+
411+ Leases currentLeases = new Leases (staleLeasesByGroup , BYTE_EMPTY_ARRAY );
412+
413+ lenient ().when (leaseTracker .leasesLatest ()).thenReturn (currentLeases );
414+ lenient ().when (leaseTracker .getLease (any (ReplicationGroupId .class ))).thenAnswer (invocation ->
415+ currentLeases .leaseByGroupId ().getOrDefault (invocation .getArgument (0 ), Lease .emptyLease (invocation .getArgument (0 ))));
416+
417+ initAndActivateLeaseUpdater ();
418+
419+ LeaseBatch renewedBatch = awaitForLeaseBatch (5_000 );
420+
421+ assertEquals (leaseholdersByGroup .size (), renewedBatch .leases ().size ());
422+ assertEquals (8 , renewedBatch .leases ().stream ().map (Lease ::getLeaseholder ).distinct ().count ());
423+ assertEquals (8 , renewedBatch .leases ().stream ().map (Lease ::getLeaseholderId ).distinct ().count ());
424+
425+ Map <String , UUID > currentNodeIdsByName = currentTopologyNodes .stream ()
426+ .collect (Collectors .toMap (LogicalNode ::name , LogicalNode ::id ));
427+
428+ for (Lease lease : renewedBatch .leases ()) {
429+ assertEquals (currentNodeIdsByName .get (lease .getLeaseholder ()), lease .getLeaseholderId ());
430+ }
431+ }
340432
341433 @ Test
342434 public void testLeaseAmongPendings () throws Exception {
@@ -382,29 +474,43 @@ private void mockTopology(Set<Assignment> stable, Set<Assignment> pending) {
382474 }
383475
384476 private void mockPendingAssignments (Set <Assignment > assignments ) {
385- Entry pendingEntry = new EntryImpl (
386- pendingAssignmentsQueueKey (replicationGroupId (1 , 0 )).bytes (),
387- AssignmentsQueue .toBytes (Assignments .of (HybridTimestamp .MIN_VALUE .longValue (), assignments .toArray (Assignment []::new ))),
388- 1 ,
389- new HybridClockImpl ().now ()
390- );
477+ mockPendingAssignments (Map .of (replicationGroupId (1 , 0 ), assignments ));
478+ }
479+
480+ private void mockPendingAssignments (Map <ZonePartitionId , Set <Assignment >> assignmentsByGroup ) {
481+ List <Entry > pendingEntries = assignmentsByGroup .entrySet ().stream ()
482+ .map (entry -> new EntryImpl (
483+ pendingAssignmentsQueueKey (entry .getKey ()).bytes (),
484+ AssignmentsQueue .toBytes (
485+ Assignments .of (HybridTimestamp .MIN_VALUE .longValue (), entry .getValue ().toArray (Assignment []::new ))
486+ ),
487+ 1 ,
488+ new HybridClockImpl ().now ()
489+ ))
490+ .collect (Collectors .toList ());
391491
392492 byte [] prefixBytes = ZoneRebalanceUtil .PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES ;
393493 when (metaStorageManager .prefixLocally (eq (new ByteArray (prefixBytes )), anyLong ()))
394- .thenReturn (Cursor .fromIterable (List . of ( pendingEntry ) ));
494+ .thenReturn (Cursor .fromIterable (pendingEntries ));
395495 }
396496
397497 private void mockStableAssignments (Set <Assignment > assignments ) {
398- Entry stableEntry = new EntryImpl (
399- stableAssignmentsKey (replicationGroupId (1 , 0 )).bytes (),
400- Assignments .of (HybridTimestamp .MIN_VALUE .longValue (), assignments .toArray (Assignment []::new )).toBytes (),
401- 1 ,
402- new HybridClockImpl ().now ()
403- );
498+ mockStableAssignments (Map .of (replicationGroupId (1 , 0 ), assignments ));
499+ }
500+
501+ private void mockStableAssignments (Map <ZonePartitionId , Set <Assignment >> assignmentsByGroup ) {
502+ List <Entry > stableEntries = assignmentsByGroup .entrySet ().stream ()
503+ .map (entry -> new EntryImpl (
504+ stableAssignmentsKey (entry .getKey ()).bytes (),
505+ Assignments .of (HybridTimestamp .MIN_VALUE .longValue (), entry .getValue ().toArray (Assignment []::new )).toBytes (),
506+ 1 ,
507+ new HybridClockImpl ().now ()
508+ ))
509+ .collect (Collectors .toList ());
404510
405511 byte [] prefixBytes = ZoneRebalanceUtil .STABLE_ASSIGNMENTS_PREFIX_BYTES ;
406512 when (metaStorageManager .prefixLocally (eq (new ByteArray (prefixBytes )), anyLong ()))
407- .thenReturn (Cursor .fromIterable (List . of ( stableEntry ) ));
513+ .thenReturn (Cursor .fromIterable (stableEntries ));
408514 }
409515
410516 private void initAndActivateLeaseUpdater () {
@@ -481,6 +587,19 @@ private Lease awaitForLease(boolean needAccepted, @Nullable Lease previousLease,
481587 return renewedLease .get ();
482588 }
483589
590+ private LeaseBatch awaitForLeaseBatch (long timeoutMillis ) throws InterruptedException {
591+ AtomicReference <LeaseBatch > renewedBatch = new AtomicReference <>();
592+
593+ renewLeaseBatchConsumer = batch -> {
594+ renewedBatch .set (batch );
595+ renewLeaseBatchConsumer = null ;
596+ };
597+
598+ assertTrue (IgniteTestUtils .waitForCondition (() -> renewedBatch .get () != null , timeoutMillis ));
599+
600+ return renewedBatch .get ();
601+ }
602+
484603 /**
485604 * Gets a lease updater tread.
486605 *
0 commit comments