Skip to content

Commit 5a24812

Browse files
committed
Fall back to control connection when host pools are empty
When all hosts are marked IGNORED by the load-balancing policy (e.g. WhiteListRoundRobinPolicy with a NAT address not known to the cluster), no connection pools are created. Instead of raising NoHostAvailable on Session.connect(), log a warning and fall back to executing queries on the already-established control connection. Fixes: #720
1 parent 9de3793 commit 5a24812

3 files changed

Lines changed: 274 additions & 6 deletions

File tree

cassandra/cluster.py

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2589,10 +2589,7 @@ def __init__(self, cluster, hosts, keyspace=None):
25892589
futures = wait_futures(futures.not_done, return_when=FIRST_COMPLETED)
25902590

25912591
if not any(f.result() for f in self._initial_connect_futures):
2592-
msg = "Unable to connect to any servers"
2593-
if self.keyspace:
2594-
msg += " using keyspace '%s'" % self.keyspace
2595-
raise NoHostAvailable(msg, [h.address for h in hosts])
2592+
log.warning("No host pools available; queries will use the control connection")
25962593

25972594
self.session_id = uuid.uuid4()
25982595

@@ -4495,6 +4492,13 @@ def send_request(self, error_no_hosts=True):
44954492
if self.timeout is not None and time.time() - self._start_time > self.timeout:
44964493
self._on_timeout()
44974494
return True
4495+
# Fallback: try control connection when no pools are available
4496+
if not self.session._pools:
4497+
req_id = self._query_control_connection()
4498+
if req_id is not None:
4499+
self._req_id = req_id
4500+
return True
4501+
44984502
if error_no_hosts:
44994503
self._set_final_exception(NoHostAvailable(
45004504
"Unable to complete the operation against any hosts", self._errors))
@@ -4549,6 +4553,43 @@ def _query(self, host, message=None, cb=None):
45494553

45504554
return None
45514555

