4444import java .util .concurrent .locks .Lock ;
4545import java .util .concurrent .locks .ReentrantReadWriteLock ;
4646import java .util .function .Predicate ;
47+ import java .util .function .ToDoubleFunction ;
4748import java .util .logging .Level ;
4849import java .util .logging .Logger ;
4950import java .util .stream .IntStream ;
@@ -57,6 +58,7 @@ public final class KeyRangeCache {
5758
5859 private static final int MAX_LOCAL_REPLICA_DISTANCE = 5 ;
5960 private static final int DEFAULT_MIN_ENTRIES_FOR_RANDOM_PICK = 1000 ;
61+ private static final double LOCAL_LEADER_SELECTION_COST_MULTIPLIER = 0.5D ;
6062
6163 /** Determines how to handle ranges that span multiple splits. */
6264 public enum RangeMode {
@@ -828,16 +830,20 @@ private TabletSnapshot selectTablet(
828830 List <SkippedTabletDetail > skippedTabletDetails ,
829831 Map <String , ChannelEndpoint > resolvedEndpoints ,
830832 SelectionStats selectionStats ) {
831- if (!preferLeader ) {
832- return selectLatencyAwareTablet (
833+ if (!preferLeader || hintBuilder .getOperationUid () > 0L ) {
834+ TabletSnapshot preferredLeader =
835+ preferLeader ? localLeaderForScoreBias (snapshot , hasDirectedReadOptions ) : null ;
836+ return selectScoreAwareTablet (
833837 snapshot ,
834838 directedReadOptions ,
835839 hintBuilder ,
836840 excludedEndpoints ,
837841 skippedTabletUids ,
838842 skippedTabletDetails ,
839843 resolvedEndpoints ,
840- selectionStats );
844+ selectionStats ,
845+ preferredLeader ,
846+ preferLeader ? "leader_preferred_latency_score" : "latency_score" );
841847 }
842848
843849 boolean checkedLeader = false ;
@@ -884,18 +890,32 @@ private TabletSnapshot selectTablet(
884890 return null ;
885891 }
886892
887- private TabletSnapshot selectLatencyAwareTablet (
893+ @ javax .annotation .Nullable
894+ private TabletSnapshot localLeaderForScoreBias (
895+ GroupSnapshot snapshot , boolean hasDirectedReadOptions ) {
896+ if (!hasDirectedReadOptions
897+ && snapshot .hasLeader ()
898+ && snapshot .leader ().distance <= MAX_LOCAL_REPLICA_DISTANCE ) {
899+ return snapshot .leader ();
900+ }
901+ return null ;
902+ }
903+
904+ private TabletSnapshot selectScoreAwareTablet (
888905 GroupSnapshot snapshot ,
889906 DirectedReadOptions directedReadOptions ,
890907 RoutingHint .Builder hintBuilder ,
891908 Predicate <String > excludedEndpoints ,
892909 Set <Long > skippedTabletUids ,
893910 List <SkippedTabletDetail > skippedTabletDetails ,
894911 Map <String , ChannelEndpoint > resolvedEndpoints ,
895- SelectionStats selectionStats ) {
912+ SelectionStats selectionStats ,
913+ @ javax .annotation .Nullable TabletSnapshot preferredLeader ,
914+ String selectionReason ) {
896915 long operationUid = hintBuilder .getOperationUid ();
897916 List <TabletSnapshot > eligibleTablets = new ArrayList <>();
898917 List <ChannelEndpoint > eligibleEndpoints = new ArrayList <>();
918+ Map <String , TabletSnapshot > endpointByAddress = new HashMap <>();
899919 int scoredCandidates = 0 ;
900920
901921 for (TabletSnapshot tablet : snapshot .tablets ) {
@@ -921,6 +941,7 @@ private TabletSnapshot selectLatencyAwareTablet(
921941 }
922942 eligibleTablets .add (tablet );
923943 eligibleEndpoints .add (endpoint );
944+ endpointByAddress .put (endpoint .getAddress (), tablet );
924945 if (EndpointLatencyRegistry .hasScore (operationUid , tablet .serverAddress )) {
925946 scoredCandidates ++;
926947 }
@@ -936,6 +957,7 @@ private TabletSnapshot selectLatencyAwareTablet(
936957 snapshot ,
937958 eligibleTablets ,
938959 operationUid ,
960+ selectionCostLookup (operationUid , preferredLeader ),
939961 "single_candidate" ,
940962 selected ,
941963 scoredCandidates );
@@ -946,16 +968,15 @@ private TabletSnapshot selectLatencyAwareTablet(
946968 eligibleTablets .stream ()
947969 .min (
948970 Comparator .comparingDouble (
949- tablet ->
950- EndpointLatencyRegistry .getSelectionCost (
951- operationUid , tablet .serverAddress )))
971+ tablet -> selectionCost (operationUid , tablet , preferredLeader )))
952972 .orElse (eligibleTablets .get (0 ));
953973 selectionStats .selectionDetail =
954974 buildSelectionDetail (
955975 snapshot ,
956976 eligibleTablets ,
957977 operationUid ,
958- "latency_score" ,
978+ selectionCostLookup (operationUid , preferredLeader ),
979+ selectionReason ,
959980 selected ,
960981 scoredCandidates );
961982 return selected ;
@@ -965,15 +986,17 @@ private TabletSnapshot selectLatencyAwareTablet(
965986 replicaSelector .select (
966987 eligibleEndpoints ,
967988 endpoint ->
968- EndpointLatencyRegistry .getSelectionCost (operationUid , endpoint .getAddress ()));
989+ selectionCost (
990+ operationUid , endpointByAddress .get (endpoint .getAddress ()), preferredLeader ));
969991 if (selectedEndpoint == null ) {
970992 TabletSnapshot selected = eligibleTablets .get (0 );
971993 selectionStats .selectionDetail =
972994 buildSelectionDetail (
973995 snapshot ,
974996 eligibleTablets ,
975997 operationUid ,
976- "latency_score" ,
998+ selectionCostLookup (operationUid , preferredLeader ),
999+ selectionReason ,
9771000 selected ,
9781001 scoredCandidates );
9791002 return selected ;
@@ -986,7 +1009,8 @@ private TabletSnapshot selectLatencyAwareTablet(
9861009 snapshot ,
9871010 eligibleTablets ,
9881011 operationUid ,
989- "latency_score" ,
1012+ selectionCostLookup (operationUid , preferredLeader ),
1013+ selectionReason ,
9901014 selected ,
9911015 scoredCandidates );
9921016 return selected ;
@@ -995,10 +1019,35 @@ private TabletSnapshot selectLatencyAwareTablet(
9951019 TabletSnapshot selected = eligibleTablets .get (0 );
9961020 selectionStats .selectionDetail =
9971021 buildSelectionDetail (
998- snapshot , eligibleTablets , operationUid , "latency_score" , selected , scoredCandidates );
1022+ snapshot ,
1023+ eligibleTablets ,
1024+ operationUid ,
1025+ selectionCostLookup (operationUid , preferredLeader ),
1026+ selectionReason ,
1027+ selected ,
1028+ scoredCandidates );
9991029 return selected ;
10001030 }
10011031
1032+ private ToDoubleFunction <TabletSnapshot > selectionCostLookup (
1033+ long operationUid , @ javax .annotation .Nullable TabletSnapshot preferredLeader ) {
1034+ return tablet -> selectionCost (operationUid , tablet , preferredLeader );
1035+ }
1036+
1037+ private double selectionCost (
1038+ long operationUid ,
1039+ @ javax .annotation .Nullable TabletSnapshot tablet ,
1040+ @ javax .annotation .Nullable TabletSnapshot preferredLeader ) {
1041+ if (tablet == null ) {
1042+ return Double .MAX_VALUE ;
1043+ }
1044+ double cost = EndpointLatencyRegistry .getSelectionCost (operationUid , tablet .serverAddress );
1045+ if (preferredLeader != null && tablet == preferredLeader ) {
1046+ return cost * LOCAL_LEADER_SELECTION_COST_MULTIPLIER ;
1047+ }
1048+ return cost ;
1049+ }
1050+
10021051 @ javax .annotation .Nullable
10031052 private TabletSnapshot selectRandomExcludedOrCoolingDownTablet (
10041053 GroupSnapshot snapshot ,
@@ -1166,30 +1215,26 @@ private SelectionDetail buildSelectionDetail(
11661215 GroupSnapshot snapshot ,
11671216 List <TabletSnapshot > eligibleTablets ,
11681217 long operationUid ,
1218+ ToDoubleFunction <TabletSnapshot > selectionCostLookup ,
11691219 String selectionReason ,
11701220 TabletSnapshot selected ,
11711221 int scoredCandidates ) {
11721222 double bestScore = Double .MAX_VALUE ;
11731223 for (TabletSnapshot tablet : eligibleTablets ) {
1174- bestScore =
1175- Math .min (
1176- bestScore ,
1177- EndpointLatencyRegistry .getSelectionCost (operationUid , tablet .serverAddress ));
1224+ bestScore = Math .min (bestScore , selectionCostLookup .applyAsDouble (tablet ));
11781225 }
11791226
11801227 StringBuilder alternatives = new StringBuilder ();
11811228 int appended = 0 ;
1182- double selectedScore =
1183- EndpointLatencyRegistry .getSelectionCost (operationUid , selected .serverAddress );
1229+ double selectedScore = selectionCostLookup .applyAsDouble (selected );
11841230 for (TabletSnapshot tablet : eligibleTablets ) {
11851231 if (tablet == selected || appended >= 4 ) {
11861232 continue ;
11871233 }
11881234 if (alternatives .length () > 0 ) {
11891235 alternatives .append (", " );
11901236 }
1191- double candidateScore =
1192- EndpointLatencyRegistry .getSelectionCost (operationUid , tablet .serverAddress );
1237+ double candidateScore = selectionCostLookup .applyAsDouble (tablet );
11931238 alternatives
11941239 .append (endpointLabel (snapshot , tablet ))
11951240 .append ("=" )
0 commit comments