Skip to content

Commit 058b202

Browse files
committed
connection: clean up failed heartbeat sends
1 parent 4651323 commit 058b202

2 files changed

Lines changed: 25 additions & 2 deletions

File tree

cassandra/connection.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1833,7 +1833,15 @@ def __init__(self, connection, owner):
18331833
with connection.lock:
18341834
if connection.in_flight < connection.max_request_id:
18351835
connection.in_flight += 1
1836-
connection.send_msg(OptionsMessage(), connection.get_request_id(), self._options_callback)
1836+
request_id = connection.get_request_id()
1837+
try:
1838+
connection.send_msg(OptionsMessage(), request_id, self._options_callback)
1839+
except Exception as exc:
1840+
connection.in_flight -= 1
1841+
if request_id not in connection._requests and request_id not in connection.request_ids:
1842+
connection.request_ids.append(request_id)
1843+
self._exception = exc
1844+
self._event.set()
18371845
else:
18381846
self._exception = Exception("Failed to send heartbeat because connection 'in_flight' exceeds threshold")
18391847
self._event.set()

tests/unit/test_connection.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from cassandra import ConsistencyLevel, OperationTimedOut
2222
from cassandra.cluster import Cluster
2323
from cassandra.connection import (Connection, HEADER_DIRECTION_TO_CLIENT, ProtocolError,
24-
locally_supported_compressions, ConnectionHeartbeat, _Frame, Timer, TimerManager,
24+
locally_supported_compressions, ConnectionHeartbeat, HeartbeatFuture, _Frame, Timer, TimerManager,
2525
ConnectionBusy, ConnectionException, ConnectionShutdown, DefaultEndPoint,
2626
ShardAwarePortGenerator)
2727
from cassandra.marshal import uint8_pack, uint32_pack, int32_pack
@@ -510,6 +510,21 @@ def test_no_req_ids(self, *args):
510510
holder.return_connection.assert_has_calls(
511511
[call(max_connection)] * get_holders.call_count)
512512

513+
def test_heartbeat_future_releases_request_id_when_send_fails(self, *args):
514+
connection = Connection(DefaultEndPoint('1.2.3.4'))
515+
connection.push = Mock(side_effect=ConnectionException("write failed"))
516+
initial_in_flight = connection.in_flight
517+
initial_request_ids = len(connection.request_ids)
518+
519+
future = HeartbeatFuture(connection, Mock())
520+
521+
with pytest.raises(ConnectionException):
522+
future.wait(0)
523+
524+
assert connection.in_flight == initial_in_flight
525+
assert len(connection.request_ids) == initial_request_ids
526+
assert not connection._requests
527+
513528
def test_unexpected_response(self, *args):
514529
request_id = 999
515530

0 commit comments

Comments
 (0)