Skip to content

Commit 4ab9f66

Browse files
committed
feat: Modernize the async routines
1 parent 7d8015e commit 4ab9f66

4 files changed

Lines changed: 52 additions & 40 deletions

File tree

cassandra/cluster.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,13 @@ def _try_libev_import():
149149
except DependencyException as e:
150150
return (None, e)
151151

152+
def _try_asyncio_import():
153+
try:
154+
from cassandra.io.asyncioreactor import AsyncioConnection
155+
return (AsyncioConnection, None)
156+
except (ImportError, DependencyException) as e:
157+
return (None, e)
158+
152159
def _try_asyncore_import():
153160
try:
154161
from cassandra.io.asyncorereactor import AsyncoreConnection
@@ -168,7 +175,7 @@ def _connection_reduce_fn(val,import_fn):
168175

169176
log = logging.getLogger(__name__)
170177

171-
conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncore_import)
178+
conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncio_import, _try_asyncore_import)
172179
(conn_class, excs) = reduce(_connection_reduce_fn, conn_fns, (None,[]))
173180
if not conn_class:
174181
raise DependencyException("Unable to load a default connection class", excs)
@@ -876,25 +883,21 @@ def default_retry_policy(self, policy):
876883
This determines what event loop system will be used for managing
877884
I/O with Cassandra. These are the current options:
878885
879-
* :class:`cassandra.io.asyncorereactor.AsyncoreConnection`
880886
* :class:`cassandra.io.libevreactor.LibevConnection`
887+
* :class:`cassandra.io.asyncioreactor.AsyncioConnection`
888+
* :class:`cassandra.io.asyncorereactor.AsyncoreConnection` (Python < 3.12 only)
881889
* :class:`cassandra.io.eventletreactor.EventletConnection` (requires monkey-patching - see doc for details)
882890
* :class:`cassandra.io.geventreactor.GeventConnection` (requires monkey-patching - see doc for details)
883891
* :class:`cassandra.io.twistedreactor.TwistedConnection`
884-
* EXPERIMENTAL: :class:`cassandra.io.asyncioreactor.AsyncioConnection`
885-
886-
By default, ``AsyncoreConnection`` will be used, which uses
887-
the ``asyncore`` module in the Python standard library.
888-
889-
If ``libev`` is installed, ``LibevConnection`` will be used instead.
890892
891-
If ``gevent`` or ``eventlet`` monkey-patching is detected, the corresponding
892-
connection class will be used automatically.
893+
The default is selected automatically using the following priority:
893894
894-
``AsyncioConnection``, which uses the ``asyncio`` module in the Python
895-
standard library, is also available, but currently experimental. Note that
896-
it requires ``asyncio`` features that were only introduced in the 3.4 line
897-
in 3.4.6, and in the 3.5 line in 3.5.1.
895+
1. If ``gevent`` or ``eventlet`` monkey-patching is detected, the
896+
corresponding connection class will be used.
897+
2. If the ``libev`` C extension is available, ``LibevConnection`` is used.
898+
3. ``AsyncioConnection`` is used as the standard-library fallback. This is
899+
the preferred default on Python 3.12+ where ``asyncore`` was removed.
900+
4. On Python < 3.12, ``AsyncoreConnection`` is used as a last resort.
898901
"""
899902

900903
control_connection_timeout = 2.0

cassandra/io/asyncioreactor.py

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,6 @@
1111
log = logging.getLogger(__name__)
1212

1313

14-
# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and
15-
# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the
16-
# managed coroutines are generator-based, not native coroutines. See PEP 492:
17-
# https://www.python.org/dev/peps/pep-0492/#coroutine-objects
18-
19-
20-
try:
21-
asyncio.run_coroutine_threadsafe
22-
except AttributeError:
23-
raise ImportError(
24-
'Cannot use asyncioreactor without access to '
25-
'asyncio.run_coroutine_threadsafe (added in 3.4.6 and 3.5.1)'
26-
)
27-
28-
2914
class AsyncioTimer(object):
3015
"""
3116
An ``asyncioreactor``-specific Timer. Similar to :class:`.connection.Timer,
@@ -67,11 +52,12 @@ def finish(self):
6752

6853
class AsyncioConnection(Connection):
6954
"""
70-
An experimental implementation of :class:`.Connection` that uses the
71-
``asyncio`` module in the Python standard library for its event loop.
55+
An implementation of :class:`.Connection` that uses the ``asyncio``
56+
module in the Python standard library for its event loop.
7257
73-
Note that it requires ``asyncio`` features that were only introduced in the
74-
3.4 line in 3.4.6, and in the 3.5 line in 3.5.1.
58+
This is the preferred connection class on Python 3.12+ where the
59+
``asyncore`` module has been removed. It is also used as a fallback
60+
when the libev C extension is not available.
7561
"""
7662

7763
_loop = None
@@ -109,7 +95,6 @@ def initialize_reactor(cls):
10995
cls._loop = None
11096
if cls._loop is None:
11197
cls._loop = asyncio.new_event_loop()
112-
asyncio.set_event_loop(cls._loop)
11398

11499
if not cls._loop_thread:
115100
# daemonize so the loop will be shut down on interpreter
@@ -173,7 +158,7 @@ def push(self, data):
173158

174159
async def _push_msg(self, chunks):
175160
# This lock ensures all chunks of a message are sequential in the Queue
176-
with await self._write_queue_lock:
161+
async with self._write_queue_lock:
177162
for chunk in chunks:
178163
self._write_queue.put_nowait(chunk)
179164

tests/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def is_monkey_patched():
5454
return is_gevent_monkey_patched() or is_eventlet_monkey_patched()
5555

5656
MONKEY_PATCH_LOOP = bool(os.getenv('MONKEY_PATCH_LOOP', False))
57-
EVENT_LOOP_MANAGER = os.getenv('EVENT_LOOP_MANAGER', "libev")
57+
EVENT_LOOP_MANAGER = os.getenv('EVENT_LOOP_MANAGER', "asyncio")
5858

5959

6060
# If set to to true this will force the Cython tests to run regardless of whether they are installed

tests/unit/io/test_asyncioreactor.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
AsyncioConnection, ASYNCIO_AVAILABLE = None, False
22
try:
33
from cassandra.io.asyncioreactor import AsyncioConnection
4-
import asynctest
54
ASYNCIO_AVAILABLE = True
65
except (ImportError, SyntaxError):
76
AsyncioConnection = None
87
ASYNCIO_AVAILABLE = False
98

109
from tests import is_monkey_patched, connection_class
11-
from tests.unit.io.utils import TimerCallback, TimerTestMixin
10+
from tests.unit.io.utils import TimerCallback, TimerTestMixin, submit_and_wait_for_completion
1211

13-
from unittest.mock import patch
12+
from unittest.mock import patch, MagicMock
1413

1514
import unittest
1615
import time
@@ -56,7 +55,7 @@ def setUp(self):
5655
socket_patcher.start()
5756

5857
old_selector = AsyncioConnection._loop._selector
59-
AsyncioConnection._loop._selector = asynctest.TestSelector()
58+
AsyncioConnection._loop._selector = MagicMock()
6059

6160
def reset_selector():
6261
AsyncioConnection._loop._selector = old_selector
@@ -65,6 +64,31 @@ def reset_selector():
6564

6665
super(AsyncioTimerTests, self).setUp()
6766

67+
def test_multi_timer_validation(self):
68+
"""
69+
Override with a wider tolerance for asyncio's thread-based scheduling,
70+
which has inherently more jitter than libev's native event loop.
71+
"""
72+
from tests.unit.io.utils import get_timeout
73+
pending_callbacks = []
74+
completed_callbacks = []
75+
76+
for gross_time in range(0, 100, 1):
77+
timeout = get_timeout(gross_time, 0, 100, 100, False)
78+
callback = TimerCallback(timeout)
79+
self.create_timer(timeout, callback.invoke)
80+
pending_callbacks.append(callback)
81+
82+
while len(pending_callbacks) != 0:
83+
for callback in pending_callbacks:
84+
if callback.was_invoked():
85+
pending_callbacks.remove(callback)
86+
completed_callbacks.append(callback)
87+
time.sleep(.1)
88+
89+
for callback in completed_callbacks:
90+
self.assertAlmostEqual(callback.expected_wait, callback.get_wait_time(), delta=.25)
91+
6892
def test_timer_cancellation(self):
6993
# Various lists for tracking callback stage
7094
timeout = .1

0 commit comments

Comments
 (0)