Skip to content

Commit ac3149d

Browse files
committed
schema-agreement: retry busy control connection
1 parent e6e8e07 commit ac3149d

2 files changed

Lines changed: 28 additions & 6 deletions

File tree

cassandra/cluster.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4312,7 +4312,23 @@ def _wait_for_schema_agreement(self, connection=None, preloaded_results=None, wa
43124312
"response during schema agreement check: %s", timeout)
43134313
elapsed = self._time.time() - start
43144314
continue
4315-
except (ConnectionException, ConnectionBusy) as exc:
4315+
except ConnectionBusy as exc:
4316+
elapsed = self._time.time() - start
4317+
if schema_mismatches is None:
4318+
fallback_wait = total_timeout - elapsed
4319+
fallback = self._wait_for_schema_agreement_through_session(fallback_wait)
4320+
if fallback is not None:
4321+
return fallback
4322+
raise
4323+
4324+
log.debug("[control connection] Connection busy during schema agreement check: %s",
4325+
exc)
4326+
remaining = total_timeout - elapsed
4327+
if remaining > 0:
4328+
self._time.sleep(min(0.2, remaining))
4329+
elapsed = self._time.time() - start
4330+
continue
4331+
except ConnectionException as exc:
43164332
if isinstance(exc, ConnectionShutdown) and self._is_shutdown:
43174333
log.debug("[control connection] Aborting wait for schema match due to shutdown")
43184334
return None

tests/unit/test_control_connection.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -323,17 +323,23 @@ def test_wait_for_schema_agreement_does_not_accept_session_fallback_after_known_
323323
assert not self.control_connection.wait_for_schema_agreement()
324324
session.wait_for_schema_agreement.assert_not_called()
325325

326-
def test_wait_for_schema_agreement_does_not_accept_session_fallback_after_mismatch_then_busy(self):
326+
def test_wait_for_schema_agreement_retries_control_connection_after_mismatch_then_busy(self):
327327
session = Mock(is_shutdown=False)
328328
session.wait_for_schema_agreement.return_value = True
329329
self.cluster.sessions = [session]
330-
self.connection.peer_results[1][1][2] = 'b'
330+
331+
peer_columns = self.connection.peer_results[0]
332+
mismatching_peer_rows = [list(row) for row in self.connection.peer_results[1]]
333+
mismatching_peer_rows[1][2] = 'b'
334+
matching_peer_rows = [list(row) for row in self.connection.peer_results[1]]
331335
self.connection.wait_for_responses.side_effect = [
332-
_node_meta_results(self.connection.local_results, self.connection.peer_results),
333-
ConnectionBusy("overloaded")]
336+
_node_meta_results(self.connection.local_results, (peer_columns, mismatching_peer_rows)),
337+
ConnectionBusy("overloaded"),
338+
_node_meta_results(self.connection.local_results, (peer_columns, matching_peer_rows))]
334339

335-
assert not self.control_connection.wait_for_schema_agreement()
340+
assert self.control_connection.wait_for_schema_agreement()
336341
session.wait_for_schema_agreement.assert_not_called()
342+
assert self.connection.wait_for_responses.call_count == 3
337343

338344
def test_wait_for_schema_agreement_does_not_exceed_configured_wait_with_session_fallback(self):
339345
session = Mock(is_shutdown=False)

0 commit comments

Comments
 (0)