66import time
77from RLTest .utils import Colors
88
9+ # Minimum interval, in seconds, between the progress lines waitCluster prints while polling
10+ CLUSTER_STATUS_INTERVAL_SEC = 5
11+
912
1013class ClusterEnv (object ):
1114 def __init__ (self , ** kwargs ):
@@ -23,6 +26,7 @@ def __init__(self, **kwargs):
2326 self .protocol = kwargs .get ('protocol' , 2 )
2427 self .terminateRetries = kwargs .get ('terminateRetries' , None )
2528 self .terminateRetrySecs = kwargs .get ('terminateRetrySecs' , None )
29+ self .clusterStartTimeout = kwargs .pop ('clusterStartTimeout' , 40 )
2630 startPort = kwargs .pop ('port' , 10000 )
2731 totalRedises = self .shardsCount * (2 if useSlaves else 1 )
2832 randomizePorts = kwargs .pop ('randomizePorts' , False )
@@ -50,7 +54,8 @@ def getInformationBeforeDispose(self):
def getInformationAfterDispose(self):
    """Collect the post-dispose information reported by every shard.

    Returns:
        list: One entry per shard, in ``self.shards`` order, each being
        the value returned by that shard's own
        ``getInformationAfterDispose()``.
    """
    collected = []
    for shard in self.shards:
        collected.append(shard.getInformationAfterDispose())
    return collected
5256
53- def _agreeOk (self ):
57+ def _countOk (self ):
58+ """Returns count of shards reporting cluster_state:ok"""
5459 ok = 0
5560 for shard in self .shards :
5661 con = shard .getConnection ()
@@ -61,9 +66,10 @@ def _agreeOk(self):
6166 continue
6267 if 'cluster_state:ok' in str (status ):
6368 ok += 1
64- return ok == len ( self . shards )
69+ return ok
6570
66- def _agreeSlots (self ):
71+ def _countAgreeSlots (self ):
72+ """Returns count of shards that agree on slots view"""
6773 ok = 0
6874 first_view = None
6975 for shard in self .shards :
@@ -77,35 +83,63 @@ def _agreeSlots(self):
7783 first_view = slots_view
7884 if slots_view == first_view :
7985 ok += 1
80- return ok == len ( self . shards )
86+ return ok
8187
def waitCluster(self, timeout_sec=40, verbose=True):
    """Block until every shard reports a healthy, consistent cluster.

    Polls ``self._countOk()`` and ``self._countAgreeSlots()`` about every
    0.1 seconds. As soon as both counts equal ``len(self.shards)``, sends
    ``SEARCH.CLUSTERREFRESH`` to every shard (best effort) and returns.

    The cluster is always polled at least once, so a zero or
    already-expired ``timeout_sec`` still succeeds when the cluster is
    ready.

    Args:
        timeout_sec: Maximum number of seconds to keep polling.
        verbose: When true, print a start banner, a progress line at most
            once every ``CLUSTER_STATUS_INTERVAL_SEC`` seconds, and a final
            "ready" message.

    Raises:
        RuntimeError: If the cluster is still not ready after
            ``timeout_sec`` seconds. The message ends with the last shard
            counts that were observed.
    """
    total_shards = len(self.shards)
    st = time.time()
    last_status_time = st

    if verbose:
        # %s (not %d) so fractional timeouts are shown untruncated, matching
        # the formatting used by the timeout error below.
        print(Colors.Yellow('Waiting for cluster to be ready (timeout: %s seconds, %d shards)...' %
                            (timeout_sec, total_shards)))

    while True:
        # Poll before looking at the deadline so the cluster state is
        # checked at least once, whatever the timeout is.
        ok_count = self._countOk()
        slots_count = self._countAgreeSlots()

        if ok_count == total_shards and slots_count == total_shards:
            if verbose:
                elapsed = time.time() - st
                print(Colors.Green('Cluster is ready after %.1f seconds' % elapsed))
            for shard in self.shards:
                try:
                    shard.getConnection().execute_command('SEARCH.CLUSTERREFRESH')
                except Exception:
                    # Deliberately ignored: a failed refresh on one shard
                    # must not turn a ready cluster into an error.
                    pass
            return

        now = time.time()
        elapsed = now - st
        if elapsed >= timeout_sec:
            break

        # Print periodic status update
        if verbose and (now - last_status_time) >= CLUSTER_STATUS_INTERVAL_SEC:
            print(Colors.Yellow('  Cluster wait: %.1fs elapsed - %d/%d shards OK, %d/%d agree on slots...' %
                                (elapsed, ok_count, total_shards, slots_count, total_shards)))
            last_status_time = now

        time.sleep(0.1)

    # Keep the original message prefix and append the last observed state so
    # the failure can be diagnosed even when verbose output is disabled.
    raise RuntimeError(
        "Cluster OK wait loop timed out after %s seconds "
        "(%d/%d shards OK, %d/%d agree on slots)" %
        (timeout_sec, ok_count, total_shards, slots_count, total_shards))
97123
98124 def startEnv (self , masters = True , slaves = True ):
99125 if self .envIsUp == True :
100126 return # env is already up
127+
128+ total_shards = len (self .shards )
129+ print (Colors .Yellow ('Starting cluster with %d shards...' % total_shards ))
130+
101131 try :
102- for shard in self .shards :
132+ for i , shard in enumerate ( self .shards ) :
103133 shard .startEnv (masters , slaves )
104- except Exception :
134+ print (Colors .Yellow (' Started shard %d/%d' % (i + 1 , total_shards )))
135+ except Exception as e :
136+ print (Colors .Bred ('Error starting shard %d: %s' % (i + 1 , str (e ))))
137+ print (Colors .Bred ('Stopping all shards...' ))
105138 for shard in self .shards :
106139 shard .stopEnv ()
107140 raise
108141
142+ print (Colors .Yellow ('Configuring cluster topology...' ))
109143 slots_per_node = int (16384 / len (self .shards )) + 1
110144 for i , shard in enumerate (self .shards ):
111145 con = shard .getConnection ()
@@ -121,10 +155,14 @@ def startEnv(self, masters=True, slaves=True):
121155 try :
122156 con .execute_command ('CLUSTER' , 'ADDSLOTS' , * (str (x )
123157 for x in range (start_slot , end_slot )))
124- except Exception :
125- pass
158+ except Exception as e :
159+ print (Colors .Bred (' Error assigning slots %d-%d to shard %d: %s' %
160+ (start_slot , end_slot - 1 , i + 1 , str (e ))))
126161
127- self .waitCluster ()
162+ print (Colors .Yellow (' Configured shard %d/%d (slots %d-%d)' %
163+ (i + 1 , total_shards , start_slot , min (end_slot - 1 , 16383 ))))
164+
165+ self .waitCluster (timeout_sec = self .clusterStartTimeout )
128166 self .envIsUp = True
129167 self .envIsHealthy = True
130168
0 commit comments