Skip to content

Commit d3b492f

Browse files
lingjue@ubuntua-TODO-rov
authored andcommitted
fix bug: issue #3595 and add unit test
1 parent 167b88f commit d3b492f

2 files changed

Lines changed: 39 additions & 3 deletions

File tree

src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -615,10 +615,14 @@ private boolean isStale(ConnectionKey connectionKey) {
615615
return false;
616616
}
617617

618-
if (connectionKey.host != null && partitions.getPartition(connectionKey.host, connectionKey.port) != null) {
619-
return false;
620-
}
618+
RedisClusterNode node = partitions.getPartition(connectionKey.host, connectionKey.port);
621619

620+
if (connectionKey.host != null && node != null) {
621+
if (connectionKey.connectionIntent == ConnectionIntent.READ && node.getRole().isUpstream()) {
622+
return true;
623+
}
624+
return connectionKey.connectionIntent == ConnectionIntent.WRITE && node.getRole().isReplica();
625+
}
622626
return true;
623627
}
624628

src/test/java/io/lettuce/core/cluster/PooledClusterConnectionProviderUnitTests.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,4 +461,36 @@ void shouldFallbackToRandomSlotWhenNoMasters() {
461461
verify(clusterEventListener).onUncoveredSlot(anyInt());
462462
}
463463

464+
@Test
465+
void shouldCloseReadConnectionsIfNodeBecomesMaster() {
466+
467+
when(clientMock.connectToNodeAsync(eq(StringCodec.UTF8), eq("localhost:2"), any(), any()))
468+
.thenReturn(ConnectionFuture.from(socketAddressMock, CompletableFuture.completedFuture(nodeConnectionMock)));
469+
470+
AsyncCommand<String, String, String> async = new AsyncCommand<>(new Command<>(CommandType.READONLY, null, null));
471+
async.complete();
472+
473+
when(asyncCommandsMock.readOnly()).thenReturn(async);
474+
475+
sut.setReadFrom(ReadFrom.REPLICA);
476+
477+
StatefulRedisConnection<String, String> readConnection = sut.getConnection(ConnectionIntent.READ, 1);
478+
assertThat(readConnection).isSameAs(nodeConnectionMock);
479+
480+
RedisClusterNode master = partitions.getPartition(0);
481+
RedisClusterNode replica = partitions.getPartition(1);
482+
483+
RedisClusterNode promotedReplica = new RedisClusterNode(replica.getUri(), replica.getNodeId(), true, null,
484+
replica.getPingSentTimestamp(), replica.getPongReceivedTimestamp(), replica.getConfigEpoch(),
485+
replica.getSlots(), Collections.singleton(RedisClusterNode.NodeFlag.UPSTREAM));
486+
487+
Partitions newPartitions = new Partitions();
488+
newPartitions.add(master);
489+
newPartitions.add(promotedReplica);
490+
491+
sut.setPartitions(newPartitions);
492+
493+
verify(channelHandlerMock).closeAsync();
494+
}
495+
464496
}

0 commit comments

Comments
 (0)