Skip to content

Commit d83adab

Browse files
mykauldkropachev
authored andcommitted
Add timeout and in-flight observability to OperationTimedOut
Improve timeout observability in the driver, inspired by the Go driver PR scylladb/gocql#847. OperationTimedOut now carries optional timeout and in_flight fields that are appended to the exception message when present (e.g. "(timeout=10.0s, in_flight=42)"). All seven production raise sites in connection.py and cluster.py pass these values where available. Additionally, debug-level log lines are emitted for: - Client-side request timeouts (host, timeout, in_flight, orphaned) - Server-side read/write timeouts (host, consistency, received/required, data_retrieved/write_type, retry decision) A helper _retry_decision_name() translates RetryPolicy constants to human-readable strings for the log messages. New keyword-only parameters are backward compatible — existing callers that pass only positional errors/last_host continue to work unchanged. Fixes: DRIVER-538 Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
1 parent 32548a6 commit d83adab

6 files changed

Lines changed: 112 additions & 12 deletions

File tree

cassandra/__init__.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -687,10 +687,29 @@ class OperationTimedOut(DriverException):
687687
The last :class:`~.Host` this operation was attempted against.
688688
"""
689689

690-
def __init__(self, errors=None, last_host=None):
690+
timeout = None
691+
"""
692+
The timeout value (in seconds) that was in effect when the operation
693+
timed out, or ``None`` if not applicable.
694+
"""
695+
696+
in_flight = None
697+
"""
698+
The number of in-flight requests on the connection at the time of
699+
the timeout (includes orphaned requests), or ``None`` if not applicable.
700+
"""
701+
702+
def __init__(self, errors=None, last_host=None, timeout=None, in_flight=None):
691703
self.errors = errors
692704
self.last_host = last_host
705+
self.timeout = timeout
706+
self.in_flight = in_flight
693707
message = "errors=%s, last_host=%s" % (self.errors, self.last_host)
708+
if self.timeout is not None:
709+
message += " (timeout=%ss" % self.timeout
710+
if self.in_flight is not None:
711+
message += ", in_flight=%d" % self.in_flight
712+
message += ")"
694713
Exception.__init__(self, message)
695714

696715

cassandra/cluster.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,6 @@ def _connection_reduce_fn(val,import_fn):
191191

192192
log = logging.getLogger(__name__)
193193

194-
195194
_GRAPH_PAGING_MIN_DSE_VERSION = Version('6.8.0')
196195

197196
_NOT_SET = object()
@@ -1683,7 +1682,8 @@ def add_execution_profile(self, name, profile, pool_wait_timeout=5):
16831682
futures.update(session.update_created_pools())
16841683
_, not_done = wait_futures(futures, pool_wait_timeout)
16851684
if not_done:
1686-
raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout." % pool_wait_timeout)
1685+
raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout." % pool_wait_timeout,
1686+
timeout=pool_wait_timeout)
16871687

16881688
def connection_factory(self, endpoint, host_conn = None, *args, **kwargs):
16891689
"""
@@ -4505,6 +4505,7 @@ def _on_timeout(self, _attempts=0):
45054505
)
45064506
return
45074507

4508+
conn_in_flight = None
45084509
if self._connection is not None:
45094510
try:
45104511
self._connection._requests.pop(self._req_id)
@@ -4515,9 +4516,14 @@ def _on_timeout(self, _attempts=0):
45154516
except KeyError:
45164517
key = "Connection defunct by heartbeat"
45174518
errors = {key: "Client request timeout. See Session.execute[_async](timeout)"}
4518-
self._set_final_exception(OperationTimedOut(errors, self._current_host))
4519+
self._set_final_exception(OperationTimedOut(errors, self._current_host,
4520+
timeout=self.timeout,
4521+
in_flight=self._connection.in_flight))
45194522
return
45204523

4524+
# Capture connection stats before pool.return_connection() can alter state
4525+
conn_in_flight = self._connection.in_flight
4526+
45214527
pool = self.session._pools.get(self._current_host)
45224528
if pool and not pool.is_shutdown:
45234529
# Do not return the stream ID to the pool yet. We cannot reuse it
@@ -4542,7 +4548,9 @@ def _on_timeout(self, _attempts=0):
45424548
host = str(connection.endpoint) if connection else 'unknown'
45434549
errors = {host: "Request timed out while waiting for schema agreement. See Session.execute[_async](timeout) and Cluster.max_schema_agreement_wait."}
45444550

