Skip to content

Commit d6a29f5

Browse files
committed
cluster: reduce per-request lock overhead in ResponseFuture
Lazy Event: defer Event() creation until result() is actually called. For execute_concurrent (which never calls result()), this eliminates ~620ns of Event construction + Event.set() per request. Merged add_callbacks: register both callback and errback under a single _callback_lock acquisition instead of two separate ones (~80ns saved). _set_final_result/_set_final_exception: capture _event reference under _callback_lock for free-threaded Python safety; skip .set() when Event was never created. _wait_for_result: check result availability under _callback_lock before creating Event, avoiding Event creation entirely when the result arrived before the caller waits. _on_speculative_execute: check _final_result/_final_exception directly instead of relying on Event.is_set(), since Event may be None. All changes are safe under both GIL and free-threaded (PEP 703) Python.
1 parent a553869 commit d6a29f5

2 files changed

Lines changed: 71 additions & 28 deletions

File tree

cassandra/cluster.py

Lines changed: 69 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4477,7 +4477,7 @@ def __init__(self, session, message, query, timeout, metrics=None, prepared_stat
44774477
self._host = host
44784478
self._spec_execution_plan = speculative_execution_plan or self._spec_execution_plan
44794479
self._make_query_plan()
4480-
self._event = Event()
4480+
self._event = None # lazily created on first wait/check
44814481
self._errors = {}
44824482
self._callbacks = None
44834483
self._errbacks = None
@@ -4563,25 +4563,31 @@ def _on_timeout(self, _attempts=0):
45634563

45644564
def _on_speculative_execute(self):
45654565
self._timer = None
4566-
if not self._event.is_set():
4567-
4568-
# PYTHON-836, the speculative queries must be after
4569-
# the query is sent from the main thread, otherwise the
4570-
# query from the main thread may raise NoHostAvailable
4571-
# if the _query_plan has been exhausted by the specualtive queries.
4572-
# This also prevents a race condition accessing the iterator.
4573-
# We reschedule this call until the main thread has succeeded
4574-
# making a query
4575-
if not self.attempted_hosts:
4576-
self._timer = self.session.cluster.connection_class.create_timer(0.01, self._on_speculative_execute)
4577-
return
4566+
# Unlocked reads: under free-threaded Python a stale read may
4567+
# allow a harmless redundant speculative query — acceptable
4568+
# because the duplicate response will be discarded.
4569+
if self._final_result is not _NOT_SET or self._final_exception is not None:
4570+
return # already done
4571+
if self._event is not None and self._event.is_set():
4572+
return # already done (Event path)
4573+
4574+
# PYTHON-836, the speculative queries must be after
4575+
# the query is sent from the main thread, otherwise the
4576+
# query from the main thread may raise NoHostAvailable
4577+
# if the _query_plan has been exhausted by the specualtive queries.
4578+
# This also prevents a race condition accessing the iterator.
4579+
# We reschedule this call until the main thread has succeeded
4580+
# making a query
4581+
if not self.attempted_hosts:
4582+
self._timer = self.session.cluster.connection_class.create_timer(0.01, self._on_speculative_execute)
4583+
return
45784584

4579-
if self._time_remaining is not None:
4580-
if self._time_remaining <= 0:
4581-
self._on_timeout()
4582-
return
4583-
self.send_request(error_no_hosts=False)
4584-
self._start_timer()
4585+
if self._time_remaining is not None:
4586+
if self._time_remaining <= 0:
4587+
self._on_timeout()
4588+
return
4589+
self.send_request(error_no_hosts=False)
4590+
self._start_timer()
45854591

45864592
def _make_query_plan(self):
45874593
# set the query_plan according to the load balancing policy,
@@ -4692,7 +4698,7 @@ def warnings(self):
46924698
Otherwise it may throw if the response has not been received.
46934699
"""
46944700
# TODO: When timers are introduced, just make this wait
4695-
if not self._event.is_set():
4701+
if self._event is None or not self._event.is_set():
46964702
raise DriverException("warnings cannot be retrieved before ResponseFuture is finalized")
46974703
return self._warnings
46984704

@@ -4710,7 +4716,7 @@ def custom_payload(self):
47104716
:return: :ref:`custom_payload`.
47114717
"""
47124718
# TODO: When timers are introduced, just make this wait
4713-
if not self._event.is_set():
4719+
if self._final_result is _NOT_SET and self._final_exception is None:
47144720
raise DriverException("custom_payload cannot be retrieved before ResponseFuture is finalized")
47154721
return self._custom_payload
47164722

@@ -4729,7 +4735,8 @@ def start_fetching_next_page(self):
47294735

47304736
self._make_query_plan()
47314737
self.message.paging_state = self._paging_state
4732-
self._event.clear()
4738+
if self._event is not None:
4739+
self._event.clear()
47334740
self._final_result = _NOT_SET
47344741
self._final_exception = None
47354742
self._start_timer()
@@ -5029,8 +5036,10 @@ def _set_final_result(self, response):
50295036
)
50305037
else:
50315038
to_call = None
5039+
event = self._event # capture under lock for free-threaded safety
50325040