4556+
def _query_control_connection(self):
4557+
"""
4558+
Fallback: execute a query on the control connection when no host
4559+
pools are available (e.g. all hosts are IGNORED by the load-balancing
4560+
policy).
4561+
"""
4562+
conn = self.session.cluster.control_connection._connection
4563+
if not conn or conn.is_closed or conn.is_defunct:
4564+
return None
4565+
4566+
message = self.message
4567+
4568+
try:
4569+
with conn.lock:
4570+
if conn.in_flight >= conn.max_request_id:
4571+
return None
4572+
conn.in_flight += 1
4573+
request_id = conn.get_request_id()
4574+
4575+
def cb(response):
4576+
with conn.lock:
4577+
conn.in_flight -= 1
4578+
self._set_result(None, conn, None, response)
4579+
4580+
result_meta = self.prepared_statement.result_metadata if self.prepared_statement else []
4581+
self.request_encoded_size = conn.send_msg(
4582+
message, request_id, cb=cb,
4583+
encoder=self._protocol_handler.encode_message,
4584+
decoder=self._protocol_handler.decode_message,
4585+
result_metadata=result_meta)
4586+
return request_id
4587+
except Exception as exc:
4588+
with conn.lock:
4589+
conn.in_flight -= 1
4590+
self._errors['control_connection'] = exc
4591+
return None
4592+
45524593
@property
45534594
def has_more_pages(self):
45544595
"""

tests/integration/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ def cleanup_clusters():
1616
yield
1717

1818
if not os.environ.get('DISABLE_CLUSTER_CLEANUP'):
19-
for cluster_name in [CLUSTER_NAME, SINGLE_NODE_CLUSTER_NAME, MULTIDC_CLUSTER_NAME,
20-
'shared_aware', 'sni_proxy', 'test_ip_change']:
19+
for cluster_name in [CLUSTER_NAME, SINGLE_NODE_CLUSTER_NAME, MULTIDC_CLUSTER_NAME, 'shared_aware', 'sni_proxy',
20+
'test_ip_change', 'test_public_addr']:
2121
try:
2222
cluster = CCMClusterFactory.load(ccm_path, cluster_name)
2323
logging.debug("Using external CCM cluster {0}".format(cluster.name))
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
# Copyright DataStax, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Reproducer for https://github.com/scylladb/python-driver/issues/720
17+
18+
A 3-node CCM cluster with broadcast_rpc_address set to 127.0.1.{1,2,3}
19+
(different from the internal rpc_address 127.0.0.{1,2,3}).
20+
21+
Two sets of TCP proxies:
22+
- 127.0.1.{1,2,3}:9042 → 127.0.0.{1,2,3}:9042 (advertised via broadcast_rpc_address)
23+
- 127.0.2.{1,2,3}:9042 → 127.0.0.{1,2,3}:9042 (NOT advertised — simulates cloud NAT)
24+
"""
25+
26+
import logging
27+
import os
28+
import select
29+
import socket
30+
import threading
31+
import unittest
32+
33+
from cassandra.cluster import Cluster
34+
from cassandra.policies import WhiteListRoundRobinPolicy
35+
36+
from tests.integration import (
37+
use_cluster, get_cluster, local,
38+
default_protocol_version, wait_for_node_socket,
39+
)
40+
41+
LOGGER = logging.getLogger(__name__)
42+
43+
CLUSTER_NAME = 'test_public_addr'
44+
PROXY_PORT = 9042
45+
46+
47+
class TCPProxy:
48+
"""
49+
A minimal TCP proxy that forwards connections from a listen address
50+
to a target address. Runs in a background thread.
51+
"""
52+
53+
def __init__(self, listen_host, listen_port, target_host, target_port):
54+
self.listen_host = listen_host
55+
self.listen_port = listen_port
56+
self.target_host = target_host
57+
self.target_port = target_port
58+
self._server_sock = None
59+
self._thread = None
60+
self._stop_event = threading.Event()
61+
62+
def start(self):
63+
self._server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
64+
self._server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
65+
self._server_sock.settimeout(1.0)
66+
self._server_sock.bind((self.listen_host, self.listen_port))
67+
self._server_sock.listen(32)
68+
self._stop_event.clear()
69+
self._thread = threading.Thread(target=self._accept_loop, daemon=True)
70+
self._thread.start()
71+
LOGGER.debug("TCP proxy %s:%d -> %s:%d started",
72+
self.listen_host, self.listen_port,
73+
self.target_host, self.target_port)
74+
75+
def stop(self):
76+
self._stop_event.set()
77+
if self._thread:
78+
self._thread.join(timeout=5)
79+
if self._server_sock:
80+
self._server_sock.close()
81+
82+
def _accept_loop(self):
83+
while not self._stop_event.is_set():
84+
try:
85+
client, _ = self._server_sock.accept()
86+
except socket.timeout:
87+
continue
88+
except OSError:
89+
break
90+
t = threading.Thread(target=self._relay, args=(client,), daemon=True)
91+
t.start()
92+
93+
def _relay(self, client):
94+
target = None
95+
try:
96+
target = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
97+
target.connect((self.target_host, self.target_port))
98+
socks = [client, target]
99+
while not self._stop_event.is_set():
100+
readable, _, errored = select.select(socks, [], socks, 1.0)
101+
if errored:
102+
break
103+
for s in readable:
104+
data = s.recv(65536)
105+
if not data:
106+
return
107+
other = target if s is client else client
108+
other.sendall(data)
109+
except OSError:
110+
pass
111+
finally:
112+
client.close()
113+
if target:
114+
target.close()
115+
116+
117+
_proxies = []
118+
119+
120+
def setup_module():
121+
os.environ['SCYLLA_EXT_OPTS'] = os.environ.get('SCYLLA_EXT_OPTS', '') + ' --smp 1 --memory 512M'
122+
123+
use_cluster(CLUSTER_NAME, [3], start=False, set_keyspace=False)
124+
125+
ccm_cluster = get_cluster()
126+
for i in range(1, 4):
127+
node = ccm_cluster.nodes[f'node{i}']
128+
node.set_configuration_options(values={
129+
'broadcast_rpc_address': f'127.0.1.{i}',
130+
})
131+
132+
ccm_cluster.start(wait_for_binary_proto=True, wait_other_notice=True)
133+
for node in ccm_cluster.nodes.values():
134+
wait_for_node_socket(node, 120)
135+
136+
# Advertised proxies: 127.0.1.x (matches broadcast_rpc_address)
137+
for i in range(1, 4):
138+
p = TCPProxy(f'127.0.1.{i}', PROXY_PORT, f'127.0.0.{i}', PROXY_PORT)
139+
p.start()
140+
_proxies.append(p)
141+
142+
# Unadvertised proxies: 127.0.2.x (simulates cloud NAT, unknown to nodes)
143+
for i in range(1, 4):
144+
p = TCPProxy(f'127.0.2.{i}', PROXY_PORT, f'127.0.0.{i}', PROXY_PORT)
145+
p.start()
146+
_proxies.append(p)
147+
148+
149+
def teardown_module():
150+
for p in _proxies:
151+
p.stop()
152+
_proxies.clear()
153+
154+
155+
@local
156+
class TestPublicAddress(unittest.TestCase):
157+
158+
def test_connect_via_single_broadcast_address_with_whitelist(self):
159+
"""
160+
Connect via advertised broadcast_rpc_address (127.0.1.1).
161+
system.local returns rpc_address=127.0.1.1, so the whitelist
162+
accepts it.
163+
"""
164+
proxy_address = '127.0.1.1'
165+
policy = WhiteListRoundRobinPolicy([proxy_address])
166+
cluster = Cluster(
167+
contact_points=[proxy_address],
168+
load_balancing_policy=policy,
169+
protocol_version=default_protocol_version,
170+
)
171+
try:
172+
session = cluster.connect()
173+
result = session.execute("SELECT * FROM system.local WHERE key='local'")
174+
assert result.one() is not None
175+
finally:
176+
cluster.shutdown()
177+
178+
def test_connect_via_all_broadcast_addresses_with_whitelist(self):
179+
"""
180+
Connect via all advertised broadcast_rpc_addresses (127.0.1.{1,2,3}).
181+
"""
182+
proxy_addresses = [f'127.0.1.{i}' for i in range(1, 4)]
183+
policy = WhiteListRoundRobinPolicy(proxy_addresses)
184+
cluster = Cluster(
185+
contact_points=proxy_addresses,
186+
load_balancing_policy=policy,
187+
protocol_version=default_protocol_version,
188+
)
189+
try:
190+
session = cluster.connect(wait_for_all_pools=True)
191+
192+
host_addresses = {h.broadcast_rpc_address for h in cluster.metadata.all_hosts()}
193+
assert set(proxy_addresses) == host_addresses, \
194+
f"Expected {set(proxy_addresses)}, got {host_addresses}"
195+
196+
result = session.execute("SELECT * FROM system.local WHERE key='local'")
197+
assert result.one() is not None
198+
finally:
199+
cluster.shutdown()
200+
201+
def test_connect_via_unadvertised_nat_address_with_whitelist(self):
202+
"""
203+
Reproducer for the exact scenario in issue #720.
204+
205+
Connect via unadvertised NAT proxy (127.0.2.1) with
206+
WhiteListRoundRobinPolicy(['127.0.2.1']). The node has
207+
broadcast_rpc_address=127.0.1.1, so system.local returns
208+
rpc_address=127.0.1.1 — NOT 127.0.2.1 that we connected to.
209+
210+
The driver must preserve the original contact point endpoint
211+
(127.0.2.1) so the whitelist accepts it. Without the fix, the
212+
driver replaces it with 127.0.1.1 from system.local and the
213+
whitelist rejects it → NoHostAvailable.
214+
"""
215+
nat_address = '127.0.2.1'
216+
policy = WhiteListRoundRobinPolicy([nat_address])
217+
cluster = Cluster(
218+
contact_points=[nat_address],
219+
load_balancing_policy=policy,
220+
protocol_version=default_protocol_version,
221+
)
222+
try:
223+
session = cluster.connect()
224+
result = session.execute("SELECT * FROM system.local WHERE key='local'")
225+
assert result.one() is not None
226+
finally:
227+
cluster.shutdown()

0 commit comments

Comments
 (0)