Skip to content

Commit 856efaa

Browse files
committed
connection: clean up failed async keyspace sends
1 parent c4862e7 commit 856efaa

2 files changed

Lines changed: 28 additions & 2 deletions

File tree

cassandra/connection.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1745,7 +1745,13 @@ def process_result(result):
17451745
# acquire a new request id
17461746
request_id = self.get_request_id()
17471747

1748-
self.send_msg(query, request_id, process_result)
1748+
try:
1749+
self.send_msg(query, request_id, process_result)
1750+
except Exception as exc:
1751+
with self.lock:
1752+
if request_id not in self._requests and request_id not in self.request_ids:
1753+
self.request_ids.append(request_id)
1754+
callback(self, exc)
17491755

17501756
@property
17511757
def is_idle(self):

tests/unit/test_connection.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from cassandra.cluster import Cluster
2323
from cassandra.connection import (Connection, HEADER_DIRECTION_TO_CLIENT, ProtocolError,
2424
locally_supported_compressions, ConnectionHeartbeat, _Frame, Timer, TimerManager,
25-
ConnectionException, ConnectionShutdown, DefaultEndPoint, ShardAwarePortGenerator)
25+
ConnectionException, ConnectionShutdown, ConnectionBusy, DefaultEndPoint, ShardAwarePortGenerator)
2626
from cassandra.marshal import uint8_pack, uint32_pack, int32_pack
2727
from cassandra.protocol import (write_stringmultimap, write_int, write_string,
2828
SupportedMessage, ProtocolHandler, ResultMessage, QueryMessage,
@@ -389,6 +389,26 @@ def test_wait_for_responses_releases_request_id_when_send_raises_after_registrat
389389
assert len(c.request_ids) == initial_request_ids
390390
assert not c._requests
391391

392+
def test_set_keyspace_async_reports_send_failure_and_releases_request_id(self):
393+
c = self.make_connection()
394+
c.push = Mock(side_effect=ConnectionException("write failed"))
395+
initial_in_flight = c.in_flight
396+
initial_request_ids = len(c.request_ids)
397+
callback_errors = []
398+
399+
def callback(conn, error):
400+
callback_errors.append(error)
401+
with conn.lock:
402+
conn.in_flight -= 1
403+
404+
c.set_keyspace_async("ks", callback)
405+
406+
assert len(callback_errors) == 1
407+
assert isinstance(callback_errors[0], ConnectionException)
408+
assert c.in_flight == initial_in_flight
409+
assert len(c.request_ids) == initial_request_ids
410+
assert not c._requests
411+
392412

393413
@patch('cassandra.connection.ConnectionHeartbeat._raise_if_stopped')
394414
class ConnectionHeartbeatTest(unittest.TestCase):

0 commit comments

Comments
 (0)