5033-
self._event.set()
5041+
if event is not None:
5042+
event.set()
50345043

50355044
# apply each callback
50365045
if to_call:
@@ -5056,7 +5065,9 @@ def _set_final_exception(self, response):
50565065
)
50575066
else:
50585067
to_call = None
5059-
self._event.set()
5068+
event = self._event # capture under lock for free-threaded safety
5069+
if event is not None:
5070+
event.set()
50605071

50615072
# apply each callback
50625073
if to_call:
@@ -5143,7 +5154,20 @@ def result(self):
51435154
return ResultSet(self, self._wait_for_result())
51445155

51455156
def _wait_for_result(self):
5146-
self._event.wait()
5157+
# Check under lock whether the result is already available.
5158+
# If so, we can skip Event creation entirely.
5159+
with self._callback_lock:
5160+
if self._final_result is not _NOT_SET:
5161+
return self._final_result
5162+
if self._final_exception is not None:
5163+
raise self._final_exception
5164+
# Result not yet available — ensure Event exists so that
5165+
# _set_final_result/_set_final_exception (which also hold
5166+
# _callback_lock when capturing _event) will call .set().
5167+
if self._event is None:
5168+
self._event = Event()
5169+
event = self._event
5170+
event.wait()
51475171
if self._final_result is not _NOT_SET:
51485172
return self._final_result
51495173
else:
@@ -5288,8 +5312,26 @@ def add_callbacks(self, callback, errback,
52885312
... errback=log_error, errback_args=(query,))
52895313
52905314
"""
5291-
self.add_callback(callback, *callback_args, **(callback_kwargs or {}))
5292-
self.add_errback(errback, *errback_args, **(errback_kwargs or {}))
5315+
cb_kwargs = callback_kwargs or {}
5316+
eb_kwargs = errback_kwargs or {}
5317+
run_callback = False
5318+
run_errback = False
5319+
with self._callback_lock:
5320+
if self._callbacks is None:
5321+
self._callbacks = []
5322+
self._callbacks.append((callback, callback_args, cb_kwargs))
5323+
if self._errbacks is None:
5324+
self._errbacks = []
5325+
self._errbacks.append((errback, errback_args, eb_kwargs))
5326+
if self._final_result is not _NOT_SET:
5327+
run_callback = True
5328+
if self._final_exception:
5329+
run_errback = True
5330+
if run_callback:
5331+
callback(self._final_result, *callback_args, **cb_kwargs)
5332+
if run_errback:
5333+
errback(self._final_exception, *errback_args, **eb_kwargs)
5334+
return self
52935335

52945336
def clear_callbacks(self):
52955337
with self._callback_lock:

tests/unit/test_response_future.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,8 @@ def test_multiple_errbacks(self):
534534
result = Mock(spec=UnavailableErrorMessage, info={"required_replicas":2, "alive_replicas": 1, "consistency": 1})
535535
result.to_exception.return_value = expected_exception
536536
rf._set_result(None, None, None, result)
537-
rf._event.set()
537+
if rf._event is not None:
538+
rf._event.set()
538539
with pytest.raises(Exception):
539540
rf.result()
540541

0 commit comments

Comments
 (0)