Skip to content

Commit 88b0f45

Browse files
committed
cluster: add control-connection query fallback
Allow application queries to fall back to the control connection when the driver cannot populate normal pools, such as when the cluster is reached through a non-broadcast API address like a TCP proxy or a node public IP. This is a degraded path with poor throughput and higher churn-related error risk, so it should stay disabled in production.\n\nAlso propagate keyspace updates on the fallback path so USE statements keep the control connection in sync.
1 parent 573c92e commit 88b0f45

3 files changed

Lines changed: 71 additions & 6 deletions

File tree

cassandra/cluster.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -932,13 +932,17 @@ def default_retry_policy(self, policy):
932932

933933
allow_control_connection_query_fallback = False
934934
"""
935-
Enables an opt-in degraded availability path for application queries.
935+
Enables an opt-in degraded path for application queries.
936936
937937
When :const:`True`, a request may be sent on the control connection if
938-
the session has no usable node connection pools. This fallback is disabled
939-
by default because the control connection is normally reserved for driver
940-
metadata and event handling. It is not used for requests targeted to an
941-
explicit host.
938+
the session has no usable node connection pools. This is intended for
939+
deployments that expose the cluster through a non-broadcast API address,
940+
such as a TCP proxy or a node's public IP address, where the driver
941+
cannot fill the normal pool set. Queries can still execute over the single
942+
control connection, but throughput is poor and connection churn raises the
943+
chance of request errors. Do not enable this in production.
944+
945+
This fallback is not used for requests targeted to an explicit host.
942946
"""
943947

944948
idle_heartbeat_interval = 30
@@ -1235,6 +1239,10 @@ def __init__(self,
12351239
extablishing connection pools or refreshing metadata.
12361240
12371241
Any of the mutable Cluster attributes may be set as keyword arguments to the constructor.
1242+
``allow_control_connection_query_fallback`` is a degraded-availability
1243+
setting for cases where the driver reaches the cluster through a
1244+
non-broadcast API address and cannot populate the normal pools. It
1245+
should not be enabled in production.
12381246
"""
12391247

12401248
# Handle port passed as string
@@ -3382,7 +3390,7 @@ def pool_finished_setting_keyspace(pool, host_errors):
33823390
errors[pool.host] = host_errors
33833391

33843392
if not remaining_callbacks:
3385-
callback(host_errors)
3393+
callback(errors)
33863394

33873395
for pool in tuple(self._pools.values()):
33883396
pool._set_keyspace_for_all_conns(keyspace, pool_finished_setting_keyspace)
@@ -4883,6 +4891,8 @@ def _set_result(self, host, connection, pool, response):
48834891
if isinstance(response, ResultMessage):
48844892
if response.kind == RESULT_KIND_SET_KEYSPACE:
48854893
session = getattr(self, 'session', None)
4894+
if connection is not None:
4895+
connection.keyspace = response.new_keyspace
48864896
# since we're running on the event loop thread, we need to
48874897
# use a non-blocking method for setting the keyspace on
48884898
# all connections in this session, otherwise the event

tests/unit/test_cluster.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
InvalidRequest, Unauthorized, AuthenticationFailed, OperationTimedOut, UnsupportedOperation, RequestValidationException, ConfigurationException, ProtocolVersion
2424
from cassandra.cluster import _Scheduler, Session, Cluster, default_lbp_factory, \
2525
ExecutionProfile, _ConfigMode, EXEC_PROFILE_DEFAULT
26+
from cassandra.connection import ConnectionException
2627
from cassandra.pool import Host
2728
from cassandra.policies import HostDistance, RetryPolicy, RoundRobinPolicy, DowngradingConsistencyRetryPolicy, SimpleConvictionPolicy
2829
from cassandra.query import SimpleStatement, named_tuple_factory, tuple_factory
@@ -343,6 +344,32 @@ def test_set_keyspace_escapes_quotes(self, *_):
343344
assert query == 'USE simple_ks', (
344345
"Simple keyspace names should not be quoted, got: %r" % query)
345346

347+
@mock_session_pools
348+
def test_set_keyspace_for_all_pools_reports_all_errors(self, *_):
349+
cluster = Cluster()
350+
session = Session(
351+
cluster,
352+
[Host("127.0.0.1", SimpleConvictionPolicy, host_id=uuid.uuid4())],
353+
)
354+
355+
pool1 = Mock(host='host1')
356+
pool2 = Mock(host='host2')
357+
keyspace_error = ConnectionException("boom")
358+
359+
pool1._set_keyspace_for_all_conns.side_effect = (
360+
lambda keyspace, callback: callback(pool1, [keyspace_error])
361+
)
362+
pool2._set_keyspace_for_all_conns.side_effect = (
363+
lambda keyspace, callback: callback(pool2, [])
364+
)
365+
session._pools = {'host1': pool1, 'host2': pool2}
366+
367+
callback = Mock()
368+
session._set_keyspace_for_all_pools('ks', callback)
369+
370+
callback.assert_called_once()
371+
assert callback.call_args.args[0] == {'host1': [keyspace_error]}
372+
346373
class ProtocolVersionTests(unittest.TestCase):
347374

348375
def test_protocol_downgrade_test(self):

tests/unit/test_response_future.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,34 @@ def test_control_connection_fallback_disabled_by_default(self):
422422
with pytest.raises(NoHostAvailable):
423423
rf.result()
424424

425+
def test_control_connection_fallback_updates_connection_keyspace(self):
426+
session = self.make_basic_session()
427+
session.cluster.allow_control_connection_query_fallback = True
428+
session.cluster._default_load_balancing_policy.make_query_plan.return_value = ['ip1']
429+
session._pools = {}
430+
431+
def set_keyspace_for_all_pools(keyspace, callback):
432+
session.keyspace = keyspace
433+
callback({})
434+
435+
session._set_keyspace_for_all_pools.side_effect = set_keyspace_for_all_pools
436+
437+
connection = self.make_control_connection()
438+
connection.keyspace = 'oldks'
439+
session.cluster.control_connection._connection = connection
440+
control_host = Mock(endpoint=connection.endpoint)
441+
session.cluster.get_control_connection_host.return_value = control_host
442+
443+
rf = self.make_response_future(session)
444+
assert rf.send_request()
445+
446+
result = Mock(spec=ResultMessage, kind=RESULT_KIND_SET_KEYSPACE, new_keyspace='newks')
447+
connection.send_msg.call_args[1]['cb'](result)
448+
449+
assert connection.keyspace == 'newks'
450+
assert session.keyspace == 'newks'
451+
assert rf.result().current_rows == []
452+
425453
def test_control_connection_fallback_when_no_usable_pools(self):
426454
session = self.make_basic_session()
427455
session.cluster.allow_control_connection_query_fallback = True

0 commit comments

Comments
 (0)