@@ -97,43 +97,59 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_this<AutoClusterF
9797 });
9898 }
9999
100+ // Never hold mutex_ when calling prober_->probeAsync() — the callback may fire
101+ // synchronously (e.g. in tests), which would deadlock a non-recursive mutex.
102+
100103 void startProbeCheck () {
101- std::lock_guard<std::mutex> lock (mutex_);
102- if (!prober_ || !client_ || closed_) {
103- scheduleProbe ();
104- return ;
104+ bool onPrimary;
105+ {
106+ std::lock_guard<std::mutex> lock (mutex_);
107+ if (!prober_ || !client_ || closed_) {
108+ scheduleProbe ();
109+ return ;
110+ }
111+ onPrimary = (currentServiceUrl_ == primary_);
105112 }
106113
107- if (currentServiceUrl_ == primary_) {
108- // On primary — probe it to see if it's still up
114+ if (onPrimary) {
109115 probeCurrentForFailover ();
110116 } else {
111- // On secondary — probe it to see if it's still up, then check switch-back
112117 probeCurrentOnSecondary ();
113118 }
114119 }
115120
116121 // === When on primary: probe current, if down try secondaries ===
117122
118123 void probeCurrentForFailover () {
124+ std::string url;
125+ {
126+ std::lock_guard<std::mutex> lock (mutex_);
127+ url = currentServiceUrl_;
128+ }
129+
119130 auto self = shared_from_this ();
120- std::string url = currentServiceUrl_;
121- prober_->probeAsync (url, PROBE_TIMEOUT_MS , [self, url](bool success) {
122- std::lock_guard<std::mutex> lock (self->mutex_ );
131+ prober_->probeAsync (url, PROBE_TIMEOUT_MS , [self](bool success) {
123132 if (self->closed_ ) return ;
124133
125134 if (success) {
135+ std::lock_guard<std::mutex> lock (self->mutex_ );
126136 self->failedTimestamp_ = -1 ;
127137 self->scheduleProbe ();
128138 return ;
129139 }
130140
131- auto now = currentTimeMs ();
132- if (self->failedTimestamp_ == -1 ) {
133- self->failedTimestamp_ = now;
134- self->scheduleProbe ();
135- } else if (now - self->failedTimestamp_ >= self->failoverDelay_ .count ()) {
136- // Delay elapsed — try secondaries
141+ bool shouldTrySecondary = false ;
142+ {
143+ std::lock_guard<std::mutex> lock (self->mutex_ );
144+ auto now = currentTimeMs ();
145+ if (self->failedTimestamp_ == -1 ) {
146+ self->failedTimestamp_ = now;
147+ } else if (now - self->failedTimestamp_ >= self->failoverDelay_ .count ()) {
148+ shouldTrySecondary = true ;
149+ }
150+ }
151+
152+ if (shouldTrySecondary) {
137153 self->trySecondary (0 );
138154 } else {
139155 self->scheduleProbe ();
@@ -142,32 +158,40 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_this<AutoClusterF
142158 }
143159
144160 void trySecondary (size_t index) {
145- if (index >= secondary_.size ()) {
146- // No secondaries are available
147- scheduleProbe ();
148- return ;
161+ std::string target;
162+ {
163+ std::lock_guard<std::mutex> lock (mutex_);
164+ if (closed_ || index >= secondary_.size ()) {
165+ scheduleProbe ();
166+ return ;
167+ }
168+ target = secondary_[index];
149169 }
150170
151171 auto self = shared_from_this ();
152- std::string target = secondary_[index];
153172 prober_->probeAsync (target, PROBE_TIMEOUT_MS , [self, target, index](bool success) {
154- std::lock_guard<std::mutex> lock (self->mutex_ );
155173 if (self->closed_ ) return ;
156174
157175 if (success) {
158- auto now = currentTimeMs ();
159- LOG_INFO (" Current service " << self->currentServiceUrl_ << " has been down for "
160- << (now - self->failedTimestamp_ ) << " ms, switching to "
161- << target);
162- self->performSwitch (target);
163- self->failedTimestamp_ = -1 ;
176+ {
177+ std::lock_guard<std::mutex> lock (self->mutex_ );
178+ auto now = currentTimeMs ();
179+ LOG_INFO (" Current service " << self->currentServiceUrl_ << " has been down for "
180+ << (now - self->failedTimestamp_ )
181+ << " ms, switching to " << target);
182+ self->performSwitch (target);
183+ self->failedTimestamp_ = -1 ;
184+ }
164185 self->scheduleProbe ();
165186 } else {
166- auto now = currentTimeMs ();
167- LOG_WARN (" Current service " << self->currentServiceUrl_ << " has been down for "
168- << (now - self->failedTimestamp_ )
169- << " ms. Failed to switch to " << target
170- << " (not available), trying next." );
187+ {
188+ std::lock_guard<std::mutex> lock (self->mutex_ );
189+ auto now = currentTimeMs ();
190+ LOG_WARN (" Current service " << self->currentServiceUrl_ << " has been down for "
191+ << (now - self->failedTimestamp_ )
192+ << " ms. Failed to switch to " << target
193+ << " (not available), trying next." );
194+ }
171195 self->trySecondary (index + 1 );
172196 }
173197 });
@@ -176,53 +200,70 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_this<AutoClusterF
176200 // === When on secondary: probe current, then check primary for switch-back ===
177201
178202 void probeCurrentOnSecondary () {
203+ std::string url;
204+ {
205+ std::lock_guard<std::mutex> lock (mutex_);
206+ url = currentServiceUrl_;
207+ }
208+
179209 auto self = shared_from_this ();
180- std::string url = currentServiceUrl_;
181- prober_->probeAsync (url, PROBE_TIMEOUT_MS , [self, url](bool success) {
182- std::lock_guard<std::mutex> lock (self->mutex_ );
210+ prober_->probeAsync (url, PROBE_TIMEOUT_MS , [self](bool success) {
183211 if (self->closed_ ) return ;
184212
185- if (success) {
186- self->failedTimestamp_ = -1 ;
187- } else {
188- auto now = currentTimeMs ();
189- if (self->failedTimestamp_ == -1 ) {
190- self->failedTimestamp_ = now;
191- } else if (now - self->failedTimestamp_ >= self->failoverDelay_ .count ()) {
192- // Secondary is also down — try primary as recovery
193- self->tryPrimaryRecovery ();
194- return ;
213+ bool shouldTryPrimary = false ;
214+ {
215+ std::lock_guard<std::mutex> lock (self->mutex_ );
216+ if (success) {
217+ self->failedTimestamp_ = -1 ;
218+ } else {
219+ auto now = currentTimeMs ();
220+ if (self->failedTimestamp_ == -1 ) {
221+ self->failedTimestamp_ = now;
222+ } else if (now - self->failedTimestamp_ >= self->failoverDelay_ .count ()) {
223+ shouldTryPrimary = true ;
224+ }
195225 }
196226 }
197227
198- // Check if primary has recovered for switch-back
199- if (self->currentServiceUrl_ != self->primary_ ) {
200- self->probePrimaryForSwitchBack ();
228+ if (shouldTryPrimary) {
229+ self->tryPrimaryRecovery ();
201230 } else {
202- self->scheduleProbe ();
231+ // Check if primary has recovered for switch-back
232+ bool stillOnSecondary;
233+ {
234+ std::lock_guard<std::mutex> lock (self->mutex_ );
235+ stillOnSecondary = (self->currentServiceUrl_ != self->primary_ );
236+ }
237+ if (stillOnSecondary) {
238+ self->probePrimaryForSwitchBack ();
239+ } else {
240+ self->scheduleProbe ();
241+ }
203242 }
204243 });
205244 }
206245
207246 void tryPrimaryRecovery () {
208247 auto self = shared_from_this ();
209248 prober_->probeAsync (primary_, PROBE_TIMEOUT_MS , [self](bool success) {
210- std::lock_guard<std::mutex> lock (self->mutex_ );
211249 if (self->closed_ ) return ;
212250
213- if (success) {
214- auto now = currentTimeMs ();
215- LOG_INFO (" Secondary " << self->currentServiceUrl_ << " has been down for "
216- << (now - self->failedTimestamp_ )
217- << " ms, switching back to primary " << self->primary_ );
218- self->performSwitch (self->primary_ );
219- self->failedTimestamp_ = -1 ;
220- self->recoverTimestamp_ = -1 ;
221- } else {
222- LOG_ERROR (" Current service "
223- << self->currentServiceUrl_ << " has been down for "
224- << (currentTimeMs () - self->failedTimestamp_ ) << " ms. Primary "
225- << self->primary_ << " is also not available." );
251+ {
252+ std::lock_guard<std::mutex> lock (self->mutex_ );
253+ if (success) {
254+ auto now = currentTimeMs ();
255+ LOG_INFO (" Secondary " << self->currentServiceUrl_ << " has been down for "
256+ << (now - self->failedTimestamp_ )
257+ << " ms, switching back to primary " << self->primary_ );
258+ self->performSwitch (self->primary_ );
259+ self->failedTimestamp_ = -1 ;
260+ self->recoverTimestamp_ = -1 ;
261+ } else {
262+ LOG_ERROR (" Current service "
263+ << self->currentServiceUrl_ << " has been down for "
264+ << (currentTimeMs () - self->failedTimestamp_ ) << " ms. Primary "
265+ << self->primary_ << " is also not available." );
266+ }
226267 }
227268 self->scheduleProbe ();
228269 });
@@ -231,25 +272,25 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_this<AutoClusterF
231272 void probePrimaryForSwitchBack () {
232273 auto self = shared_from_this ();
233274 prober_->probeAsync (primary_, PROBE_TIMEOUT_MS , [self](bool success) {
234- std::lock_guard<std::mutex> lock (self->mutex_ );
235275 if (self->closed_ ) return ;
236276
237- if (!success) {
238- self->recoverTimestamp_ = -1 ;
239- self->scheduleProbe ();
240- return ;
241- }
242-
243- auto now = currentTimeMs ();
244- if (self->recoverTimestamp_ == -1 ) {
245- self->recoverTimestamp_ = now;
246- } else if (now - self->recoverTimestamp_ >= self->switchBackDelay_ .count ()) {
247- LOG_INFO (" Primary " << self->primary_ << " has been recovered for "
248- << (now - self->recoverTimestamp_ )
249- << " ms, switching back from secondary "
250- << self->currentServiceUrl_ );
251- self->performSwitch (self->primary_ );
252- self->recoverTimestamp_ = -1 ;
277+ {
278+ std::lock_guard<std::mutex> lock (self->mutex_ );
279+ if (!success) {
280+ self->recoverTimestamp_ = -1 ;
281+ } else {
282+ auto now = currentTimeMs ();
283+ if (self->recoverTimestamp_ == -1 ) {
284+ self->recoverTimestamp_ = now;
285+ } else if (now - self->recoverTimestamp_ >= self->switchBackDelay_ .count ()) {
286+ LOG_INFO (" Primary " << self->primary_ << " has been recovered for "
287+ << (now - self->recoverTimestamp_ )
288+ << " ms, switching back from secondary "
289+ << self->currentServiceUrl_ );
290+ self->performSwitch (self->primary_ );
291+ self->recoverTimestamp_ = -1 ;
292+ }
293+ }
253294 }
254295 self->scheduleProbe ();
255296 });
0 commit comments