Skip to content

Commit 58b9c7f

Browse files
committed
schema-agreement: fix retry deadline cleanup
1 parent 9bccd36 commit 58b9c7f

4 files changed

Lines changed: 48 additions & 11 deletions

File tree

cassandra/cluster.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4269,6 +4269,7 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
42694269
if schema_mismatches is None:
42704270
return True
42714271

4272+
elapsed = self._time.time() - start
42724273
log.debug("[control connection] Schemas mismatched, trying again")
42734274
remaining = total_timeout - elapsed
42744275
if remaining > 0:

cassandra/connection.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -642,16 +642,25 @@ def maybe_request_more(self):
642642
if space_in_queue >= max_queue_size / 2:
643643
self.update_next_pages(space_in_queue)
644644

645+
def _send_revise_request(self, request, callback):
646+
with self.connection.lock:
647+
request_id = self.connection.get_request_id()
648+
try:
649+
self.connection.send_msg(request, request_id, callback)
650+
except Exception:
651+
if request_id not in self.connection._requests and request_id not in self.connection.request_ids:
652+
self.connection.request_ids.append(request_id)
653+
raise
654+
645655
def update_next_pages(self, num_next_pages):
646656
try:
647657
self._state.num_pages_requested += num_next_pages
648658
log.debug("Updating backpressure for session %s from %s", self.stream_id, self.connection.host)
649-
with self.connection.lock:
650-
self.connection.send_msg(ReviseRequestMessage(ReviseRequestMessage.RevisionType.PAGING_BACKPRESSURE,
651-
self.stream_id,
652-
next_pages=num_next_pages),
653-
self.connection.get_request_id(),
654-
self._on_backpressure_response)
659+
self._send_revise_request(
660+
ReviseRequestMessage(ReviseRequestMessage.RevisionType.PAGING_BACKPRESSURE,
661+
self.stream_id,
662+
next_pages=num_next_pages),
663+
self._on_backpressure_response)
655664
except ConnectionShutdown as ex:
656665
log.debug("Failed to update backpressure for session %s from %s, connection is shutdown",
657666
self.stream_id, self.connection.host)
@@ -668,11 +677,10 @@ def _on_backpressure_response(self, response):
668677
def cancel(self):
669678
try:
670679
log.debug("Canceling paging session %s from %s", self.stream_id, self.connection.host)
671-
with self.connection.lock:
672-
self.connection.send_msg(ReviseRequestMessage(ReviseRequestMessage.RevisionType.PAGING_CANCEL,
673-
self.stream_id),
674-
self.connection.get_request_id(),
675-
self._on_cancel_response)
680+
self._send_revise_request(
681+
ReviseRequestMessage(ReviseRequestMessage.RevisionType.PAGING_CANCEL,
682+
self.stream_id),
683+
self._on_cancel_response)
676684
except ConnectionShutdown:
677685
log.debug("Failed to cancel session %s from %s, connection is shutdown",
678686
self.stream_id, self.connection.host)

tests/unit/test_connection.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,19 @@ def callback(conn, error):
410410
assert len(c.request_ids) == initial_request_ids
411411
assert not c._requests
412412

413+
def test_continuous_paging_cancel_releases_request_id_when_send_fails(self):
414+
c = self.make_connection()
415+
c.push = Mock(side_effect=ConnectionException("write failed"))
416+
state = Mock(max_queue_size=100, num_pages_requested=0, num_pages_received=0)
417+
session = c.new_continuous_paging_session(1, Mock(), Mock(), state)
418+
initial_request_ids = len(c.request_ids)
419+
420+
with pytest.raises(ConnectionException):
421+
session.cancel()
422+
423+
assert len(c.request_ids) == initial_request_ids
424+
assert not c._requests
425+
413426

414427
@patch('cassandra.connection.ConnectionHeartbeat._raise_if_stopped')
415428
class ConnectionHeartbeatTest(unittest.TestCase):

tests/unit/test_control_connection.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,21 @@ def test_wait_for_schema_agreement_does_not_sleep_past_deadline_after_mismatch(s
377377
assert not self.control_connection.wait_for_schema_agreement()
378378
assert self.time.clock == self.cluster.max_schema_agreement_wait
379379

380+
def test_wait_for_schema_agreement_counts_query_time_before_mismatch_retry_sleep(self):
381+
self.cluster.max_schema_agreement_wait = 0.1
382+
peer_columns = self.connection.peer_results[0]
383+
mismatching_peer_rows = [list(row) for row in self.connection.peer_results[1]]
384+
mismatching_peer_rows[1][2] = 'b'
385+
386+
def wait_for_responses(*args, **kwargs):
387+
self.time.sleep(0.09)
388+
return _node_meta_results(self.connection.local_results, (peer_columns, mismatching_peer_rows))
389+
390+
self.connection.wait_for_responses.side_effect = wait_for_responses
391+
392+
assert not self.control_connection.wait_for_schema_agreement()
393+
self.assertAlmostEqual(self.time.clock, self.cluster.max_schema_agreement_wait)
394+
380395
def test_wait_for_schema_agreement_skipping(self):
381396
"""
382397
If rpc_address or schema_version isn't set, the host should be skipped

0 commit comments

Comments
 (0)