4545-
self._set_final_exception(OperationTimedOut(errors, self._current_host))
4551+
self._set_final_exception(OperationTimedOut(errors, self._current_host,
4552+
timeout=self.timeout,
4553+
in_flight=conn_in_flight))
45464554

45474555
def _on_speculative_execute(self):
45484556
self._timer = None

cassandra/connection.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -984,7 +984,8 @@ def factory(cls, endpoint, timeout, host_conn = None, *args, **kwargs):
984984
raise conn.last_error
985985
elif not conn.connected_event.is_set():
986986
conn.close()
987-
raise OperationTimedOut("Timed out creating connection (%s seconds)" % timeout)
987+
raise OperationTimedOut("Timed out creating connection (%s seconds)" % timeout,
988+
timeout=timeout)
988989
else:
989990
return conn
990991

@@ -1247,6 +1248,7 @@ def wait_for_responses(self, *msgs, **kwargs):
12471248
msg += ": %s" % (self.last_error,)
12481249
raise ConnectionShutdown(msg)
12491250
timeout = kwargs.get('timeout')
1251+
original_timeout = timeout # preserve for exception reporting
12501252
fail_on_error = kwargs.get('fail_on_error', True)
12511253
waiter = ResponseWaiter(self, len(msgs), fail_on_error)
12521254

@@ -1271,7 +1273,8 @@ def wait_for_responses(self, *msgs, **kwargs):
12711273
if timeout is not None:
12721274
timeout -= 0.01
12731275
if timeout <= 0.0:
1274-
raise OperationTimedOut()
1276+
raise OperationTimedOut(timeout=original_timeout,
1277+
in_flight=self.in_flight)
12751278
time.sleep(0.01)
12761279

12771280
try:
@@ -1796,7 +1799,8 @@ def deliver(self, timeout=None):
17961799
if self.error:
17971800
raise self.error
17981801
elif not self.event.is_set():
1799-
raise OperationTimedOut()
1802+
raise OperationTimedOut(timeout=timeout,
1803+
in_flight=self.connection.in_flight)
18001804
else:
18011805
return self.responses
18021806

@@ -1823,7 +1827,10 @@ def wait(self, timeout):
18231827
if self._exception:
18241828
raise self._exception
18251829
else:
1826-
raise OperationTimedOut("Connection heartbeat timeout after %s seconds" % (timeout,), self.connection.endpoint)
1830+
raise OperationTimedOut("Connection heartbeat timeout after %s seconds" % (timeout,),
1831+
self.connection.endpoint,
1832+
timeout=timeout,
1833+
in_flight=self.connection.in_flight)
18271834

18281835
def _options_callback(self, response):
18291836
if isinstance(response, SupportedMessage):

tests/unit/test_cluster.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,64 @@ def test_exception_types(self):
8787
assert issubclass(UnsupportedOperation, DriverException)
8888

8989

