@@ -461,4 +461,36 @@ void shouldFallbackToRandomSlotWhenNoMasters() {
461461 verify (clusterEventListener ).onUncoveredSlot (anyInt ());
462462 }
463463
464+ @ Test
465+ void shouldCloseReadConnectionsIfNodeBecomesMaster () {
466+
467+ when (clientMock .connectToNodeAsync (eq (StringCodec .UTF8 ), eq ("localhost:2" ), any (), any ()))
468+ .thenReturn (ConnectionFuture .from (socketAddressMock , CompletableFuture .completedFuture (nodeConnectionMock )));
469+
470+ AsyncCommand <String , String , String > async = new AsyncCommand <>(new Command <>(CommandType .READONLY , null , null ));
471+ async .complete ();
472+
473+ when (asyncCommandsMock .readOnly ()).thenReturn (async );
474+
475+ sut .setReadFrom (ReadFrom .REPLICA );
476+
477+ StatefulRedisConnection <String , String > readConnection = sut .getConnection (ConnectionIntent .READ , 1 );
478+ assertThat (readConnection ).isSameAs (nodeConnectionMock );
479+
480+ RedisClusterNode master = partitions .getPartition (0 );
481+ RedisClusterNode replica = partitions .getPartition (1 );
482+
483+ RedisClusterNode promotedReplica = new RedisClusterNode (replica .getUri (), replica .getNodeId (), true , null ,
484+ replica .getPingSentTimestamp (), replica .getPongReceivedTimestamp (), replica .getConfigEpoch (),
485+ replica .getSlots (), Collections .singleton (RedisClusterNode .NodeFlag .UPSTREAM ));
486+
487+ Partitions newPartitions = new Partitions ();
488+ newPartitions .add (master );
489+ newPartitions .add (promotedReplica );
490+
491+ sut .setPartitions (newPartitions );
492+
493+ verify (channelHandlerMock ).closeAsync ();
494+ }
495+
464496}
0 commit comments