|
13 | 13 | # limitations under the License. |
14 | 14 |
|
15 | 15 |
|
16 | | -from collections import namedtuple |
| 16 | +from collections import deque, namedtuple |
17 | 17 | from heapq import heappush, heappop |
18 | 18 | from itertools import cycle |
19 | | -from threading import Condition |
| 19 | +from threading import Condition, Event, Thread |
20 | 20 | import sys |
21 | 21 |
|
22 | 22 | from cassandra.cluster import ResultSet, EXEC_PROFILE_DEFAULT |
@@ -193,28 +193,126 @@ class ConcurrentExecutorListResults(_ConcurrentExecutor): |
193 | 193 |
|
def execute(self, concurrency, fail_fast):
    """Reset the per-run state and hand off to the base-class ``execute``.

    A fresh completion-signal deque, wake-up event and stop event are
    created on every call, together with a cleared ``_exception`` slot.
    They are all in place before the base class starts dispatching the
    initial batch from the calling thread; the submitter thread itself
    is only started later, from ``_results``.

    :param concurrency: forwarded unchanged to the base-class ``execute``.
    :param fail_fast: forwarded unchanged to the base-class ``execute``.
    :return: whatever the base-class ``execute`` returns.
    """
    self._stop_event = Event()
    self._submit_event = Event()
    self._submit_ready = deque()
    self._exception = None
    return super(ConcurrentExecutorListResults, self).execute(concurrency, fail_fast)
| 203 | + |
def _results(self):
    """Block until every statement has completed, then return the results.

    One statement is reserved (pulled from the iterator and counted in
    ``_exec_count``) *before* the submitter thread is started and before
    any waiting happens.  The submitter keeps that reservation topped up,
    so ``_current`` can only catch up with ``_exec_count`` once the
    statement iterator is exhausted.  Without the reservation the waiter
    could observe ``_current >= _exec_count`` as soon as the initial
    batch finished -- before the submitter had counted any follow-up
    request -- and return partial results.

    :return: list of ``ExecutionResult`` ordered by statement index.
    :raises: the first recorded failure when ``fail_fast`` is set, or any
        exception raised inside the submitter thread (for example by the
        statement iterator).
    """
    # Failure captured inside the submitter thread; re-raised below from
    # the calling thread so it is not silently lost.
    self._submitter_error = None
    # Only the calling thread touches the iterator at this point: the
    # initial batch is done and the submitter has not been started yet.
    self._pending = self._reserve_next()

    self._submitter = Thread(target=self._submitter_loop,
                             daemon=True, name="concurrent-submitter")
    self._submitter.start()

    try:
        with self._condition:
            while self._current < self._exec_count:
                # Check *before* waiting: a failure recorded before this
                # thread took the lock has already sent its notify().
                if self._submitter_error is not None:
                    break
                if self._exception and self._fail_fast:
                    break
                self._condition.wait()
    finally:
        # The lock is released here, so the submitter can finish any
        # in-progress step and observe the stop flag.
        self._stop_event.set()
        self._submit_event.set()  # wake submitter so it sees the stop
        self._submitter.join()
    if self._submitter_error is not None:
        raise self._submitter_error
    if self._exception and self._fail_fast:
        raise self._exception
    return [r[1] for r in sorted(self._results_queue)]

def _reserve_next(self):
    """Pull the next statement from the iterator and count it.

    ``_exec_count`` is incremented under the condition lock as soon as a
    statement is obtained, i.e. before that statement is dispatched.
    The iterator itself is advanced outside the lock.

    :return: ``(idx, statement, params)``, or ``None`` when the iterator
        is exhausted (in which case ``_exec_count`` is left untouched).
    """
    try:
        idx, (statement, params) = next(self._enum_statements)
    except StopIteration:
        return None
    with self._condition:
        self._exec_count += 1
    return idx, statement, params

def _put_result(self, result, idx, success):
    """Record one completion and signal the submitter thread.

    Stores ``(idx, ExecutionResult(success, result))`` in the results
    list, increments ``_current`` under the condition lock, and notifies
    the waiter in ``_results`` when either a failure occurred in
    fail-fast mode or every counted request has completed.  Finally a
    token is appended to ``_submit_ready`` and ``_submit_event`` is set,
    so the follow-up request is submitted by the submitter thread
    instead of inline in this call.

    :param result: the result object on success, or the exception on failure.
    :param idx: index of the statement this result belongs to.
    :param success: ``True`` if the request succeeded.
    """
    self._results_queue.append((idx, ExecutionResult(success, result)))
    with self._condition:
        self._current += 1
        if not success and self._fail_fast:
            # Keep only the first failure; it is the one that gets raised.
            if not self._exception:
                self._exception = result
            self._condition.notify()
        elif self._current >= self._exec_count:
            self._condition.notify()
    # One token per completion: the submitter dispatches at most one
    # follow-up request for each token it drains.
    self._submit_ready.append(1)
    self._submit_event.set()

def _submitter_loop(self):
    """Submit one follow-up request per completion signal.

    Runs on the thread started by ``_results``.  For every token drained
    from ``_submit_ready`` the currently reserved statement is
    dispatched; the *next* statement is reserved (and counted in
    ``_exec_count``) before that dispatch, so the waiter in ``_results``
    never sees the counters meet while statements remain.

    Any exception escaping the loop (for example one raised by the
    statement iterator) is stored in ``_submitter_error`` and the waiter
    is notified, so it can re-raise it instead of blocking forever.
    """
    ready = self._submit_ready
    ready_event = self._submit_event
    stop_event = self._stop_event
    session = self.session
    profile = self._execution_profile
    on_success = self._on_success
    on_error = self._on_error
    try:
        while not stop_event.is_set():
            ready_event.wait()
            ready_event.clear()
            # Drain all pending completion signals.
            count = 0
            while True:
                try:
                    ready.popleft()
                except IndexError:
                    break
                count += 1
            for _ in range(count):
                # Nothing left to send, or the caller is shutting down.
                if self._pending is None or stop_event.is_set():
                    break
                idx, statement, params = self._pending
                # Reserve the successor *before* dispatching, keeping
                # _exec_count ahead of _current until exhaustion.
                self._pending = self._reserve_next()
                try:
                    future = session.execute_async(statement, params,
                                                   timeout=None,
                                                   execution_profile=profile)
                    args = (future, idx)
                    future.add_callbacks(
                        callback=on_success, callback_args=args,
                        errback=on_error, errback_args=args)
                except Exception as exc:
                    # Record the synchronous failure as a completion.
                    # _put_result enqueues another token, so the next
                    # drain dispatches the following statement.
                    self._put_result(exc, idx, False)
    except Exception as exc:
        with self._condition:
            self._submitter_error = exc
            self._condition.notify()
218 | 316 |
|
219 | 317 |
|
220 | 318 |
|
|
0 commit comments