diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java index b5cbf6501444..6816eed83f5a 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java @@ -486,7 +486,6 @@ private void updateLeaseBatchInternal() { : lease.proposedCandidate(); InternalClusterNode candidate = nextLeaseHolder(stableAssignments, pendingAssignments, grpId, proposedLeaseholder); - boolean canBeProlonged = lease.isAccepted() && lease.isProlongable() && candidate != null && candidate.id().equals(lease.getLeaseholderId()); diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java index 6359bd2b527a..11c82e47946d 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java @@ -141,6 +141,15 @@ public class LeaseBatchSerializer extends VersionedSerializer { /** Mask to extract lease holder index from compact representation. */ private static final int COMPACT_HOLDER_INDEX_MASK = (1 << BIT_WIDTH_TO_FIT_IN_HALF_BYTE) - 1; + private static final byte PROTOCOL_V1 = 1; + + private static final byte PROTOCOL_V2 = 2; + + @Override + protected byte getProtocolVersion() { + return PROTOCOL_V2; + } + @Override protected void writeExternalData(LeaseBatch batch, IgniteDataOutput out) throws IOException { long minExpirationTimePhysical = minExpirationTimePhysicalPart(batch); @@ -356,7 +365,9 @@ private static int packNodesInfo(int holderNodeIndex, int proposedCandidateNameI private static boolean holderIdAndProposedCandidateFitIn1Byte(NodesDictionary dictionary) { // Up to 8 names means that for name index it's enough to have 3 bits, same for node index, so, in sum, they // require up to 6 bits, and we have 7 bits in a varint byte. - return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE; + // We need to check both: name count (for proposed candidate index) and node count (for holder node index), + // as these can diverge when nodes restart with new UUIDs but the same name. + return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE && dictionary.nodeCount() <= MAX_NODES_FOR_COMPACT_MODE; } private static int flags( @@ -378,6 +389,7 @@ protected LeaseBatch readExternalData(byte protoVer, IgniteDataInput in) throws long minExpirationTimePhysical = in.readVarInt(); HybridTimestamp commonExpirationTime = new HybridTimestamp(minExpirationTimePhysical + in.readVarInt(), in.readVarIntAsInt()); NodesDictionary nodesDictionary = NodesDictionary.readFrom(in); + boolean canReadNodesInfoCompactly = holderIdAndProposedCandidateFitIn1ByteForRead(protoVer, nodesDictionary); List leases = new ArrayList<>(); @@ -385,6 +397,7 @@ protected LeaseBatch readExternalData(byte protoVer, IgniteDataInput in) throws minExpirationTimePhysical, commonExpirationTime, nodesDictionary, + canReadNodesInfoCompactly, leases, in, TablePartitionId::new @@ -395,6 +408,7 @@ protected LeaseBatch readExternalData(byte protoVer, IgniteDataInput in) throws minExpirationTimePhysical, commonExpirationTime, nodesDictionary, + canReadNodesInfoCompactly, leases, in, ZonePartitionId::new @@ -408,6 +422,7 @@ private static void readPartitionedGroupLeases( long minExpirationTimePhysical, HybridTimestamp commonExpirationTime, NodesDictionary nodesDictionary, + boolean canReadNodesInfoCompactly, List leases, IgniteDataInput in, GroupIdFactory groupIdFactory @@ -420,6 +435,7 @@ private static void readPartitionedGroupLeases( minExpirationTimePhysical, commonExpirationTime, nodesDictionary, + canReadNodesInfoCompactly, leases, in, groupIdFactory, @@ -432,6 +448,7 @@ private static int readLeasesForObject( long minExpirationTimePhysical, HybridTimestamp commonExpirationTime, NodesDictionary nodesDictionary, + boolean canReadNodesInfoCompactly, List leases, IgniteDataInput in, GroupIdFactory groupIdFactory, @@ -447,6 +464,7 @@ private static int readLeasesForObject( minExpirationTimePhysical, commonExpirationTime, nodesDictionary, + canReadNodesInfoCompactly, in, groupIdFactory ); @@ -464,6 +482,7 @@ private static int readLeasesForObject( long minExpirationTimePhysical, HybridTimestamp commonExpirationTime, NodesDictionary nodesDictionary, + boolean canReadNodesInfoCompactly, IgniteDataInput in, GroupIdFactory groupIdFactory ) throws IOException { @@ -477,7 +496,7 @@ private static int readLeasesForObject( int holderNodeIndex; int proposedCandidateNodeIndex = -1; - if (holderIdAndProposedCandidateFitIn1Byte(nodesDictionary)) { + if (canReadNodesInfoCompactly) { int nodesInfo = in.readVarIntAsInt(); holderNodeIndex = unpackHolderNodeIndex(nodesInfo); @@ -538,6 +557,16 @@ private static boolean flagSet(int flags, int mask) { return (flags & mask) != 0; } + private static boolean holderIdAndProposedCandidateFitIn1ByteForRead(byte protoVer, NodesDictionary dictionary) { + if (protoVer == PROTOCOL_V1) { + // In V1 format, we assumed that name and node tables have the same size, + // so compact-mode eligibility was determined only by the name table size. + return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE; + } + + return holderIdAndProposedCandidateFitIn1Byte(dictionary); + } + @FunctionalInterface private interface GroupIdFactory { PartitionGroupId create(int objectId, int partitionId); diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java index 967a0a4fa273..f1913bf8c2ce 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java @@ -132,6 +132,10 @@ int nameCount() { return nameIndexToName.size(); } + int nodeCount() { + return nodeIndexToId.size(); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java index e23632ddef1a..58181434cbe0 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java @@ -38,9 +38,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.UUID; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; @@ -49,6 +52,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; @@ -119,6 +123,9 @@ public class LeaseUpdaterTest extends BaseIgniteAbstractTest { @Mock MetaStorageManager metaStorageManager; + @Mock + private LeaseTracker leaseTracker; + /** Lease updater for tests. */ private LeaseUpdater leaseUpdater; @@ -127,6 +134,9 @@ public class LeaseUpdaterTest extends BaseIgniteAbstractTest { /** Closure to get a lease that is passed in Meta storage. */ private volatile Consumer renewLeaseConsumer = null; + /** Closure to get a lease batch that is passed in Meta storage. */ + private volatile Consumer renewLeaseBatchConsumer = null; + private static ZonePartitionId replicationGroupId(int objectId, int partId) { return new ZonePartitionId(objectId, partId); } @@ -142,13 +152,12 @@ private static ByteArray pendingAssignmentsQueueKey(ZonePartitionId groupId) { @BeforeEach void setUp( @Mock ClusterService clusterService, - @Mock LeaseTracker leaseTracker, @Mock MessagingService messagingService ) { mockStableAssignments(Set.of(Assignment.forPeer(stableNode.name()))); mockPendingAssignments(Set.of(Assignment.forPeer(pendingNode.name()))); - when(messagingService.invoke(anyString(), any(), anyLong())) + lenient().when(messagingService.invoke(anyString(), any(), anyLong())) .then(i -> completedFuture(PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse().accepted(true).build())); when(clusterService.messagingService()).thenReturn(messagingService); @@ -163,6 +172,7 @@ void setUp( lenient().when(metaStorageManager.invoke(any(Condition.class), any(Operation.class), any(Operation.class))) .thenAnswer(invocation -> { Consumer leaseConsumer = renewLeaseConsumer; + Consumer leaseBatchConsumer = renewLeaseBatchConsumer; if (leaseConsumer != null) { OperationImpl op = invocation.getArgument(1); @@ -173,6 +183,14 @@ void setUp( leaseConsumer.accept(lease); } + if (leaseBatchConsumer != null) { + OperationImpl op = invocation.getArgument(1); + + LeaseBatch batch = LeaseBatch.fromBytes(toByteArray(op.value())); + + leaseBatchConsumer.accept(batch); + } + return trueCompletedFuture(); }); @@ -338,6 +356,171 @@ public void testLeaseRenew() throws Exception { assertEquals(lease.getLeaseholder(), renewedLease.getLeaseholder()); } + @Test + public void testStaleLeaseholderIdCanCoexistWithCurrentNodeIdsInBatch() throws Exception { + List currentTopologyNodes = IntStream.range(0, 8) + .mapToObj(i -> new LogicalNode( + randomUUID(), + "node-" + i, + NetworkAddress.from("127.0.0.1:" + (11_000 + i)) + )) + .collect(Collectors.toList()); + + when(topologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new LogicalTopologySnapshot(1, currentTopologyNodes))); + + Map> stableAssignmentsByGroup = new LinkedHashMap<>(); + Map staleLeasesByGroup = new LinkedHashMap<>(); + + List leaseholdersByGroup = List.of(0, 0, 1, 2, 3, 4, 5, 6, 7).stream() + .map(currentTopologyNodes::get) + .collect(Collectors.toList()); + + long now = System.currentTimeMillis(); + UUID staleNode0Id = randomUUID(); + + for (int i = 0; i < leaseholdersByGroup.size(); i++) { + ZonePartitionId grpId = replicationGroupId(1, i); + LogicalNode logicalNode = leaseholdersByGroup.get(i); + + stableAssignmentsByGroup.put(grpId, Set.of(Assignment.forPeer(logicalNode.name()))); + + boolean expiredLease = i == 0; + UUID leaseholderId = i == 1 + ? staleNode0Id + : logicalNode.id(); + HybridTimestamp expirationTime = new HybridTimestamp(expiredLease ? now - 1_000 : now + 60_000, 0); + + staleLeasesByGroup.put( + grpId, + new Lease( + logicalNode.name(), + leaseholderId, + new HybridTimestamp(now - 10_000, 0), + expirationTime, + true, + true, + null, + grpId + ) + ); + } + + mockStableAssignments(stableAssignmentsByGroup); + mockPendingAssignments(Map.of()); + + Leases currentLeases = new Leases(staleLeasesByGroup, BYTE_EMPTY_ARRAY); + + lenient().when(leaseTracker.leasesLatest()).thenReturn(currentLeases); + lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenAnswer(invocation -> + currentLeases.leaseByGroupId().getOrDefault(invocation.getArgument(0), Lease.emptyLease(invocation.getArgument(0)))); + + initAndActivateLeaseUpdater(); + + LeaseBatch renewedBatch = awaitForLeaseBatch(5_000); + + assertEquals(leaseholdersByGroup.size(), renewedBatch.leases().size()); + assertEquals(8, renewedBatch.leases().stream().map(Lease::getLeaseholder).distinct().count()); + assertEquals(9, renewedBatch.leases().stream().map(Lease::getLeaseholderId).distinct().count()); + + Map renewedLeaseByGroup = renewedBatch.leases().stream() + .collect(Collectors.toMap(Lease::replicationGroupId, lease -> lease)); + + Lease expiredLeaseWithCurrentId = renewedLeaseByGroup.get(replicationGroupId(1, 0)); + assertEquals(leaseholdersByGroup.get(0).id(), expiredLeaseWithCurrentId.getLeaseholderId()); + + Lease nonExpiredLeaseWithStaleId = renewedLeaseByGroup.get(replicationGroupId(1, 1)); + assertEquals(staleNode0Id, nonExpiredLeaseWithStaleId.getLeaseholderId()); + } + + @Test + public void testNonExpiredAcceptedLeasesKeepLeaseholderIdentity() throws Exception { + List currentTopologyNodes = IntStream.range(0, 3) + .mapToObj(i -> new LogicalNode( + randomUUID(), + "node-" + i, + NetworkAddress.from("127.0.0.1:" + (11_000 + i)) + )) + .collect(Collectors.toList()); + + when(topologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new LogicalTopologySnapshot(1, currentTopologyNodes))); + + long now = System.currentTimeMillis(); + UUID staleNode1Id = randomUUID(); + + ZonePartitionId expiredGrpId = replicationGroupId(1, 0); + ZonePartitionId nonExpiredStaleIdGrpId = replicationGroupId(1, 1); + ZonePartitionId nonExpiredCurrentIdGrpId = replicationGroupId(1, 2); + + Map> stableAssignmentsByGroup = Map.of( + expiredGrpId, Set.of(Assignment.forPeer(currentTopologyNodes.get(0).name())), + nonExpiredStaleIdGrpId, Set.of(Assignment.forPeer(currentTopologyNodes.get(1).name())), + nonExpiredCurrentIdGrpId, Set.of(Assignment.forPeer(currentTopologyNodes.get(2).name())) + ); + + mockStableAssignments(stableAssignmentsByGroup); + mockPendingAssignments(Map.of()); + + Lease expiredLease = new Lease( + currentTopologyNodes.get(0).name(), + currentTopologyNodes.get(0).id(), + new HybridTimestamp(now - 10_000, 0), + new HybridTimestamp(now - 1_000, 0), + true, + true, + null, + expiredGrpId + ); + + Lease nonExpiredLeaseWithStaleId = new Lease( + currentTopologyNodes.get(1).name(), + staleNode1Id, + new HybridTimestamp(now - 10_000, 0), + new HybridTimestamp(now + 60_000, 0), + true, + true, + null, + nonExpiredStaleIdGrpId + ); + + Lease nonExpiredLeaseWithCurrentId = new Lease( + currentTopologyNodes.get(2).name(), + currentTopologyNodes.get(2).id(), + new HybridTimestamp(now - 10_000, 0), + new HybridTimestamp(now + 60_000, 0), + true, + true, + null, + nonExpiredCurrentIdGrpId + ); + + Map leasesByGroup = Map.of( + expiredGrpId, expiredLease, + nonExpiredStaleIdGrpId, nonExpiredLeaseWithStaleId, + nonExpiredCurrentIdGrpId, nonExpiredLeaseWithCurrentId + ); + + Leases currentLeases = new Leases(leasesByGroup, BYTE_EMPTY_ARRAY); + + lenient().when(leaseTracker.leasesLatest()).thenReturn(currentLeases); + lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenAnswer(invocation -> + currentLeases.leaseByGroupId().getOrDefault(invocation.getArgument(0), Lease.emptyLease(invocation.getArgument(0)))); + + initAndActivateLeaseUpdater(); + + LeaseBatch renewedBatch = awaitForLeaseBatch(5_000); + + Map renewedLeaseByGroup = renewedBatch.leases().stream() + .collect(Collectors.toMap(Lease::replicationGroupId, lease -> lease)); + + Lease renewedNonExpiredWithStaleId = renewedLeaseByGroup.get(nonExpiredStaleIdGrpId); + assertEquals(nonExpiredLeaseWithStaleId.getLeaseholder(), renewedNonExpiredWithStaleId.getLeaseholder()); + assertEquals(nonExpiredLeaseWithStaleId.getLeaseholderId(), renewedNonExpiredWithStaleId.getLeaseholderId()); + + Lease renewedNonExpiredWithCurrentId = renewedLeaseByGroup.get(nonExpiredCurrentIdGrpId); + assertEquals(nonExpiredLeaseWithCurrentId.getLeaseholder(), renewedNonExpiredWithCurrentId.getLeaseholder()); + assertEquals(nonExpiredLeaseWithCurrentId.getLeaseholderId(), renewedNonExpiredWithCurrentId.getLeaseholderId()); + } + @Test public void testLeaseAmongPendings() throws Exception { when(topologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new LogicalTopologySnapshot(1, List.of(pendingNode)))); @@ -382,29 +565,43 @@ private void mockTopology(Set stable, Set pending) { } private void mockPendingAssignments(Set assignments) { - Entry pendingEntry = new EntryImpl( - pendingAssignmentsQueueKey(replicationGroupId(1, 0)).bytes(), - AssignmentsQueue.toBytes(Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), assignments.toArray(Assignment[]::new))), - 1, - new HybridClockImpl().now() - ); + mockPendingAssignments(Map.of(replicationGroupId(1, 0), assignments)); + } + + private void mockPendingAssignments(Map> assignmentsByGroup) { + List pendingEntries = assignmentsByGroup.entrySet().stream() + .map(entry -> new EntryImpl( + pendingAssignmentsQueueKey(entry.getKey()).bytes(), + AssignmentsQueue.toBytes( + Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), entry.getValue().toArray(Assignment[]::new)) + ), + 1, + new HybridClockImpl().now() + )) + .collect(Collectors.toList()); byte[] prefixBytes = ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES; when(metaStorageManager.prefixLocally(eq(new ByteArray(prefixBytes)), anyLong())) - .thenReturn(Cursor.fromIterable(List.of(pendingEntry))); + .thenReturn(Cursor.fromIterable(pendingEntries)); } private void mockStableAssignments(Set assignments) { - Entry stableEntry = new EntryImpl( - stableAssignmentsKey(replicationGroupId(1, 0)).bytes(), - Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), assignments.toArray(Assignment[]::new)).toBytes(), - 1, - new HybridClockImpl().now() - ); + mockStableAssignments(Map.of(replicationGroupId(1, 0), assignments)); + } + + private void mockStableAssignments(Map> assignmentsByGroup) { + List stableEntries = assignmentsByGroup.entrySet().stream() + .map(entry -> new EntryImpl( + stableAssignmentsKey(entry.getKey()).bytes(), + Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), entry.getValue().toArray(Assignment[]::new)).toBytes(), + 1, + new HybridClockImpl().now() + )) + .collect(Collectors.toList()); byte[] prefixBytes = ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES; when(metaStorageManager.prefixLocally(eq(new ByteArray(prefixBytes)), anyLong())) - .thenReturn(Cursor.fromIterable(List.of(stableEntry))); + .thenReturn(Cursor.fromIterable(stableEntries)); } private void initAndActivateLeaseUpdater() { @@ -453,8 +650,8 @@ private Lease awaitForLease(boolean needAccepted) throws InterruptedException { * Waits for lease write to Meta storage. * * @param needAccepted Whether to wait only for accepted lease. - * @param previousLease Previous lease. If not null, then wait for any lease having expiration time other than the previous has (i.e. - * either another lease or prolonged lease). + * @param previousLease Previous lease. If not null, then wait for any lease having expiration time other than the previous has + * (i.e. either another lease or prolonged lease). * @param timeoutMillis Timeout in milliseconds to wait for lease. * @return A lease. * @throws InterruptedException if the wait is interrupted. @@ -481,6 +678,19 @@ private Lease awaitForLease(boolean needAccepted, @Nullable Lease previousLease, return renewedLease.get(); } + private LeaseBatch awaitForLeaseBatch(long timeoutMillis) throws InterruptedException { + AtomicReference renewedBatch = new AtomicReference<>(); + + renewLeaseBatchConsumer = batch -> { + renewedBatch.set(batch); + renewLeaseBatchConsumer = null; + }; + + assertTrue(IgniteTestUtils.waitForCondition(() -> renewedBatch.get() != null, timeoutMillis)); + + return renewedBatch.get(); + } + /** * Gets a lease updater tread. * diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java index e359c152fc37..a6ee8fe24e02 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java @@ -25,9 +25,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.params.provider.Arguments.arguments; +import java.io.IOException; import java.time.LocalDateTime; import java.time.Month; import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.UUID; @@ -38,6 +40,7 @@ import org.apache.ignite.internal.replicator.PartitionGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput; import org.apache.ignite.internal.versioned.VersionedSerialization; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -202,6 +205,22 @@ void batchWithExactly8NodeNames() { verifySerializationAndDeserializationGivesSameResult(originalBatch); } + @Test + void batchWithExactly8NodeNamesAndMoreThan8NodeIds() { + List originalLeases = IntStream.range(0, 8) + .mapToObj(n -> { + String nodeName = "node" + n; + return tableLease(nodeName, randomUUID(), nodeName, n); + }) + .collect(toList()); + + originalLeases.add(tableLease("node0", randomUUID(), "node0", 8)); + + LeaseBatch originalBatch = new LeaseBatch(originalLeases); + + verifySerializationAndDeserializationGivesSameResult(originalBatch); + } + @Test void batchWithMoreThan8NodeNames() { List originalLeases = IntStream.range(0, 9) @@ -300,6 +319,105 @@ void v1CanBeDeserialized(String serializedString, List expectedLeases) { ); } + @Test + void v2CanBeDeserialized() { + LeaseBatch originalBatch = new LeaseBatch(createLeases(baseTs(), TablePartitionId::new)); + + byte[] bytes = VersionedSerialization.toBytes(originalBatch, serializer); + + // VersionedSerializer header stores protocol version in the first byte. + assertEquals(2, bytes[0] & 0xFF); + + LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes, serializer); + + assertThat(restoredBatch.leases(), containsInAnyOrder(originalBatch.leases().toArray())); + assertEquals( + originalBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()), + restoredBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()) + ); + } + + @Test + void v2WithExactly8NamesAndMoreThan8NodeIdsDoesNotUseCompactNodesInfoEncoding() { + List originalLeases = new ArrayList<>(); + + for (int i = 0; i < 8; i++) { + originalLeases.add(tableLease("node" + i, new UUID(0, i + 1), null, i)); + } + + originalLeases.add(tableLease("node2", new UUID(0, 9), "node3", 8)); + + LeaseBatch originalBatch = new LeaseBatch(originalLeases); + byte[] bytes = VersionedSerialization.toBytes(originalBatch, serializer); + + LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes, serializer); + + assertThat(restoredBatch.leases(), containsInAnyOrder(originalBatch.leases().toArray())); + assertEquals( + originalBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()), + restoredBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()) + ); + } + + @Test + void v1WithExactly8NamesAndMoreThan8NodeIdsCanBeDeserialized() throws IOException { + byte[] bytes = v1BytesWithExactly8NamesAndMoreThan8NodeIdsUsingCompactEncoding(); + + LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes, serializer); + + Lease expectedLease = new Lease( + "node0", + new UUID(0, 1), + new HybridTimestamp(900, 0), + new HybridTimestamp(1000, 0), + true, + true, + "node1", + new TablePartitionId(1, 0) + ); + + assertThat(restoredBatch.leases(), containsInAnyOrder(expectedLease)); + assertEquals(List.of("node1"), restoredBatch.leases().stream().map(Lease::proposedCandidate).collect(toList())); + } + + private static byte[] v1BytesWithExactly8NamesAndMoreThan8NodeIdsUsingCompactEncoding() throws IOException { + // Crafted manually to reproduce a V1-only layout: V1 allowed compact nodes info based only on nameCount, + // while the current serializer writes V2 and would not emit compact encoding when nodeCount > 8. + try (IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(256)) { + // Header written by VersionedSerializer.writeExternal(): MAGIC(0x43BEEF00) + protocolVersion(1). + out.writeInt(0x43BEEF01); + + // Lease batch header. + out.writeVarInt(1000); // minExpirationTimePhysical + out.writeVarInt(0); // commonExpirationTimePhysicalDelta + out.writeVarInt(0); // commonExpirationTimeLogical + + // Nodes dictionary: 8 names. + out.writeVarInt(8); + for (int i = 0; i < 8; i++) { + out.writeUTF("node" + i); + } + + // Nodes dictionary: 9 node IDs, with names cycling over 8 entries. + out.writeVarInt(9); + for (int i = 0; i < 9; i++) { + out.writeUuid(new UUID(0, i + 1)); + out.writeVarInt(i % 8); + } + + // Table section with one object and one lease. + out.writeVarInt(1); // tableCount + out.writeVarInt(1); // objectId delta + out.writeVarInt(1); // leaseCount + out.write(0x07); // accepted + prolongable + hasProposedCandidate + out.writeVarInt(8); // compact nodes info: holder=0, proposedCandidate=1 + out.writeVarInt(100); // period (expirationPhysical - startPhysical) + out.writeVarInt(0); // startLogical + + return out.array(); + } + } + @SuppressWarnings("unused") private String v1LeaseBatchAsBase64WithTablePartitions() { HybridTimestamp baseTs = baseTs(); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java index 582bee318008..6d4e941551ef 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java @@ -254,7 +254,6 @@ void testSeveralHaResetsAndSomeNodeRestart() throws Exception { * @throws Exception If failed. */ @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-28316") void testNodesWaitForLastNodeFromChainToComeBackOnlineAfterMajorityStops() throws Exception { for (int i = 1; i < 8; i++) { startNode(i, CUSTOM_NODES_CONFIG); @@ -316,7 +315,6 @@ void testNodesWaitForLastNodeFromChainToComeBackOnlineAfterMajorityStops() throw * @throws Exception If failed. */ @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-28316") void testNodesWaitForNodesFromGracefulChainToComeBackOnlineAfterMajorityStops() throws Exception { for (int i = 1; i < 8; i++) { startNode(i, CUSTOM_NODES_CONFIG);