Skip to content

Commit 11c0191

Browse files
committed
concurrent: update submitter thread to cycle-6 (lock-free event-loop hot path)
1 parent c5eaf99 commit 11c0191

1 file changed

Lines changed: 127 additions & 49 deletions

File tree

cassandra/concurrent.py

Lines changed: 127 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -195,40 +195,76 @@ def execute(self, concurrency, fail_fast):
195195
self._exception = None
196196
self._submit_ready = deque()
197197
self._submit_event = Event()
198-
self._submit_stopped = False
198+
self._stop_event = Event()
199+
self._exhausted = False
199200
# Submit the initial batch from the calling thread (no contention
200201
# yet -- the submitter thread is not started until afterward).
201-
result = super(ConcurrentExecutorListResults, self).execute(concurrency, fail_fast)
202-
return result
202+
# Track whether the initial batch consumed all statements.
203+
self._fail_fast = fail_fast
204+
self._results_queue = []
205+
self._current = 0
206+
self._exec_count = 0
207+
with self._condition:
208+
for n in range(concurrency):
209+
if not self._execute_next():
210+
self._exhausted = True
211+
break
212+
return self._results()
203213

204214
def _results(self):
205-
# Start the submitter thread *after* the initial batch has been
206-
# fully dispatched so that _enum_statements and _exec_count are
207-
# not accessed concurrently during the seeding phase.
215+
# Always start the submitter thread: it owns ``_current`` accounting
216+
# (incrementing from drained completion signals) so the event-loop
217+
# callback path can stay lock-free in the success case. Even when
218+
# the iterator was fully consumed by the initial batch, the
219+
# submitter still needs to run to record completions.
208220
self._submitter = Thread(target=self._submitter_loop,
209221
daemon=True, name="concurrent-submitter")
210222
self._submitter.start()
211223

212-
with self._condition:
213-
while self._current < self._exec_count:
214-
self._condition.wait()
215-
if self._exception and self._fail_fast:
216-
break
217-
self._submit_stopped = True
218-
self._submit_event.set()
224+
try:
225+
with self._condition:
226+
while not self._exhausted or self._current < self._exec_count:
227+
self._condition.wait()
228+
if self._exception and self._fail_fast:
229+
break
230+
finally:
231+
self._stop_event.set()
232+
self._submit_event.set() # wake submitter so it sees the stop
233+
self._submitter.join()
219234
if self._exception and self._fail_fast:
220235
raise self._exception
221236
return [r[1] for r in sorted(self._results_queue)]
222237

223238
def _put_result(self, result, idx, success):
239+
"""Record a completion and signal the submitter thread.
240+
241+
Called from the event-loop callback thread (or from the submitter
242+
thread when execute_async raises synchronously).
243+
244+
Hot path (success, not fail-fast): NO lock acquisition. We rely on
245+
the submitter thread to bump ``_current`` from the drained signal
246+
count under the same lock acquisition that bumps ``_exec_count``.
247+
This removes ~0.5-1us of lock cost from every callback on the
248+
event-loop thread.
249+
250+
Note: ``self._results_queue.append`` and ``self._submit_ready.append``
251+
are safe under the GIL (CPython list/deque appends are atomic).
252+
Under free-threaded builds (PEP 703) the GIL is removed; this
253+
module assumes a GIL build, which is the default for the driver's
254+
supported Python versions.
255+
"""
224256
self._results_queue.append((idx, ExecutionResult(success, result)))
225257
if not success and self._fail_fast:
226-
if not self._exception:
227-
self._exception = result
228-
# Signal the submitter thread to send the next request instead of
229-
# calling _execute_next() inline. This keeps the event-loop thread
230-
# (which fires the callback) free to process I/O rather than doing
231-
# query-plan lookup, message serialisation, and connection borrowing.
258+
# Cold path: take the lock to record the exception and wake
259+
# the main thread immediately so it can stop waiting.
260+
with self._condition:
261+
if not self._exception:
262+
self._exception = result
263+
self._condition.notify()
264+
# Signal the submitter thread. It will:
265+
# 1) bump _current under the lock from the drained signal count,
266+
# 2) submit a replacement request,
267+
# 3) notify _results() if all completions have arrived.
232268
self._submit_ready.append(1)
233269
self._submit_event.set()
234270

