Skip to content

Commit 573c92e

Browse files
committed
cluster: add control connection query fallback
1 parent e6f9e9f commit 573c92e

4 files changed

Lines changed: 365 additions & 4 deletions

File tree

cassandra/cluster.py

Lines changed: 134 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,17 @@ def default_retry_policy(self, policy):
930930
If set to :const:`None`, there will be no timeout for these queries.
931931
"""
932932

933+
allow_control_connection_query_fallback = False
934+
"""
935+
Enables an opt-in degraded availability path for application queries.
936+
937+
When :const:`True`, a request may be sent on the control connection if
938+
the session has no usable node connection pools. This fallback is disabled
939+
by default because the control connection is normally reserved for driver
940+
metadata and event handling. It is not used for requests targeted to an
941+
explicit host.
942+
"""
943+
933944
idle_heartbeat_interval = 30
934945
"""
935946
Interval, in seconds, on which to heartbeat idle connections. This helps
@@ -1216,7 +1227,8 @@ def __init__(self,
12161227
metadata_request_timeout: Optional[float] = None,
12171228
column_encryption_policy=None,
12181229
application_info:Optional[ApplicationInfoBase]=None,
1219-
client_routes_config:Optional[ClientRoutesConfig]=None
1230+
client_routes_config:Optional[ClientRoutesConfig]=None,
1231+
allow_control_connection_query_fallback:Optional[bool]=False
12201232
):
12211233
"""
12221234
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
@@ -1464,6 +1476,7 @@ def __init__(self,
14641476
self.cql_version = cql_version
14651477
self.max_schema_agreement_wait = max_schema_agreement_wait
14661478
self.control_connection_timeout = control_connection_timeout
1479+
self.allow_control_connection_query_fallback = bool(allow_control_connection_query_fallback)
14671480
self.metadata_request_timeout = self.control_connection_timeout if metadata_request_timeout is None else metadata_request_timeout
14681481
self.idle_heartbeat_interval = idle_heartbeat_interval
14691482
self.idle_heartbeat_timeout = idle_heartbeat_timeout
@@ -4439,6 +4452,7 @@ class ResponseFuture(object):
44394452
_spec_execution_plan = NoSpeculativeExecutionPlan()
44404453
_continuous_paging_session = None
44414454
_host = None
4455+
_control_connection_query_attempted = False
44424456
_TABLET_ROUTING_CTYPE = None
44434457

44444458
_warned_timeout = False
@@ -4459,6 +4473,7 @@ def __init__(self, session, message, query, timeout, metrics=None, prepared_stat
44594473
self._callback_lock = Lock()
44604474
self._start_time = start_time or time.time()
44614475
self._host = host
4476+
self._control_connection_query_attempted = False
44624477
self._spec_execution_plan = speculative_execution_plan or self._spec_execution_plan
44634478
self._make_query_plan()
44644479
self._event = Event()
@@ -4537,11 +4552,16 @@ def _on_timeout(self, _attempts=0):
45374552
self._connection.orphaned_threshold_reached = True
45384553

45394554
pool.return_connection(self._connection, stream_was_orphaned=True)
4555+
elif getattr(self._connection, 'is_control_connection', False):
4556+
with self._connection.lock:
4557+
self._connection.orphaned_request_ids.add(self._req_id)
4558+
if len(self._connection.orphaned_request_ids) >= self._connection.orphaned_threshold:
4559+
self._connection.orphaned_threshold_reached = True
45404560

45414561
errors = self._errors
45424562
if not errors:
45434563
if self.is_schema_agreed:
4544-
key = str(self._current_host.endpoint) if self._current_host else 'no host queried before timeout'
4564+
key = str(self._get_host_endpoint(self._current_host)) if self._current_host else 'no host queried before timeout'
45454565
errors = {key: "Client request timeout. See Session.execute[_async](timeout)"}
45464566
else:
45474567
connection = self.session.cluster.control_connection._connection
@@ -4599,14 +4619,111 @@ def send_request(self, error_no_hosts=True):
45994619
self._on_timeout()
46004620
return True
46014621
if error_no_hosts:
4622+
if self._fallback_to_control_connection():
4623+
req_id = self._query_control_connection()
4624+
if req_id is not None:
4625+
self._req_id = req_id
4626+
return True
4627+
46024628
self._set_final_exception(NoHostAvailable(
46034629
"Unable to complete the operation against any hosts", self._errors))
46044630
return False
46054631

4632+
@staticmethod
4633+
def _get_host_endpoint(host):
4634+
return getattr(host, 'endpoint', host)
4635+
4636+
def _has_usable_node_pool(self):
4637+
try:
4638+
pools = tuple(self.session._pools.values())
4639+
except (AttributeError, TypeError):
4640+
return False
4641+
4642+
return any(pool and not pool.is_shutdown for pool in pools)
4643+
4644+
def _fallback_to_control_connection(self):
4645+
if getattr(self.session.cluster, 'allow_control_connection_query_fallback', False) is not True:
4646+
return False
4647+
if self._host or self._control_connection_query_attempted:
4648+
return False
4649+
return not self._has_usable_node_pool()
4650+
4651+
def _borrow_control_connection(self, connection):
4652+
with connection.lock:
4653+
if connection.in_flight >= connection.max_request_id:
4654+
raise NoConnectionsAvailable("All request IDs are currently in use")
4655+
connection.in_flight += 1
4656+
return connection.get_request_id()
4657+
4658+
def _release_control_connection_request(self, connection, request_id):
4659+
with connection.lock:
4660+
connection.in_flight -= 1
4661+
connection.request_ids.append(request_id)
4662+
connection._requests.pop(request_id, None)
4663+
4664+
def _handle_control_connection_response(self, connection, cb, response):
4665+
with connection.lock:
4666+
connection.in_flight -= 1
4667+
cb(response)
4668+
4669+
def _query_control_connection(self, message=None, cb=None, connection=None, host=None):
4670+
self._control_connection_query_attempted = True
4671+
4672+
if message is None:
4673+
message = self.message
4674+
4675+
if connection is None:
4676+
control_connection = self.session.cluster.control_connection
4677+
connection = control_connection._connection if control_connection else None
4678+
if not connection:
4679+
self._errors['control connection'] = ConnectionException("Control connection is not connected")
4680+
return None
4681+
4682+
if host is None:
4683+
host = self.session.cluster.get_control_connection_host() or connection.endpoint
4684+
self._current_host = host
4685+
4686+
request_id = None
4687+
request_sent = False
4688+
try:
4689+
request_id = self._borrow_control_connection(connection)
4690+
self._connection = connection
4691+
result_meta = self.prepared_statement.result_metadata if self.prepared_statement else []
4692+
if cb is None:
4693+
cb = partial(self._set_result, host, connection, None)
4694+
cb = partial(self._handle_control_connection_response, connection, cb)
4695+
4696+
log.debug("No usable node pools; falling back to control connection for host %s", host)
4697+
self.request_encoded_size = connection.send_msg(message, request_id, cb=cb,
4698+
encoder=self._protocol_handler.encode_message,
4699+
decoder=self._protocol_handler.decode_message,
4700+
result_metadata=result_meta)
4701+
request_sent = True
4702+
self.attempted_hosts.append(host)
4703+
return request_id
4704+
except NoConnectionsAvailable as exc:
4705+
log.debug("Control connection is at capacity")
4706+
self._errors[host] = exc
4707+
except ConnectionBusy as exc:
4708+
log.debug("Control connection is busy")
4709+
self._errors[host] = exc
4710+
except Exception as exc:
4711+
log.debug("Error querying control connection", exc_info=True)
4712+
self._errors[host] = exc
4713+
if self._metrics is not None:
4714+
self._metrics.on_connection_error()
4715+
finally:
4716+
if request_id is not None and not request_sent:
4717+
self._release_control_connection_request(connection, request_id)
4718+
4719+
return None
4720+
46064721
def _query(self, host, message=None, cb=None):
46074722
if message is None:
46084723
message = self.message
46094724

4725+
self._control_connection_query_attempted = False
4726+
46104727
pool = self.session._pools.get(host)
46114728
if not pool:
46124729
self._errors[host] = ConnectionException("Host has been marked down or removed")
@@ -4717,12 +4834,17 @@ def start_fetching_next_page(self):
47174834
self._event.clear()
47184835
self._final_result = _NOT_SET
47194836
self._final_exception = None
4837+
self._control_connection_query_attempted = False
47204838
self._start_timer()
47214839
self.send_request()
47224840

47234841
def _reprepare(self, prepare_message, host, connection, pool):
47244842
cb = partial(self.session.submit, self._execute_after_prepare, host, connection, pool)
4725-
request_id = self._query(host, prepare_message, cb=cb)
4843+
if pool is None and getattr(connection, 'is_control_connection', False):
4844+
request_id = self._query_control_connection(prepare_message, cb=cb,
4845+
connection=connection, host=host)
4846+
else:
4847+
request_id = self._query(host, prepare_message, cb=cb)
47264848
if request_id is None:
47274849
# try to submit the original prepared statement on some other host
47284850
self.send_request()
@@ -4940,7 +5062,10 @@ def _execute_after_prepare(self, host, connection, pool, response):
49405062

49415063
# use self._query to re-use the same host and
49425064
# at the same time properly borrow the connection
4943-
request_id = self._query(host)
5065+
if pool is None and getattr(connection, 'is_control_connection', False):
5066+
request_id = self._query_control_connection(connection=connection, host=host)
5067+
else:
5068+
request_id = self._query(host)
49445069
if request_id is None:
49455070
# this host errored out, move on to the next
49465071
self.send_request()
@@ -5053,6 +5178,11 @@ def _retry_task(self, reuse_connection, host):
50535178
# to retry the operation
50545179
return
50555180

5181+
if self._control_connection_query_attempted:
5182+
self._control_connection_query_attempted = False
5183+
self.send_request()
5184+
return
5185+
50565186
if reuse_connection and self._query(host) is not None:
50575187
return
50585188

docs/api/cassandra/cluster.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ Clusters and Sessions
4848

4949
.. autoattribute:: control_connection_timeout
5050

51+
.. autoattribute:: allow_control_connection_query_fallback
52+
5153
.. autoattribute:: idle_heartbeat_interval
5254

5355
.. autoattribute:: idle_heartbeat_timeout

tests/unit/test_cluster.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,10 @@ def test_port_range(self):
184184
with pytest.raises(ValueError):
185185
cluster = Cluster(contact_points=['127.0.0.1'], port=invalid_port)
186186

187+
def test_control_connection_query_fallback_flag(self):
188+
assert Cluster().allow_control_connection_query_fallback is False
189+
assert Cluster(allow_control_connection_query_fallback=True).allow_control_connection_query_fallback is True
190+
187191
def test_compression_autodisabled_without_libraries(self):
188192
with patch.dict('cassandra.cluster.locally_supported_compressions', {}, clear=True):
189193
with patch('cassandra.cluster.log') as patched_logger:

0 commit comments

Comments
 (0)