6060import org .slf4j .LoggerFactory ;
6161
6262import java .util .ArrayList ;
63+ import java .util .Collections ;
6364import java .util .HashMap ;
65+ import java .util .HashSet ;
6466import java .util .List ;
6567import java .util .Map ;
6668import java .util .Objects ;
7072import java .util .concurrent .TimeUnit ;
7173import java .util .concurrent .atomic .AtomicBoolean ;
7274import java .util .concurrent .atomic .AtomicInteger ;
75+ import java .util .concurrent .locks .ReentrantLock ;
7376import java .util .concurrent .locks .ReentrantReadWriteLock ;
74- import java .util .function .BiConsumer ;
7577import java .util .stream .Collectors ;
7678
7779/** The RouteBalancer guides the cluster RegionGroups' leader distribution and routing priority. */
@@ -104,13 +106,17 @@ public class RouteBalancer implements IClusterStatusSubscriber {
104106 ProcedureManager .PROCEDURE_WAIT_TIME_OUT - TimeUnit .SECONDS .toMillis (2 ),
105107 TimeUnit .SECONDS .toMillis (10 ));
106108 private static final long WAIT_PRIORITY_INTERVAL = 10 ;
109+ private static final long RATIS_CHANGE_LEADER_RPC_TIMEOUT_IN_MS = TimeUnit .SECONDS .toMillis (10 );
107110
108111 private final IManager configManager ;
109112 // For generating optimal Region leader distribution
110- private final AbstractLeaderBalancer leaderBalancer ;
113+ private final AbstractLeaderBalancer schemaLeaderBalancer ;
114+ private final AbstractLeaderBalancer dataLeaderBalancer ;
111115 // For generating optimal cluster Region routing priority
112116 private final IPriorityBalancer priorityRouter ;
113117
118+ private final ReentrantLock schemaRegionLeaderBalanceLock ;
119+ private final ReentrantLock dataRegionLeaderBalanceLock ;
114120 private final ReentrantReadWriteLock priorityMapLock ;
115121 // Map<RegionGroupId, Region priority>
116122 // The client requests are preferentially routed to the Region with the lowest index in the
@@ -121,29 +127,16 @@ public class RouteBalancer implements IClusterStatusSubscriber {
121127 private static final long BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS = 20 * 1000L * 1000L * 1000L ;
122128 private final Map <TConsensusGroupId , Long > lastFailedTimeForLeaderBalance ;
123129
124- private final Map <Integer , List <String >> lastBalancedOldLeaderId2RegionMap ;
125- private Map <TConsensusGroupId , Integer > lastDataRegion2OldLeaderMap ;
126- private Set <TConsensusGroupId > lastBalancedDataRegionSet ;
127-
128130 public RouteBalancer (IManager configManager ) {
129131 this .configManager = configManager ;
132+ this .schemaRegionLeaderBalanceLock = new ReentrantLock ();
133+ this .dataRegionLeaderBalanceLock = new ReentrantLock ();
130134 this .priorityMapLock = new ReentrantReadWriteLock ();
131135 this .regionPriorityMap = new TreeMap <>();
132- this .lastFailedTimeForLeaderBalance = new TreeMap <>();
133- this .lastBalancedOldLeaderId2RegionMap = new ConcurrentHashMap <>();
136+ this .lastFailedTimeForLeaderBalance = new ConcurrentHashMap <>();
134137
135- switch (CONF .getLeaderDistributionPolicy ()) {
136- case AbstractLeaderBalancer .GREEDY_POLICY :
137- this .leaderBalancer = new GreedyLeaderBalancer ();
138- break ;
139- case AbstractLeaderBalancer .HASH_POLICY :
140- this .leaderBalancer = new HashLeaderBalancer ();
141- break ;
142- case AbstractLeaderBalancer .CFD_POLICY :
143- default :
144- this .leaderBalancer = new CostFlowSelectionLeaderBalancer ();
145- break ;
146- }
138+ this .schemaLeaderBalancer = createLeaderBalancer ();
139+ this .dataLeaderBalancer = createLeaderBalancer ();
147140
148141 switch (CONF .getRoutePriorityPolicy ()) {
149142 case IPriorityBalancer .GREEDY_POLICY :
@@ -156,23 +149,108 @@ public RouteBalancer(IManager configManager) {
156149 }
157150 }
158151
152+ private static class DataRegionLeaderBalanceResult {
153+
154+ private final Map <TConsensusGroupId , Integer > dataRegion2OldLeaderMap ;
155+ private final Set <TConsensusGroupId > balancedDataRegionSet ;
156+ private final Map <Integer , List <String >> balancedOldLeaderId2RegionMap ;
157+
158+ private DataRegionLeaderBalanceResult (
159+ Map <TConsensusGroupId , Integer > dataRegion2OldLeaderMap ,
160+ Set <TConsensusGroupId > balancedDataRegionSet ,
161+ Map <Integer , List <String >> balancedOldLeaderId2RegionMap ) {
162+ this .dataRegion2OldLeaderMap = dataRegion2OldLeaderMap ;
163+ this .balancedDataRegionSet = balancedDataRegionSet ;
164+ this .balancedOldLeaderId2RegionMap = balancedOldLeaderId2RegionMap ;
165+ }
166+
167+ private static DataRegionLeaderBalanceResult empty () {
168+ return new DataRegionLeaderBalanceResult (
169+ Collections .emptyMap (), Collections .emptySet (), Collections .emptyMap ());
170+ }
171+
172+ private boolean hasBalancedDataRegion () {
173+ return !balancedDataRegionSet .isEmpty ();
174+ }
175+ }
176+
177+ private AbstractLeaderBalancer createLeaderBalancer () {
178+ switch (CONF .getLeaderDistributionPolicy ()) {
179+ case AbstractLeaderBalancer .GREEDY_POLICY :
180+ return new GreedyLeaderBalancer ();
181+ case AbstractLeaderBalancer .HASH_POLICY :
182+ return new HashLeaderBalancer ();
183+ case AbstractLeaderBalancer .CFD_POLICY :
184+ default :
185+ return new CostFlowSelectionLeaderBalancer ();
186+ }
187+ }
188+
159189 /** Balance cluster RegionGroup leader distribution through configured algorithm. */
160- private synchronized void balanceRegionLeader () {
190+ private DataRegionLeaderBalanceResult balanceRegionLeader () {
191+ DataRegionLeaderBalanceResult result = DataRegionLeaderBalanceResult .empty ();
161192 if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION ) {
162- balanceRegionLeader (TConsensusGroupType .SchemaRegion , SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS );
193+ balanceRegionLeader (TConsensusGroupType .SchemaRegion );
163194 }
164195 if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION ) {
165- balanceRegionLeader (TConsensusGroupType .DataRegion , DATA_REGION_CONSENSUS_PROTOCOL_CLASS );
196+ result = balanceRegionLeader (TConsensusGroupType .DataRegion );
166197 }
198+ return result ;
167199 }
168200
169- private void balanceRegionLeader (
170- TConsensusGroupType regionGroupType , String consensusProtocolClass ) {
201+ private DataRegionLeaderBalanceResult balanceRegionLeader (Set <TConsensusGroupId > regionGroupIds ) {
202+ DataRegionLeaderBalanceResult result = DataRegionLeaderBalanceResult .empty ();
203+ if (regionGroupIds .stream ()
204+ .anyMatch (
205+ regionGroupId -> TConsensusGroupType .SchemaRegion .equals (regionGroupId .getType ()))
206+ && IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION ) {
207+ balanceRegionLeader (TConsensusGroupType .SchemaRegion );
208+ }
209+ if (regionGroupIds .stream ()
210+ .anyMatch (
211+ regionGroupId -> TConsensusGroupType .DataRegion .equals (regionGroupId .getType ()))
212+ && IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION ) {
213+ result = balanceRegionLeader (TConsensusGroupType .DataRegion );
214+ }
215+ return result ;
216+ }
217+
218+ private DataRegionLeaderBalanceResult balanceRegionLeader (TConsensusGroupType regionGroupType ) {
219+ final ReentrantLock leaderBalanceLock ;
220+ final AbstractLeaderBalancer targetLeaderBalancer ;
221+ final String consensusProtocolClass ;
222+ switch (regionGroupType ) {
223+ case SchemaRegion :
224+ leaderBalanceLock = schemaRegionLeaderBalanceLock ;
225+ targetLeaderBalancer = schemaLeaderBalancer ;
226+ consensusProtocolClass = SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS ;
227+ break ;
228+ case DataRegion :
229+ leaderBalanceLock = dataRegionLeaderBalanceLock ;
230+ targetLeaderBalancer = dataLeaderBalancer ;
231+ consensusProtocolClass = DATA_REGION_CONSENSUS_PROTOCOL_CLASS ;
232+ break ;
233+ default :
234+ return DataRegionLeaderBalanceResult .empty ();
235+ }
236+
237+ leaderBalanceLock .lock ();
238+ try {
239+ return balanceRegionLeader (regionGroupType , consensusProtocolClass , targetLeaderBalancer );
240+ } finally {
241+ leaderBalanceLock .unlock ();
242+ }
243+ }
244+
245+ private DataRegionLeaderBalanceResult balanceRegionLeader (
246+ TConsensusGroupType regionGroupType ,
247+ String consensusProtocolClass ,
248+ AbstractLeaderBalancer targetLeaderBalancer ) {
171249 // Collect the latest data and generate the optimal leader distribution
172250 Map <TConsensusGroupId , Integer > currentLeaderMap =
173251 getLoadManager ().getLoadCache ().getRegionLeaderMap (regionGroupType );
174252 Map <TConsensusGroupId , Integer > optimalLeaderMap =
175- leaderBalancer .generateOptimalLeaderDistribution (
253+ targetLeaderBalancer .generateOptimalLeaderDistribution (
176254 getLoadManager ().getLoadCache ().getCurrentDatabaseRegionGroupMap (regionGroupType ),
177255 getLoadManager ().getLoadCache ().getCurrentRegionLocationMap (regionGroupType ),
178256 currentLeaderMap ,
@@ -185,6 +263,7 @@ private void balanceRegionLeader(
185263 DataNodeAsyncRequestContext <TRegionLeaderChangeReq , TRegionLeaderChangeResp > clientHandler =
186264 new DataNodeAsyncRequestContext <>(CnToDnAsyncRequestType .CHANGE_REGION_LEADER );
187265 Map <TConsensusGroupId , ConsensusGroupHeartbeatSample > successTransferMap = new TreeMap <>();
266+ Map <Integer , List <String >> balancedOldLeaderId2RegionMap = new HashMap <>();
188267 optimalLeaderMap .forEach (
189268 (regionGroupId , newLeaderId ) -> {
190269 if (ConsensusFactory .RATIS_CONSENSUS .equals (consensusProtocolClass )
@@ -214,7 +293,7 @@ private void balanceRegionLeader(
214293 regionGroupId , new ConsensusGroupHeartbeatSample (currentTime , newLeaderId ));
215294 // Prepare data for flushOldLeader
216295 if (oldLeaderId != -1 ) {
217- lastBalancedOldLeaderId2RegionMap .compute (
296+ balancedOldLeaderId2RegionMap .compute (
218297 oldLeaderId ,
219298 (k , v ) -> {
220299 if (v == null ) {
@@ -259,18 +338,20 @@ private void balanceRegionLeader(
259338 });
260339 if (requestId .get () > 0 ) {
261340 // Don't retry ChangeLeader request
262- CnToDnInternalServiceAsyncRequestManager .getInstance ().sendAsyncRequest (clientHandler );
341+ CnToDnInternalServiceAsyncRequestManager .getInstance ()
342+ .sendAsyncRequest (clientHandler , 1 , RATIS_CHANGE_LEADER_RPC_TIMEOUT_IN_MS );
263343 for (int i = 0 ; i < requestId .get (); i ++) {
264- if (clientHandler .getResponseMap ().get (i ).getStatus ().getCode ()
265- == TSStatusCode .SUCCESS_STATUS .getStatusCode ()) {
344+ TRegionLeaderChangeResp response = clientHandler .getResponseMap ().get (i );
345+ if (response != null
346+ && response .getStatus ().getCode () == TSStatusCode .SUCCESS_STATUS .getStatusCode ()) {
266347 successTransferMap .put (
267348 clientHandler .getRequest (i ).getRegionId (),
268349 new ConsensusGroupHeartbeatSample (
269- clientHandler . getResponseMap (). get ( i ) .getConsensusLogicalTimestamp (),
350+ response .getConsensusLogicalTimestamp (),
270351 clientHandler .getRequest (i ).getNewLeaderNode ().getDataNodeId ()));
271352 } else {
272353 lastFailedTimeForLeaderBalance .put (
273- clientHandler .getRequest (i ).getRegionId (), currentTime );
354+ clientHandler .getRequest (i ).getRegionId (), System . nanoTime () );
274355 LOGGER .error (
275356 ManagerMessages .LEADERBALANCER_FAILED_TO_CHANGE_THE_LEADER_OF_REGION_TO_DATANODE ,
276357 clientHandler .getRequest (i ).getRegionId (),
@@ -283,58 +364,59 @@ private void balanceRegionLeader(
283364
284365 // Prepare data for invalidSchemaCacheOfOldLeaders
285366 if (regionGroupType .equals (TConsensusGroupType .DataRegion )) {
286- lastBalancedDataRegionSet = successTransferMap .keySet ();
287- lastDataRegion2OldLeaderMap = currentLeaderMap ;
367+ return new DataRegionLeaderBalanceResult (
368+ new HashMap <>(currentLeaderMap ),
369+ new HashSet <>(successTransferMap .keySet ()),
370+ balancedOldLeaderId2RegionMap );
288371 }
372+ return DataRegionLeaderBalanceResult .empty ();
289373 }
290374
291- private void invalidateSchemaCacheOfOldLeaders () {
292- BiConsumer <Map <TConsensusGroupId , Integer >, Set <TConsensusGroupId >> consumer =
293- (oldLeaderMap , successTransferSet ) -> {
294- final DataNodeAsyncRequestContext <String , TSStatus > invalidateSchemaCacheRequestHandler =
295- new DataNodeAsyncRequestContext <>(CnToDnAsyncRequestType .INVALIDATE_LAST_CACHE );
296- final AtomicInteger requestIndex = new AtomicInteger (0 );
297- oldLeaderMap .entrySet ().stream ()
298- .filter (entry -> successTransferSet .contains (entry .getKey ()))
299- .forEach (
300- entry -> {
301- // set target
302- final Integer dataNodeId = entry .getValue ();
303- if (dataNodeId == -1 ) {
304- return ;
305- }
306- final TDataNodeLocation dataNodeLocation =
307- getNodeManager ().getRegisteredDataNode (dataNodeId ).getLocation ();
308- if (dataNodeLocation == null ) {
309- LOGGER .warn (ManagerMessages .DATANODELOCATION_IS_NULL_DATANODEID , dataNodeId );
310- return ;
311- }
312- invalidateSchemaCacheRequestHandler .putNodeLocation (
313- requestIndex .get (), dataNodeLocation );
314- // set req
315- final TConsensusGroupId consensusGroupId = entry .getKey ();
316- final String database =
317- getPartitionManager ().getRegionDatabase (consensusGroupId );
318- invalidateSchemaCacheRequestHandler .putRequest (requestIndex .get (), database );
319- requestIndex .incrementAndGet ();
320- });
321- CnToDnInternalServiceAsyncRequestManager .getInstance ()
322- .sendAsyncRequest (invalidateSchemaCacheRequestHandler );
323- };
324-
325- if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION ) {
326- consumer .accept (lastDataRegion2OldLeaderMap , lastBalancedDataRegionSet );
375+ private void invalidateSchemaCacheOfOldLeaders (
376+ DataRegionLeaderBalanceResult leaderBalanceResult ) {
377+ if (!IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION
378+ || !leaderBalanceResult .hasBalancedDataRegion ()) {
379+ return ;
327380 }
381+
382+ final DataNodeAsyncRequestContext <String , TSStatus > invalidateSchemaCacheRequestHandler =
383+ new DataNodeAsyncRequestContext <>(CnToDnAsyncRequestType .INVALIDATE_LAST_CACHE );
384+ final AtomicInteger requestIndex = new AtomicInteger (0 );
385+ leaderBalanceResult .dataRegion2OldLeaderMap .entrySet ().stream ()
386+ .filter (entry -> leaderBalanceResult .balancedDataRegionSet .contains (entry .getKey ()))
387+ .forEach (
388+ entry -> {
389+ // set target
390+ final Integer dataNodeId = entry .getValue ();
391+ if (dataNodeId == -1 ) {
392+ return ;
393+ }
394+ final TDataNodeLocation dataNodeLocation =
395+ getNodeManager ().getRegisteredDataNode (dataNodeId ).getLocation ();
396+ if (dataNodeLocation == null ) {
397+ LOGGER .warn (ManagerMessages .DATANODELOCATION_IS_NULL_DATANODEID , dataNodeId );
398+ return ;
399+ }
400+ invalidateSchemaCacheRequestHandler .putNodeLocation (
401+ requestIndex .get (), dataNodeLocation );
402+ // set req
403+ final TConsensusGroupId consensusGroupId = entry .getKey ();
404+ final String database = getPartitionManager ().getRegionDatabase (consensusGroupId );
405+ invalidateSchemaCacheRequestHandler .putRequest (requestIndex .get (), database );
406+ requestIndex .incrementAndGet ();
407+ });
408+ CnToDnInternalServiceAsyncRequestManager .getInstance ()
409+ .sendAsyncRequest (invalidateSchemaCacheRequestHandler );
328410 }
329411
330- private void flushOldLeaderIfIoTV2 () {
412+ private void flushOldLeaderIfIoTV2 (DataRegionLeaderBalanceResult leaderBalanceResult ) {
331413 if (!IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION
332- || !Objects .equals (
333- DATA_REGION_CONSENSUS_PROTOCOL_CLASS , ConsensusFactory . IOT_CONSENSUS_V2 )) {
414+ || !Objects .equals (DATA_REGION_CONSENSUS_PROTOCOL_CLASS , ConsensusFactory . IOT_CONSENSUS_V2 )
415+ || leaderBalanceResult . balancedOldLeaderId2RegionMap . isEmpty ( )) {
334416 return ;
335417 }
336418
337- BiConsumer < Integer , List < String >> consumer =
419+ leaderBalanceResult . balancedOldLeaderId2RegionMap . forEach (
338420 (oldLeaderId , regionGroupIds ) -> {
339421 TDataNodeConfiguration configuration =
340422 getNodeManager ().getRegisteredDataNode (oldLeaderId );
@@ -358,21 +440,18 @@ private void flushOldLeaderIfIoTV2() {
358440 oldLeaderId ,
359441 regionGroupIds );
360442 }
361- };
362- lastBalancedOldLeaderId2RegionMap .forEach (consumer );
363- // after flush, clear map for next balance
364- lastBalancedOldLeaderId2RegionMap .clear ();
443+ });
365444 }
366445
367- private synchronized void handleBalanceAction () {
368- invalidateSchemaCacheOfOldLeaders ();
369- flushOldLeaderIfIoTV2 ();
446+ private void handleBalanceAction (DataRegionLeaderBalanceResult leaderBalanceResult ) {
447+ invalidateSchemaCacheOfOldLeaders (leaderBalanceResult );
448+ flushOldLeaderIfIoTV2 (leaderBalanceResult );
370449 }
371450
372- public synchronized void balanceRegionLeaderAndPriority () {
373- balanceRegionLeader ();
451+ public void balanceRegionLeaderAndPriority () {
452+ DataRegionLeaderBalanceResult leaderBalanceResult = balanceRegionLeader ();
374453 balanceRegionPriority ();
375- handleBalanceAction ();
454+ handleBalanceAction (leaderBalanceResult );
376455 }
377456
378457 /** Balance cluster RegionGroup route priority through configured algorithm. */
@@ -544,18 +623,19 @@ private LoadManager getLoadManager() {
544623
545624 @ Override
546625 public void onNodeStatisticsChanged (NodeStatisticsChangeEvent event ) {
547- balanceRegionLeader ();
626+ handleBalanceAction ( balanceRegionLeader () );
548627 }
549628
550629 @ Override
551630 public void onRegionGroupStatisticsChanged (RegionGroupStatisticsChangeEvent event ) {
552- balanceRegionLeader ();
631+ handleBalanceAction ( balanceRegionLeader (event . getDifferentRegionGroupStatisticsMap (). keySet ()) );
553632 }
554633
555634 @ Override
556635 public void onConsensusGroupStatisticsChanged (ConsensusGroupStatisticsChangeEvent event ) {
557- balanceRegionLeader ();
636+ DataRegionLeaderBalanceResult leaderBalanceResult =
637+ balanceRegionLeader (event .getDifferentConsensusGroupStatisticsMap ().keySet ());
558638 balanceRegionPriority ();
559- handleBalanceAction ();
639+ handleBalanceAction (leaderBalanceResult );
560640 }
561641}
0 commit comments