@@ -240,22 +276,24 @@ def _submitter_loop(self):
240276
in ``_put_result`` rather than the full execute_async cycle
241277
(query-plan, borrow connection, serialise, enqueue).
242278
243-
Calls execute_async directly instead of going through the
244-
_execute / _execute_next indirection to avoid per-request
245-
overhead from the re-entrancy guard and pending-executions list.
279+
Owns ``_current`` accounting: each drained completion signal
280+
increments ``_current`` by one under the same lock acquisition
281+
that bumps ``_exec_count`` for the new batch. This keeps the
282+
event-loop callback path lock-free in the success case.
246283
"""
247284
ready = self._submit_ready
248285
ready_event = self._submit_event
286+
stop_event = self._stop_event
249287
enum_stmts = self._enum_statements
250288
session = self.session
251289
profile = self._execution_profile
252290
on_success = self._on_success
253291
on_error = self._on_error
254-
exec_count = self._exec_count # snapshot after initial batch
255-
exhausted = False
256-
while not self._submit_stopped:
292+
condition = self._condition
293+
while not stop_event.is_set():
257294
ready_event.wait()
258295
ready_event.clear()
296+
# Drain all pending completion signals.
259297
count = 0
260298
while True:
261299
try:
@@ -265,34 +303,74 @@ def _submitter_loop(self):
265303
break
266304
if count == 0:
267305
continue
306+
if stop_event.is_set():
307+
# Main thread is shutting down (e.g. fail-fast). Do the
308+
# accounting for already-completed requests but skip
309+
# dispatching new ones.
310+
with condition:
311+
self._current += count
312+
if self._exhausted and self._current >= self._exec_count:
313+
condition.notify()
314+
continue
315+
if self._exhausted:
316+
# No more statements to dispatch -- just account for the
317+
# completions we just drained and notify the waiter if
318+
# everything has caught up.
319+
with condition:
320+
self._current += count
321+
if self._current >= self._exec_count:
322+
condition.notify()
323+
continue
268324
# Submit follow-up requests directly (fast path).
269325
# The iterator is only consumed from this thread (the initial
270326
# batch was fully dispatched before this thread started).
271-
if not exhausted:
272-
for _ in range(count):
273-
try:
274-
idx, (statement, params) = next(enum_stmts)
275-
except StopIteration:
276-
exhausted = True
277-
break
278-
exec_count += 1
279-
try:
280-
future = session.execute_async(statement, params,
281-
timeout=None,
282-
execution_profile=profile)
283-
args = (future, idx)
284-
future.add_callbacks(
285-
callback=on_success, callback_args=args,
286-
errback=on_error, errback_args=args)
287-
except Exception as exc:
288-
self._put_result(exc, idx, False)
289-
with self._condition:
290-
self._exec_count = exec_count
327+
#
328+
# Pull statements from the iterator first, then bump _current
329+
# and _exec_count for the entire batch in one lock acquisition,
330+
# then dispatch. This avoids per-request lock overhead while
331+
# ensuring _results() never sees _current >= _exec_count
332+
# prematurely.
333+
batch = []
334+
iterator_done = False
335+
for _ in range(count):
336+
try:
337+
batch.append(next(enum_stmts))
338+
except StopIteration:
339+
iterator_done = True
340+
break
341+
# Single lock acquisition: bump both _current (from the
342+
# drained completion count) and _exec_count (from the new
343+
# batch size) atomically. Setting _exhausted in the same
344+
# critical section ensures the main thread never sees
345+
# _exhausted=True with a stale _exec_count.
346+
with condition:
291347
self._current += count
292-
if self._current >= self._exec_count:
293-
self._condition.notify()
294-
if self._exception and self._fail_fast:
295-
self._condition.notify()
348+
self._exec_count += len(batch)
349+
if iterator_done:
350+
self._exhausted = True
351+
# Wake the waiter if all completions have caught up.
352+
if self._exhausted and self._current >= self._exec_count:
353+
condition.notify()
354+
# Re-check stop after the lock release: fail-fast may have
355+
# arrived while we were holding the lock; avoid dispatching
356+
# requests we know will be discarded.
357+
if stop_event.is_set():
358+
continue
359+
for idx, (statement, params) in batch:
360+
try:
361+
future = session.execute_async(statement, params,
362+
timeout=None,
363+
execution_profile=profile)
364+
args = (future, idx)
365+
future.add_callbacks(
366+
callback=on_success, callback_args=args,
367+
errback=on_error, errback_args=args)
368+
except Exception as exc:
369+
# Record the failure directly. _put_result handles
370+
# _current accounting and will enqueue another signal
371+
# to _submit_ready -- but that is fine because the
372+
# next drain will attempt another next(enum_stmts).
373+
self._put_result(exc, idx, False)
296374

297375

298376

0 commit comments

Comments
 (0)