From 6941fb73cfc7091f2ec43a43d9a7cd9ff73cf7ed Mon Sep 17 00:00:00 2001 From: Anton Laletin Date: Wed, 11 Mar 2026 15:14:57 +0400 Subject: [PATCH 1/7] IGNITE-28013 Fix stale leaseholder ID handling in LeaseUpdater and lease batch serialization --- .../placementdriver/LeaseUpdater.java | 22 +++ .../placementdriver/TopologyTracker.java | 22 +++ .../placementdriver/LeaseUpdaterTest.java | 149 ++++++++++++++++-- 3 files changed, 178 insertions(+), 15 deletions(-) diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java index b5cbf6501444..e9afa2ed49b4 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java @@ -486,11 +486,33 @@ private void updateLeaseBatchInternal() { : lease.proposedCandidate(); InternalClusterNode candidate = nextLeaseHolder(stableAssignments, pendingAssignments, grpId, proposedLeaseholder); + boolean leaseholderIdIsInLogicalTopology = lease.getLeaseholderId() != null + && topologyTracker.containsNodeId(lease.getLeaseholderId()); + boolean hasStaleLeaseholderId = lease.isAccepted() && !leaseholderIdIsInLogicalTopology; boolean canBeProlonged = lease.isAccepted() && lease.isProlongable() + && !hasStaleLeaseholderId && candidate != null && candidate.id().equals(lease.getLeaseholderId()); + if (hasStaleLeaseholderId) { + LOG.info("Leaseholder has left the logical topology, creating a new lease [groupId={}, lease={}, candidate={}]", + grpId, lease, candidate); + + if (candidate == null) { + logGroupWithoutCandidateOnce(grpId, true, stableAssignments, pendingAssignments); + continue; + } + + Lease newLease = writeNewLease(grpId, candidate, renewedLeases); + + boolean force = Objects.equals(lease.getLeaseholder(), candidate.name()); + + toBeNegotiated.put(grpId, new LeaseAgreement(newLease, force)); + + continue; + } + // The lease is expired or close to this, trying to prolong if possible or create a new one. if (lease.getExpirationTime().getPhysical() < outdatedLeaseThreshold) { boolean isLeaseOutdated = isLeaseOutdated(lease); diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java index 1c1f9dd7a3ee..99a728443805 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.placementdriver; +import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; @@ -106,6 +107,27 @@ public void stopTrack() { return null; } + /** + * Returns {@code true} if the logical topology snapshot contains a node with the given transient node id. + * + * @param nodeId Node id. + */ + public boolean containsNodeId(UUID nodeId) { + LogicalTopologySnapshot logicalTopologySnap0 = topologySnapRef.get(); + + if (logicalTopologySnap0 == null || CollectionUtils.nullOrEmpty(logicalTopologySnap0.nodes())) { + return false; + } + + for (LogicalNode node : logicalTopologySnap0.nodes()) { + if (node.id().equals(nodeId)) { + return true; + } + } + + return false; + } + LogicalTopologySnapshot currentTopologySnapshot() { return topologySnapRef.get(); } diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java index e23632ddef1a..deaa3cb4deaa 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java @@ -39,8 +39,11 @@ import static org.mockito.Mockito.when; import java.util.List; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.UUID; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; @@ -49,6 +52,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; @@ -119,6 +123,9 @@ public class LeaseUpdaterTest extends BaseIgniteAbstractTest { @Mock MetaStorageManager metaStorageManager; + @Mock + private LeaseTracker leaseTracker; + /** Lease updater for tests. */ private LeaseUpdater leaseUpdater; @@ -127,6 +134,9 @@ public class LeaseUpdaterTest extends BaseIgniteAbstractTest { /** Closure to get a lease that is passed in Meta storage. */ private volatile Consumer renewLeaseConsumer = null; + /** Closure to get a lease batch that is passed in Meta storage. */ + private volatile Consumer renewLeaseBatchConsumer = null; + private static ZonePartitionId replicationGroupId(int objectId, int partId) { return new ZonePartitionId(objectId, partId); } @@ -142,7 +152,6 @@ private static ByteArray pendingAssignmentsQueueKey(ZonePartitionId groupId) { @BeforeEach void setUp( @Mock ClusterService clusterService, - @Mock LeaseTracker leaseTracker, @Mock MessagingService messagingService ) { mockStableAssignments(Set.of(Assignment.forPeer(stableNode.name()))); @@ -163,6 +172,7 @@ void setUp( lenient().when(metaStorageManager.invoke(any(Condition.class), any(Operation.class), any(Operation.class))) .thenAnswer(invocation -> { Consumer leaseConsumer = renewLeaseConsumer; + Consumer leaseBatchConsumer = renewLeaseBatchConsumer; if (leaseConsumer != null) { OperationImpl op = invocation.getArgument(1); @@ -173,6 +183,14 @@ void setUp( leaseConsumer.accept(lease); } + if (leaseBatchConsumer != null) { + OperationImpl op = invocation.getArgument(1); + + LeaseBatch batch = LeaseBatch.fromBytes(toByteArray(op.value())); + + leaseBatchConsumer.accept(batch); + } + return trueCompletedFuture(); }); @@ -337,6 +355,80 @@ public void testLeaseRenew() throws Exception { assertTrue(lease.getExpirationTime().compareTo(renewedLease.getExpirationTime()) < 0); assertEquals(lease.getLeaseholder(), renewedLease.getLeaseholder()); } + + @Test + public void testStaleLeaseholderIdDoesNotMixOldAndCurrentNodeIdsInBatch() throws Exception { + List currentTopologyNodes = IntStream.range(0, 8) + .mapToObj(i -> new LogicalNode( + randomUUID(), + "node-" + i, + NetworkAddress.from("127.0.0.1:" + (11_000 + i)) + )) + .collect(Collectors.toList()); + + when(topologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new LogicalTopologySnapshot(1, currentTopologyNodes))); + + Map> stableAssignmentsByGroup = new LinkedHashMap<>(); + Map staleLeasesByGroup = new LinkedHashMap<>(); + + List leaseholdersByGroup = List.of(0, 0, 1, 2, 3, 4, 5, 6, 7).stream() + .map(currentTopologyNodes::get) + .collect(Collectors.toList()); + + long now = System.currentTimeMillis(); + UUID staleNode0Id = randomUUID(); + + for (int i = 0; i < leaseholdersByGroup.size(); i++) { + ZonePartitionId grpId = replicationGroupId(1, i); + LogicalNode logicalNode = leaseholdersByGroup.get(i); + + stableAssignmentsByGroup.put(grpId, Set.of(Assignment.forPeer(logicalNode.name()))); + + boolean expiredLease = i == 0; + UUID leaseholderId = i == 1 + ? staleNode0Id + : logicalNode.id(); + HybridTimestamp expirationTime = new HybridTimestamp(expiredLease ? now - 1_000 : now + 60_000, 0); + + staleLeasesByGroup.put( + grpId, + new Lease( + logicalNode.name(), + leaseholderId, + new HybridTimestamp(now - 10_000, 0), + expirationTime, + true, + true, + null, + grpId + ) + ); + } + + mockStableAssignments(stableAssignmentsByGroup); + mockPendingAssignments(Map.of()); + + Leases currentLeases = new Leases(staleLeasesByGroup, BYTE_EMPTY_ARRAY); + + lenient().when(leaseTracker.leasesLatest()).thenReturn(currentLeases); + lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenAnswer(invocation -> + currentLeases.leaseByGroupId().getOrDefault(invocation.getArgument(0), Lease.emptyLease(invocation.getArgument(0)))); + + initAndActivateLeaseUpdater(); + + LeaseBatch renewedBatch = awaitForLeaseBatch(5_000); + + assertEquals(leaseholdersByGroup.size(), renewedBatch.leases().size()); + assertEquals(8, renewedBatch.leases().stream().map(Lease::getLeaseholder).distinct().count()); + assertEquals(8, renewedBatch.leases().stream().map(Lease::getLeaseholderId).distinct().count()); + + Map currentNodeIdsByName = currentTopologyNodes.stream() + .collect(Collectors.toMap(LogicalNode::name, LogicalNode::id)); + + for (Lease lease : renewedBatch.leases()) { + assertEquals(currentNodeIdsByName.get(lease.getLeaseholder()), lease.getLeaseholderId()); + } + } @Test public void testLeaseAmongPendings() throws Exception { @@ -382,29 +474,43 @@ private void mockTopology(Set stable, Set pending) { } private void mockPendingAssignments(Set assignments) { - Entry pendingEntry = new EntryImpl( - pendingAssignmentsQueueKey(replicationGroupId(1, 0)).bytes(), - AssignmentsQueue.toBytes(Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), assignments.toArray(Assignment[]::new))), - 1, - new HybridClockImpl().now() - ); + mockPendingAssignments(Map.of(replicationGroupId(1, 0), assignments)); + } + + private void mockPendingAssignments(Map> assignmentsByGroup) { + List pendingEntries = assignmentsByGroup.entrySet().stream() + .map(entry -> new EntryImpl( + pendingAssignmentsQueueKey(entry.getKey()).bytes(), + AssignmentsQueue.toBytes( + Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), entry.getValue().toArray(Assignment[]::new)) + ), + 1, + new HybridClockImpl().now() + )) + .collect(Collectors.toList()); byte[] prefixBytes = ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES; when(metaStorageManager.prefixLocally(eq(new ByteArray(prefixBytes)), anyLong())) - .thenReturn(Cursor.fromIterable(List.of(pendingEntry))); + .thenReturn(Cursor.fromIterable(pendingEntries)); } private void mockStableAssignments(Set assignments) { - Entry stableEntry = new EntryImpl( - stableAssignmentsKey(replicationGroupId(1, 0)).bytes(), - Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), assignments.toArray(Assignment[]::new)).toBytes(), - 1, - new HybridClockImpl().now() - ); + mockStableAssignments(Map.of(replicationGroupId(1, 0), assignments)); + } + + private void mockStableAssignments(Map> assignmentsByGroup) { + List stableEntries = assignmentsByGroup.entrySet().stream() + .map(entry -> new EntryImpl( + stableAssignmentsKey(entry.getKey()).bytes(), + Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), entry.getValue().toArray(Assignment[]::new)).toBytes(), + 1, + new HybridClockImpl().now() + )) + .collect(Collectors.toList()); byte[] prefixBytes = ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES; when(metaStorageManager.prefixLocally(eq(new ByteArray(prefixBytes)), anyLong())) - .thenReturn(Cursor.fromIterable(List.of(stableEntry))); + .thenReturn(Cursor.fromIterable(stableEntries)); } private void initAndActivateLeaseUpdater() { @@ -481,6 +587,19 @@ private Lease awaitForLease(boolean needAccepted, @Nullable Lease previousLease, return renewedLease.get(); } + private LeaseBatch awaitForLeaseBatch(long timeoutMillis) throws InterruptedException { + AtomicReference renewedBatch = new AtomicReference<>(); + + renewLeaseBatchConsumer = batch -> { + renewedBatch.set(batch); + renewLeaseBatchConsumer = null; + }; + + assertTrue(IgniteTestUtils.waitForCondition(() -> renewedBatch.get() != null, timeoutMillis)); + + return renewedBatch.get(); + } + /** * Gets a lease updater tread. * From e6d1f86d75620d0fc3b09808f8c49c69e39740d0 Mon Sep 17 00:00:00 2001 From: Anton Laletin Date: Tue, 17 Mar 2026 12:10:02 +0400 Subject: [PATCH 2/7] IGNITE-28013 We don't need to override non-expired leases, so to fix the issue we change serialization. Moreover, added test to check that non-expired leases are not overriden. --- .../placementdriver/LeaseUpdater.java | 23 ---- .../placementdriver/TopologyTracker.java | 17 +-- .../leases/LeaseBatchSerializer.java | 2 +- .../leases/NodesDictionary.java | 4 + .../placementdriver/LeaseUpdaterTest.java | 109 ++++++++++++++++-- .../leases/LeaseBatchSerializerTest.java | 16 +++ 6 files changed, 122 insertions(+), 49 deletions(-) diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java index e9afa2ed49b4..6816eed83f5a 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java @@ -486,33 +486,10 @@ private void updateLeaseBatchInternal() { : lease.proposedCandidate(); InternalClusterNode candidate = nextLeaseHolder(stableAssignments, pendingAssignments, grpId, proposedLeaseholder); - boolean leaseholderIdIsInLogicalTopology = lease.getLeaseholderId() != null - && topologyTracker.containsNodeId(lease.getLeaseholderId()); - boolean hasStaleLeaseholderId = lease.isAccepted() && !leaseholderIdIsInLogicalTopology; - boolean canBeProlonged = lease.isAccepted() && lease.isProlongable() - && !hasStaleLeaseholderId && candidate != null && candidate.id().equals(lease.getLeaseholderId()); - if (hasStaleLeaseholderId) { - LOG.info("Leaseholder has left the logical topology, creating a new lease [groupId={}, lease={}, candidate={}]", - grpId, lease, candidate); - - if (candidate == null) { - logGroupWithoutCandidateOnce(grpId, true, stableAssignments, pendingAssignments); - continue; - } - - Lease newLease = writeNewLease(grpId, candidate, renewedLeases); - - boolean force = Objects.equals(lease.getLeaseholder(), candidate.name()); - - toBeNegotiated.put(grpId, new LeaseAgreement(newLease, force)); - - continue; - } - // The lease is expired or close to this, trying to prolong if possible or create a new one. if (lease.getExpirationTime().getPhysical() < outdatedLeaseThreshold) { boolean isLeaseOutdated = isLeaseOutdated(lease); diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java index f67af1ffbf55..1c1f9dd7a3ee 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java @@ -17,9 +17,7 @@ package org.apache.ignite.internal.placementdriver; -import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Predicate; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; @@ -93,19 +91,6 @@ public void stopTrack() { * @return Cluster node or {@code null} if topology has no a node with the consistent id. */ public @Nullable InternalClusterNode nodeByConsistentId(String consistentId) { - return findNode(node -> node.name().equals(consistentId)); - } - - /** - * Returns {@code true} if the logical topology snapshot contains a node with the given transient node id. - * - * @param nodeId Node id. - */ - public boolean containsNodeId(UUID nodeId) { - return findNode(node -> node.id().equals(nodeId)) != null; - } - - private @Nullable InternalClusterNode findNode(Predicate predicate) { LogicalTopologySnapshot logicalTopologySnap0 = topologySnapRef.get(); if (logicalTopologySnap0 == null || CollectionUtils.nullOrEmpty(logicalTopologySnap0.nodes())) { @@ -113,7 +98,7 @@ public boolean containsNodeId(UUID nodeId) { } for (LogicalNode node : logicalTopologySnap0.nodes()) { - if (predicate.test(node)) { + if (node.name().equals(consistentId)) { return node; } } diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java index 6359bd2b527a..8238dc3a7a8a 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java @@ -356,7 +356,7 @@ private static int packNodesInfo(int holderNodeIndex, int proposedCandidateNameI private static boolean holderIdAndProposedCandidateFitIn1Byte(NodesDictionary dictionary) { // Up to 8 names means that for name index it's enough to have 3 bits, same for node index, so, in sum, they // require up to 6 bits, and we have 7 bits in a varint byte. - return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE; + return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE && dictionary.nodeCount() <= MAX_NODES_FOR_COMPACT_MODE; } private static int flags( diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java index 967a0a4fa273..f1913bf8c2ce 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java @@ -132,6 +132,10 @@ int nameCount() { return nameIndexToName.size(); } + int nodeCount() { + return nodeIndexToId.size(); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java index deaa3cb4deaa..d6fa84c684fc 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java @@ -38,8 +38,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.util.List; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; @@ -157,7 +157,7 @@ void setUp( mockStableAssignments(Set.of(Assignment.forPeer(stableNode.name()))); mockPendingAssignments(Set.of(Assignment.forPeer(pendingNode.name()))); - when(messagingService.invoke(anyString(), any(), anyLong())) + lenient().when(messagingService.invoke(anyString(), any(), anyLong())) .then(i -> completedFuture(PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse().accepted(true).build())); when(clusterService.messagingService()).thenReturn(messagingService); @@ -357,7 +357,7 @@ public void testLeaseRenew() throws Exception { } @Test - public void testStaleLeaseholderIdDoesNotMixOldAndCurrentNodeIdsInBatch() throws Exception { + public void testStaleLeaseholderIdCanCoexistWithCurrentNodeIdsInBatch() throws Exception { List currentTopologyNodes = IntStream.range(0, 8) .mapToObj(i -> new LogicalNode( randomUUID(), @@ -420,14 +420,105 @@ public void testStaleLeaseholderIdDoesNotMixOldAndCurrentNodeIdsInBatch() throws assertEquals(leaseholdersByGroup.size(), renewedBatch.leases().size()); assertEquals(8, renewedBatch.leases().stream().map(Lease::getLeaseholder).distinct().count()); - assertEquals(8, renewedBatch.leases().stream().map(Lease::getLeaseholderId).distinct().count()); + assertEquals(9, renewedBatch.leases().stream().map(Lease::getLeaseholderId).distinct().count()); - Map currentNodeIdsByName = currentTopologyNodes.stream() - .collect(Collectors.toMap(LogicalNode::name, LogicalNode::id)); + Map renewedLeaseByGroup = renewedBatch.leases().stream() + .collect(Collectors.toMap(Lease::replicationGroupId, lease -> lease)); - for (Lease lease : renewedBatch.leases()) { - assertEquals(currentNodeIdsByName.get(lease.getLeaseholder()), lease.getLeaseholderId()); - } + Lease expiredLeaseWithCurrentId = renewedLeaseByGroup.get(replicationGroupId(1, 0)); + assertEquals(leaseholdersByGroup.get(0).id(), expiredLeaseWithCurrentId.getLeaseholderId()); + + Lease nonExpiredLeaseWithStaleId = renewedLeaseByGroup.get(replicationGroupId(1, 1)); + assertEquals(staleNode0Id, nonExpiredLeaseWithStaleId.getLeaseholderId()); + } + + @Test + public void testNonExpiredAcceptedLeasesKeepLeaseholderIdentity() throws Exception { + List currentTopologyNodes = IntStream.range(0, 3) + .mapToObj(i -> new LogicalNode( + randomUUID(), + "node-" + i, + NetworkAddress.from("127.0.0.1:" + (11_000 + i)) + )) + .collect(Collectors.toList()); + + when(topologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new LogicalTopologySnapshot(1, currentTopologyNodes))); + + long now = System.currentTimeMillis(); + UUID staleNode1Id = randomUUID(); + + ZonePartitionId expiredGrpId = replicationGroupId(1, 0); + ZonePartitionId nonExpiredStaleIdGrpId = replicationGroupId(1, 1); + ZonePartitionId nonExpiredCurrentIdGrpId = replicationGroupId(1, 2); + + Map> stableAssignmentsByGroup = Map.of( + expiredGrpId, Set.of(Assignment.forPeer(currentTopologyNodes.get(0).name())), + nonExpiredStaleIdGrpId, Set.of(Assignment.forPeer(currentTopologyNodes.get(1).name())), + nonExpiredCurrentIdGrpId, Set.of(Assignment.forPeer(currentTopologyNodes.get(2).name())) + ); + + mockStableAssignments(stableAssignmentsByGroup); + mockPendingAssignments(Map.of()); + + Lease expiredLease = new Lease( + currentTopologyNodes.get(0).name(), + currentTopologyNodes.get(0).id(), + new HybridTimestamp(now - 10_000, 0), + new HybridTimestamp(now - 1_000, 0), + true, + true, + null, + expiredGrpId + ); + + Lease nonExpiredLeaseWithStaleId = new Lease( + currentTopologyNodes.get(1).name(), + staleNode1Id, + new HybridTimestamp(now - 10_000, 0), + new HybridTimestamp(now + 60_000, 0), + true, + true, + null, + nonExpiredStaleIdGrpId + ); + + Lease nonExpiredLeaseWithCurrentId = new Lease( + currentTopologyNodes.get(2).name(), + currentTopologyNodes.get(2).id(), + new HybridTimestamp(now - 10_000, 0), + new HybridTimestamp(now + 60_000, 0), + true, + true, + null, + nonExpiredCurrentIdGrpId + ); + + Map leasesByGroup = Map.of( + expiredGrpId, expiredLease, + nonExpiredStaleIdGrpId, nonExpiredLeaseWithStaleId, + nonExpiredCurrentIdGrpId, nonExpiredLeaseWithCurrentId + ); + + Leases currentLeases = new Leases(leasesByGroup, BYTE_EMPTY_ARRAY); + + lenient().when(leaseTracker.leasesLatest()).thenReturn(currentLeases); + lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenAnswer(invocation -> + currentLeases.leaseByGroupId().getOrDefault(invocation.getArgument(0), Lease.emptyLease(invocation.getArgument(0)))); + + initAndActivateLeaseUpdater(); + + LeaseBatch renewedBatch = awaitForLeaseBatch(5_000); + + Map renewedLeaseByGroup = renewedBatch.leases().stream() + .collect(Collectors.toMap(Lease::replicationGroupId, lease -> lease)); + + Lease renewedNonExpiredWithStaleId = renewedLeaseByGroup.get(nonExpiredStaleIdGrpId); + assertEquals(nonExpiredLeaseWithStaleId.getLeaseholder(), renewedNonExpiredWithStaleId.getLeaseholder()); + assertEquals(nonExpiredLeaseWithStaleId.getLeaseholderId(), renewedNonExpiredWithStaleId.getLeaseholderId()); + + Lease renewedNonExpiredWithCurrentId = renewedLeaseByGroup.get(nonExpiredCurrentIdGrpId); + assertEquals(nonExpiredLeaseWithCurrentId.getLeaseholder(), renewedNonExpiredWithCurrentId.getLeaseholder()); + assertEquals(nonExpiredLeaseWithCurrentId.getLeaseholderId(), renewedNonExpiredWithCurrentId.getLeaseholderId()); } @Test diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java index e359c152fc37..56fc731dd9e5 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java @@ -202,6 +202,22 @@ void batchWithExactly8NodeNames() { verifySerializationAndDeserializationGivesSameResult(originalBatch); } + @Test + void batchWithExactly8NodeNamesAndMoreThan8NodeIds() { + List originalLeases = IntStream.range(0, 8) + .mapToObj(n -> { + String nodeName = "node" + n; + return tableLease(nodeName, randomUUID(), nodeName, n); + }) + .collect(toList()); + + originalLeases.add(tableLease("node0", randomUUID(), "node0", 8)); + + LeaseBatch originalBatch = new LeaseBatch(originalLeases); + + verifySerializationAndDeserializationGivesSameResult(originalBatch); + } + @Test void batchWithMoreThan8NodeNames() { List originalLeases = IntStream.range(0, 9) From b692fc2aa0b480b380e582f7bb57e0e85d3a6761 Mon Sep 17 00:00:00 2001 From: Anton Laletin Date: Tue, 17 Mar 2026 12:42:40 +0400 Subject: [PATCH 3/7] IGNITE-28013 Fixed checkstyle --- .../ignite/internal/placementdriver/LeaseUpdaterTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java index d6fa84c684fc..58181434cbe0 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java @@ -355,7 +355,7 @@ public void testLeaseRenew() throws Exception { assertTrue(lease.getExpirationTime().compareTo(renewedLease.getExpirationTime()) < 0); assertEquals(lease.getLeaseholder(), renewedLease.getLeaseholder()); } - + @Test public void testStaleLeaseholderIdCanCoexistWithCurrentNodeIdsInBatch() throws Exception { List currentTopologyNodes = IntStream.range(0, 8) @@ -650,8 +650,8 @@ private Lease awaitForLease(boolean needAccepted) throws InterruptedException { * Waits for lease write to Meta storage. * * @param needAccepted Whether to wait only for accepted lease. - * @param previousLease Previous lease. If not null, then wait for any lease having expiration time other than the previous has (i.e. - * either another lease or prolonged lease). + * @param previousLease Previous lease. If not null, then wait for any lease having expiration time other than the previous has + * (i.e. either another lease or prolonged lease). * @param timeoutMillis Timeout in milliseconds to wait for lease. * @return A lease. * @throws InterruptedException if the wait is interrupted. From 8dd90e0ce6ce4e3c022ad62876214684ce128b44 Mon Sep 17 00:00:00 2001 From: Anton Laletin Date: Thu, 19 Mar 2026 18:11:38 +0400 Subject: [PATCH 4/7] IGNITE-28013 Added backwards-compatibility for serialization. --- .../leases/LeaseBatchSerializer.java | 27 ++++++++- .../leases/LeaseBatchSerializerTest.java | 59 +++++++++++++++++++ 2 files changed, 85 insertions(+), 1 deletion(-) diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java index 8238dc3a7a8a..67e1ed265492 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java @@ -141,6 +141,15 @@ public class LeaseBatchSerializer extends VersionedSerializer { /** Mask to extract lease holder index from compact representation. */ private static final int COMPACT_HOLDER_INDEX_MASK = (1 << BIT_WIDTH_TO_FIT_IN_HALF_BYTE) - 1; + private static final byte PROTOCOL_V1 = 1; + + private static final byte PROTOCOL_V2 = 2; + + @Override + protected byte getProtocolVersion() { + return PROTOCOL_V2; + } + @Override protected void writeExternalData(LeaseBatch batch, IgniteDataOutput out) throws IOException { long minExpirationTimePhysical = minExpirationTimePhysicalPart(batch); @@ -378,6 +387,7 @@ protected LeaseBatch readExternalData(byte protoVer, IgniteDataInput in) throws long minExpirationTimePhysical = in.readVarInt(); HybridTimestamp commonExpirationTime = new HybridTimestamp(minExpirationTimePhysical + in.readVarInt(), in.readVarIntAsInt()); NodesDictionary nodesDictionary = NodesDictionary.readFrom(in); + boolean canReadNodesInfoCompactly = holderIdAndProposedCandidateFitIn1ByteForRead(protoVer, nodesDictionary); List leases = new ArrayList<>(); @@ -385,6 +395,7 @@ protected LeaseBatch readExternalData(byte protoVer, IgniteDataInput in) throws minExpirationTimePhysical, commonExpirationTime, nodesDictionary, + canReadNodesInfoCompactly, leases, in, TablePartitionId::new @@ -395,6 +406,7 @@ protected LeaseBatch readExternalData(byte protoVer, IgniteDataInput in) throws minExpirationTimePhysical, commonExpirationTime, nodesDictionary, + canReadNodesInfoCompactly, leases, in, ZonePartitionId::new @@ -408,6 +420,7 @@ private static void readPartitionedGroupLeases( long minExpirationTimePhysical, HybridTimestamp commonExpirationTime, NodesDictionary nodesDictionary, + boolean canReadNodesInfoCompactly, List leases, IgniteDataInput in, GroupIdFactory groupIdFactory @@ -420,6 +433,7 @@ private static void readPartitionedGroupLeases( minExpirationTimePhysical, commonExpirationTime, nodesDictionary, + canReadNodesInfoCompactly, leases, in, groupIdFactory, @@ -432,6 +446,7 @@ private static int readLeasesForObject( long minExpirationTimePhysical, HybridTimestamp commonExpirationTime, NodesDictionary nodesDictionary, + boolean canReadNodesInfoCompactly, List leases, IgniteDataInput in, GroupIdFactory groupIdFactory, @@ -447,6 +462,7 @@ private static int readLeasesForObject( minExpirationTimePhysical, commonExpirationTime, nodesDictionary, + canReadNodesInfoCompactly, in, groupIdFactory ); @@ -464,6 +480,7 @@ private static int readLeasesForObject( long minExpirationTimePhysical, HybridTimestamp commonExpirationTime, NodesDictionary nodesDictionary, + boolean canReadNodesInfoCompactly, IgniteDataInput in, GroupIdFactory groupIdFactory ) throws IOException { @@ -477,7 +494,7 @@ private static int readLeasesForObject( int holderNodeIndex; int proposedCandidateNodeIndex = -1; - if (holderIdAndProposedCandidateFitIn1Byte(nodesDictionary)) { + if (canReadNodesInfoCompactly) { int nodesInfo = in.readVarIntAsInt(); holderNodeIndex = unpackHolderNodeIndex(nodesInfo); @@ -538,6 +555,14 @@ private static boolean flagSet(int flags, int mask) { return (flags & mask) != 0; } + private static boolean holderIdAndProposedCandidateFitIn1ByteForRead(byte protoVer, NodesDictionary dictionary) { + if (protoVer == PROTOCOL_V1) { + return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE; + } + + return holderIdAndProposedCandidateFitIn1Byte(dictionary); + } + @FunctionalInterface private interface GroupIdFactory { PartitionGroupId create(int objectId, int partitionId); diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java index 56fc731dd9e5..9e56a32c4291 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java @@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.params.provider.Arguments.arguments; +import java.io.IOException; import java.time.LocalDateTime; import java.time.Month; import java.time.ZoneOffset; @@ -38,6 +39,7 @@ import org.apache.ignite.internal.replicator.PartitionGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput; import org.apache.ignite.internal.versioned.VersionedSerialization; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -316,6 +318,63 @@ void v1CanBeDeserialized(String serializedString, List expectedLeases) { ); } + @Test + void v1WithExactly8NamesAndMoreThan8NodeIdsCanBeDeserialized() throws IOException { + byte[] bytes = v1BytesWithExactly8NamesAndMoreThan8NodeIdsUsingCompactEncoding(); + + LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes, serializer); + + Lease expectedLease = new Lease( + "node0", + new UUID(0, 1), + new HybridTimestamp(900, 0), + new HybridTimestamp(1000, 0), + true, + true, + "node1", + new TablePartitionId(1, 0) + ); + + assertThat(restoredBatch.leases(), containsInAnyOrder(expectedLease)); + assertEquals(List.of("node1"), restoredBatch.leases().stream().map(Lease::proposedCandidate).collect(toList())); + } + + private static byte[] v1BytesWithExactly8NamesAndMoreThan8NodeIdsUsingCompactEncoding() throws IOException { + try (IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(256)) { + // Header written by VersionedSerializer.writeExternal(): MAGIC(0x43BEEF00) + protocolVersion(1). + out.writeInt(0x43BEEF01); + + // Lease batch header. + out.writeVarInt(1000); // minExpirationTimePhysical + out.writeVarInt(0); // commonExpirationTimePhysicalDelta + out.writeVarInt(0); // commonExpirationTimeLogical + + // Nodes dictionary: 8 names. + out.writeVarInt(8); + for (int i = 0; i < 8; i++) { + out.writeUTF("node" + i); + } + + // Nodes dictionary: 9 node IDs, with names cycling over 8 entries. + out.writeVarInt(9); + for (int i = 0; i < 9; i++) { + out.writeUuid(new UUID(0, i + 1)); + out.writeVarInt(i % 8); + } + + // Table section with one object and one lease. + out.writeVarInt(1); // tableCount + out.writeVarInt(1); // objectId delta + out.writeVarInt(1); // leaseCount + out.write(0x07); // accepted + prolongable + hasProposedCandidate + out.writeVarInt(8); // compact nodes info: holder=0, proposedCandidate=1 + out.writeVarInt(100); // period (expirationPhysical - startPhysical) + out.writeVarInt(0); // startLogical + + return out.array(); + } + } + @SuppressWarnings("unused") private String v1LeaseBatchAsBase64WithTablePartitions() { HybridTimestamp baseTs = baseTs(); From f68aab72d779630ad1b5e67b93d9393efc50bb9f Mon Sep 17 00:00:00 2001 From: Anton Laletin Date: Tue, 31 Mar 2026 14:43:13 +0400 Subject: [PATCH 5/7] IGNITE-28013 Fixed pr comments --- .../leases/LeaseBatchSerializer.java | 2 + .../leases/LeaseBatchSerializerTest.java | 69 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java index 67e1ed265492..772d782755da 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java @@ -557,6 +557,8 @@ private static boolean flagSet(int flags, int mask) { private static boolean holderIdAndProposedCandidateFitIn1ByteForRead(byte protoVer, NodesDictionary dictionary) { if (protoVer == PROTOCOL_V1) { + // In V1 format, we assumed that name and node tables have the same size, + // so compact-mode eligibility was determined only by the name table size. return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE; } diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java index 9e56a32c4291..a2a83c17bf00 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java @@ -29,6 +29,7 @@ import java.time.LocalDateTime; import java.time.Month; import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.UUID; @@ -39,6 +40,7 @@ import org.apache.ignite.internal.replicator.PartitionGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput; import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput; import org.apache.ignite.internal.versioned.VersionedSerialization; import org.junit.jupiter.api.Test; @@ -318,6 +320,71 @@ void v1CanBeDeserialized(String serializedString, List expectedLeases) { ); } + @Test + void v2CanBeDeserialized() { + LeaseBatch originalBatch = new LeaseBatch(createLeases(baseTs(), TablePartitionId::new)); + + byte[] bytes = VersionedSerialization.toBytes(originalBatch, serializer); + + // VersionedSerializer header stores protocol version in the first byte. + assertEquals(2, bytes[0] & 0xFF); + + LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes, serializer); + + assertThat(restoredBatch.leases(), containsInAnyOrder(originalBatch.leases().toArray())); + assertEquals( + originalBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()), + restoredBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()) + ); + } + + @Test + void v2WithExactly8NamesAndMoreThan8NodeIdsDoesNotUseCompactNodesInfoEncoding() throws IOException { + List originalLeases = new ArrayList<>(); + + for (int i = 0; i < 8; i++) { + originalLeases.add(tableLease("node" + i, new UUID(0, i + 1), null, i)); + } + + originalLeases.add(tableLease("node2", new UUID(0, 9), "node3", 8)); + + byte[] bytes = VersionedSerialization.toBytes(new LeaseBatch(originalLeases), serializer); + + try (IgniteUnsafeDataInput in = new IgniteUnsafeDataInput(bytes)) { + in.readInt(); // Header written by VersionedSerializer. + + in.readVarInt(); // minExpirationTimePhysical + in.readVarInt(); // commonExpirationTimePhysicalDelta + in.readVarInt(); // commonExpirationTimeLogical + + assertEquals(8, in.readVarIntAsInt()); // nameCount + for (int i = 0; i < 8; i++) { + in.readUTF(); + } + + assertEquals(9, in.readVarIntAsInt()); // nodeCount + for (int i = 0; i < 9; i++) { + in.readUuid(); + in.readVarIntAsInt(); + } + + assertEquals(1, in.readVarIntAsInt()); // table object count + assertEquals(1, in.readVarIntAsInt()); // objectId delta + assertEquals(9, in.readVarIntAsInt()); // partition count + + for (int i = 0; i < 8; i++) { + in.readUnsignedByte(); // flags + in.readVarIntAsInt(); // holder index + in.readVarInt(); // period + in.readVarIntAsInt(); // start logical + } + + assertEquals(0x07, in.readUnsignedByte()); // accepted + prolongable + hasProposedCandidate + assertEquals(8, in.readVarIntAsInt()); // holder index + assertEquals(3, in.readVarIntAsInt()); // proposed candidate name index + } + } + @Test void v1WithExactly8NamesAndMoreThan8NodeIdsCanBeDeserialized() throws IOException { byte[] bytes = v1BytesWithExactly8NamesAndMoreThan8NodeIdsUsingCompactEncoding(); @@ -340,6 +407,8 @@ void v1WithExactly8NamesAndMoreThan8NodeIdsCanBeDeserialized() throws IOExceptio } private static byte[] v1BytesWithExactly8NamesAndMoreThan8NodeIdsUsingCompactEncoding() throws IOException { + // Crafted manually to reproduce a V1-only layout: V1 allowed compact nodes info based only on nameCount, + // while the current serializer writes V2 and would not emit compact encoding when nodeCount > 8. try (IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(256)) { // Header written by VersionedSerializer.writeExternal(): MAGIC(0x43BEEF00) + protocolVersion(1). out.writeInt(0x43BEEF01); From 422e0ea364e9745b3f2dfa6641f6540e0685c005 Mon Sep 17 00:00:00 2001 From: Anton Laletin Date: Mon, 6 Apr 2026 16:02:14 +0400 Subject: [PATCH 6/7] IGNITE-28013 Fixed pr comment, remove Disabled. --- .../leases/LeaseBatchSerializerTest.java | 44 ++++--------------- ...ePartitionsRecoveryByFilterUpdateTest.java | 2 - 2 files changed, 9 insertions(+), 37 deletions(-) diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java index a2a83c17bf00..a6ee8fe24e02 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java @@ -40,7 +40,6 @@ import org.apache.ignite.internal.replicator.PartitionGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.ZonePartitionId; -import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput; import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput; import org.apache.ignite.internal.versioned.VersionedSerialization; import org.junit.jupiter.api.Test; @@ -339,7 +338,7 @@ void v2CanBeDeserialized() { } @Test - void v2WithExactly8NamesAndMoreThan8NodeIdsDoesNotUseCompactNodesInfoEncoding() throws IOException { + void v2WithExactly8NamesAndMoreThan8NodeIdsDoesNotUseCompactNodesInfoEncoding() { List originalLeases = new ArrayList<>(); for (int i = 0; i < 8; i++) { @@ -348,41 +347,16 @@ void v2WithExactly8NamesAndMoreThan8NodeIdsDoesNotUseCompactNodesInfoEncoding() originalLeases.add(tableLease("node2", new UUID(0, 9), "node3", 8)); - byte[] bytes = VersionedSerialization.toBytes(new LeaseBatch(originalLeases), serializer); - - try (IgniteUnsafeDataInput in = new IgniteUnsafeDataInput(bytes)) { - in.readInt(); // Header written by VersionedSerializer. - - in.readVarInt(); // minExpirationTimePhysical - in.readVarInt(); // commonExpirationTimePhysicalDelta - in.readVarInt(); // commonExpirationTimeLogical - - assertEquals(8, in.readVarIntAsInt()); // nameCount - for (int i = 0; i < 8; i++) { - in.readUTF(); - } - - assertEquals(9, in.readVarIntAsInt()); // nodeCount - for (int i = 0; i < 9; i++) { - in.readUuid(); - in.readVarIntAsInt(); - } - - assertEquals(1, in.readVarIntAsInt()); // table object count - assertEquals(1, in.readVarIntAsInt()); // objectId delta - assertEquals(9, in.readVarIntAsInt()); // partition count + LeaseBatch originalBatch = new LeaseBatch(originalLeases); + byte[] bytes = VersionedSerialization.toBytes(originalBatch, serializer); - for (int i = 0; i < 8; i++) { - in.readUnsignedByte(); // flags - in.readVarIntAsInt(); // holder index - in.readVarInt(); // period - in.readVarIntAsInt(); // start logical - } + LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes, serializer); - assertEquals(0x07, in.readUnsignedByte()); // accepted + prolongable + hasProposedCandidate - assertEquals(8, in.readVarIntAsInt()); // holder index - assertEquals(3, in.readVarIntAsInt()); // proposed candidate name index - } + assertThat(restoredBatch.leases(), containsInAnyOrder(originalBatch.leases().toArray())); + assertEquals( + originalBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()), + restoredBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()) + ); } @Test diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java index 582bee318008..6d4e941551ef 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java @@ -254,7 +254,6 @@ void testSeveralHaResetsAndSomeNodeRestart() throws Exception { * @throws Exception If failed. */ @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-28316") void testNodesWaitForLastNodeFromChainToComeBackOnlineAfterMajorityStops() throws Exception { for (int i = 1; i < 8; i++) { startNode(i, CUSTOM_NODES_CONFIG); @@ -316,7 +315,6 @@ void testNodesWaitForLastNodeFromChainToComeBackOnlineAfterMajorityStops() throw * @throws Exception If failed. */ @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-28316") void testNodesWaitForNodesFromGracefulChainToComeBackOnlineAfterMajorityStops() throws Exception { for (int i = 1; i < 8; i++) { startNode(i, CUSTOM_NODES_CONFIG); From 445a625dc23560efa785d2bf55b1aa5012eb4a78 Mon Sep 17 00:00:00 2001 From: Anton Laletin Date: Mon, 6 Apr 2026 20:09:17 +0400 Subject: [PATCH 7/7] IGNITE-28013 Added comment --- .../internal/placementdriver/leases/LeaseBatchSerializer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java index 772d782755da..11c82e47946d 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java @@ -365,6 +365,8 @@ private static int packNodesInfo(int holderNodeIndex, int proposedCandidateNameI private static boolean holderIdAndProposedCandidateFitIn1Byte(NodesDictionary dictionary) { // Up to 8 names means that for name index it's enough to have 3 bits, same for node index, so, in sum, they // require up to 6 bits, and we have 7 bits in a varint byte. + // We need to check both: name count (for proposed candidate index) and node count (for holder node index), + // as these can diverge when nodes restart with new UUIDs but the same name. return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE && dictionary.nodeCount() <= MAX_NODES_FOR_COMPACT_MODE; }