90+
class OperationTimedOutTest(unittest.TestCase):
91+
92+
def test_message_without_timeout(self):
93+
"""Default message format when no timeout info is provided."""
94+
exc = OperationTimedOut(errors={'host1': 'some error'}, last_host='host1')
95+
msg = str(exc)
96+
assert "errors={'host1': 'some error'}" in msg
97+
assert "last_host=host1" in msg
98+
assert "timeout=" not in msg
99+
assert "in_flight=" not in msg
100+
101+
def test_message_with_timeout_and_in_flight(self):
102+
"""Message includes timeout and in_flight when both are provided."""
103+
exc = OperationTimedOut(errors={'host1': 'err'}, last_host='host1',
104+
timeout=10.0, in_flight=42)
105+
msg = str(exc)
106+
assert "(timeout=10.0s, in_flight=42)" in msg
107+
108+
def test_message_with_timeout_no_in_flight(self):
109+
"""Message includes timeout but not in_flight when only timeout is set."""
110+
exc = OperationTimedOut(timeout=5.0)
111+
msg = str(exc)
112+
assert "(timeout=5.0s)" in msg
113+
assert "in_flight=" not in msg
114+
115+
def test_message_no_args(self):
116+
"""No-argument form should not crash and should have clean message."""
117+
exc = OperationTimedOut()
118+
msg = str(exc)
119+
assert "errors=None, last_host=None" in msg
120+
assert "timeout=" not in msg
121+
122+
def test_attributes_accessible(self):
123+
"""New and existing attributes should be readable."""
124+
exc = OperationTimedOut(errors={'h': 'e'}, last_host='h',
125+
timeout=10.0, in_flight=42)
126+
assert exc.errors == {'h': 'e'}
127+
assert exc.last_host == 'h'
128+
assert exc.timeout == 10.0
129+
assert exc.in_flight == 42
130+
131+
def test_attributes_default_none(self):
132+
"""New attributes should default to None when not provided."""
133+
exc = OperationTimedOut()
134+
assert exc.timeout is None
135+
assert exc.in_flight is None
136+
assert exc.errors is None
137+
assert exc.last_host is None
138+
139+
def test_backward_compat_positional(self):
140+
"""Existing two-positional-arg form should still work."""
141+
exc = OperationTimedOut({'h': 'err'}, 'host1')
142+
assert exc.errors == {'h': 'err'}
143+
assert exc.last_host == 'host1'
144+
assert exc.timeout is None
145+
assert exc.in_flight is None
146+
147+
90148
class ClusterTest(unittest.TestCase):
91149

92150
def test_tuple_for_contact_points(self):

tests/unit/test_connection.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,8 @@ def send_msg(msg, req_id, msg_callback):
520520
assert isinstance(exc, OperationTimedOut)
521521
assert exc.errors == 'Connection heartbeat timeout after 0.05 seconds'
522522
assert exc.last_host == DefaultEndPoint('localhost')
523+
assert exc.timeout == 0.05
524+
assert isinstance(exc.in_flight, int)
523525
holder.return_connection.assert_has_calls(
524526
[call(connection)] * get_holders.call_count)
525527

tests/unit/test_response_future.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ def test_heartbeat_defunct_deadlock(self):
142142

143143
connection = MagicMock(spec=Connection)
144144
connection._requests = {}
145+
connection.in_flight = 5
146+
connection.orphaned_request_ids = set()
145147

146148
pool = Mock()
147149
pool.is_shutdown = False
@@ -162,8 +164,10 @@ def test_heartbeat_defunct_deadlock(self):
162164

163165
# Simulate ResponseFuture timing out
164166
rf._on_timeout()
165-
with pytest.raises(OperationTimedOut, match="Connection defunct by heartbeat"):
167+
with pytest.raises(OperationTimedOut, match="Connection defunct by heartbeat") as exc_info:
166168
rf.result()
169+
assert exc_info.value.timeout == 1
170+
assert exc_info.value.in_flight == 5
167171

168172
def test_read_timeout_error_message(self):
169173
session = self.make_session()
@@ -653,7 +657,7 @@ def test_timeout_does_not_release_stream_id(self):
653657
pool = self.make_pool()
654658
session._pools.get.return_value = pool
655659
connection = Mock(spec=Connection, lock=RLock(), _requests={}, request_ids=deque(),
656-
orphaned_request_ids=set(), orphaned_threshold=256)
660+
orphaned_request_ids=set(), orphaned_threshold=256, in_flight=3)
657661
pool.borrow_connection.return_value = (connection, 1)
658662

659663
rf = self.make_response_future(session)
@@ -663,8 +667,10 @@ def test_timeout_does_not_release_stream_id(self):
663667

664668
rf._on_timeout()
665669
pool.return_connection.assert_called_once_with(connection, stream_was_orphaned=True)
666-
with pytest.raises(OperationTimedOut, match="Client request timeout"):
670+
with pytest.raises(OperationTimedOut, match="Client request timeout") as exc_info:
667671
rf.result()
672+
assert exc_info.value.timeout == 1
673+
assert exc_info.value.in_flight == 3
668674

669675
assert len(connection.request_ids) == 0, \
670676
"Request IDs should be empty but it's not: {}".format(connection.request_ids)

0 commit comments

Comments
 (0)