|
| 1 | +# Copyright DataStax, Inc. |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +""" |
| 16 | +Reproducer for https://github.com/scylladb/python-driver/issues/720 |
| 17 | +
|
| 18 | +A 3-node CCM cluster with broadcast_rpc_address set to 127.0.1.{1,2,3} |
| 19 | +(different from the internal rpc_address 127.0.0.{1,2,3}). |
| 20 | +
|
| 21 | +Two sets of TCP proxies: |
| 22 | + - 127.0.1.{1,2,3}:9042 → 127.0.0.{1,2,3}:9042 (advertised via broadcast_rpc_address) |
| 23 | + - 127.0.2.{1,2,3}:9042 → 127.0.0.{1,2,3}:9042 (NOT advertised — simulates cloud NAT) |
| 24 | +""" |
| 25 | + |
| 26 | +import logging |
| 27 | +import os |
| 28 | +import select |
| 29 | +import socket |
| 30 | +import threading |
| 31 | +import unittest |
| 32 | + |
| 33 | +from cassandra.cluster import Cluster |
| 34 | +from cassandra.policies import WhiteListRoundRobinPolicy |
| 35 | + |
| 36 | +from tests.integration import ( |
| 37 | + use_cluster, get_cluster, local, |
| 38 | + default_protocol_version, wait_for_node_socket, |
| 39 | +) |
| 40 | + |
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Name of the CCM cluster that setup_module() creates through use_cluster().
CLUSTER_NAME = 'test_public_addr'
# TCP port used by every proxy, both as its listening port and as the port
# it forwards to on the target address.
PROXY_PORT = 9042
| 45 | + |
| 46 | + |
class TCPProxy:
    """
    A minimal TCP proxy that forwards connections from a listen address
    to a target address. Runs in a background thread.

    ``start()`` binds the listening socket and launches a daemon thread that
    accepts connections; each accepted connection is relayed to the target
    in a daemon thread of its own. ``stop()`` signals those threads to finish
    and closes the listening socket.
    """

    def __init__(self, listen_host, listen_port, target_host, target_port):
        """
        :param listen_host: local address the proxy accepts connections on
        :param listen_port: local TCP port the proxy accepts connections on
        :param target_host: address each accepted connection is forwarded to
        :param target_port: TCP port each accepted connection is forwarded to
        """
        self.listen_host = listen_host
        self.listen_port = listen_port
        self.target_host = target_host
        self.target_port = target_port
        self._server_sock = None
        self._thread = None
        self._stop_event = threading.Event()

    def start(self):
        """
        Bind the listening socket and start the accept loop in a daemon thread.

        If the socket cannot be configured, bound or put into listening mode,
        it is closed before the exception propagates and the proxy is left
        in its not-started state.
        """
        server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # The timeout makes accept() return periodically so the accept
            # loop can notice the stop event.
            server_sock.settimeout(1.0)
            server_sock.bind((self.listen_host, self.listen_port))
            server_sock.listen(32)
        except BaseException:
            # Do not leak the descriptor (or keep a dead socket on self)
            # when setup fails part-way through.
            server_sock.close()
            raise
        self._server_sock = server_sock
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._accept_loop, daemon=True)
        self._thread.start()
        LOGGER.debug("TCP proxy %s:%d -> %s:%d started",
                     self.listen_host, self.listen_port,
                     self.target_host, self.target_port)

    def stop(self):
        """
        Signal all proxy threads to finish and close the listening socket.

        Waits up to 5 seconds for the accept thread. Safe to call on a proxy
        that was never started, and safe to call more than once.
        """
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=5)
            self._thread = None
        if self._server_sock is not None:
            self._server_sock.close()
            self._server_sock = None

    def _accept_loop(self):
        """Accept connections until stopped, relaying each in its own thread."""
        # Work on a local reference: stop() resets the attribute to None.
        server_sock = self._server_sock
        while not self._stop_event.is_set():
            try:
                client, _ = server_sock.accept()
            except socket.timeout:
                # Nothing to accept within the timeout; re-check the stop event.
                continue
            except OSError:
                # The listening socket is closed or unusable.
                break
            relay_thread = threading.Thread(
                target=self._relay, args=(client,), daemon=True)
            relay_thread.start()

    def _relay(self, client):
        """
        Copy bytes in both directions between ``client`` and a new connection
        to the target until either side closes, a socket error occurs, or the
        proxy is stopped. Both sockets are closed on exit.

        :param client: connected socket returned by ``accept()``
        """
        try:
            # The context managers close both sockets on every exit path,
            # including a failure to create or connect the target socket.
            with client, socket.socket(socket.AF_INET, socket.SOCK_STREAM) as target:
                target.connect((self.target_host, self.target_port))
                socks = [client, target]
                while not self._stop_event.is_set():
                    # The 1 second timeout bounds how long a relay outlives stop().
                    readable, _, errored = select.select(socks, [], socks, 1.0)
                    if errored:
                        break
                    for s in readable:
                        data = s.recv(65536)
                        if not data:
                            # Peer closed its side; tear down the whole relay.
                            return
                        other = target if s is client else client
                        other.sendall(data)
        except OSError:
            # Best effort: a broken connection only ends this relay.
            LOGGER.debug("TCP proxy %s:%d -> %s:%d relay ended with an error",
                         self.listen_host, self.listen_port,
                         self.target_host, self.target_port,
                         exc_info=True)
| 115 | + |
| 116 | + |
# TCPProxy instances started by setup_module(); teardown_module() stops
# each of them and empties the list.
_proxies = []
| 118 | + |
| 119 | + |
def setup_module():
    """
    Start the 3-node CCM cluster and the TCP proxies used by the tests.

    Every node ``i`` is configured with ``broadcast_rpc_address``
    127.0.1.<i> before the cluster is started. Two proxies per node are then
    started on ``PROXY_PORT``, both forwarding to 127.0.0.<i>:

    - 127.0.1.<i>, matching the advertised ``broadcast_rpc_address``
    - 127.0.2.<i>, an address the nodes do not advertise (simulates cloud NAT)

    If a proxy fails to start, the proxies already running are stopped before
    the error propagates, because pytest does not run ``teardown_module``
    when ``setup_module`` raises.
    """
    # Append the extra Scylla options to whatever the environment already
    # requests (presumably to keep the three local nodes small).
    os.environ['SCYLLA_EXT_OPTS'] = os.environ.get('SCYLLA_EXT_OPTS', '') + ' --smp 1 --memory 512M'

    # Create the cluster without starting it so that the nodes can be
    # reconfigured first.
    use_cluster(CLUSTER_NAME, [3], start=False, set_keyspace=False)

    ccm_cluster = get_cluster()
    node_ids = range(1, 4)
    for i in node_ids:
        node = ccm_cluster.nodes[f'node{i}']
        node.set_configuration_options(values={
            'broadcast_rpc_address': f'127.0.1.{i}',
        })

    ccm_cluster.start(wait_for_binary_proto=True, wait_other_notice=True)
    for node in ccm_cluster.nodes.values():
        wait_for_node_socket(node, 120)

    try:
        # Subnet 1: advertised proxies (matches broadcast_rpc_address).
        # Subnet 2: unadvertised proxies (simulates cloud NAT, unknown to nodes).
        for subnet in (1, 2):
            for i in node_ids:
                proxy = TCPProxy(f'127.0.{subnet}.{i}', PROXY_PORT,
                                 f'127.0.0.{i}', PROXY_PORT)
                proxy.start()
                _proxies.append(proxy)
    except BaseException:
        # Release the listeners that did start; nothing else will.
        for proxy in _proxies:
            proxy.stop()
        _proxies.clear()
        raise
| 147 | + |
| 148 | + |
def teardown_module():
    """Stop every proxy recorded in ``_proxies`` and empty the list."""
    for proxy in _proxies:
        proxy.stop()
    del _proxies[:]
| 153 | + |
| 154 | + |
@local
class TestPublicAddress(unittest.TestCase):
    """
    Connect through the TCP proxies with a WhiteListRoundRobinPolicy that
    contains exactly the addresses used as contact points.
    """

    _LOCAL_ROW_QUERY = "SELECT * FROM system.local WHERE key='local'"

    @staticmethod
    def _whitelisted_cluster(addresses):
        """
        Build a Cluster that uses ``addresses`` both as contact points and
        as the whitelist of its load balancing policy.
        """
        whitelist = WhiteListRoundRobinPolicy(addresses)
        return Cluster(
            contact_points=addresses,
            load_balancing_policy=whitelist,
            protocol_version=default_protocol_version,
        )

    def _assert_local_row_readable(self, session):
        """Check that ``session`` returns a row for the ``system.local`` query."""
        rows = session.execute(self._LOCAL_ROW_QUERY)
        assert rows.one() is not None

    def test_connect_via_single_broadcast_address_with_whitelist(self):
        """
        Use the advertised broadcast_rpc_address 127.0.1.1 as the only
        contact point. system.local returns rpc_address=127.0.1.1, so the
        whitelist accepts the host.
        """
        cluster = self._whitelisted_cluster(['127.0.1.1'])
        try:
            self._assert_local_row_readable(cluster.connect())
        finally:
            cluster.shutdown()

    def test_connect_via_all_broadcast_addresses_with_whitelist(self):
        """
        Use all advertised broadcast_rpc_addresses (127.0.1.{1,2,3}) as
        contact points and check that the metadata reports exactly those.
        """
        proxy_addresses = [f'127.0.1.{i}' for i in range(1, 4)]
        cluster = self._whitelisted_cluster(proxy_addresses)
        try:
            session = cluster.connect(wait_for_all_pools=True)

            host_addresses = {
                host.broadcast_rpc_address
                for host in cluster.metadata.all_hosts()
            }
            assert set(proxy_addresses) == host_addresses, \
                f"Expected {set(proxy_addresses)}, got {host_addresses}"

            self._assert_local_row_readable(session)
        finally:
            cluster.shutdown()

    def test_connect_via_unadvertised_nat_address_with_whitelist(self):
        """
        Reproducer for the exact scenario in issue #720.

        The only contact point, and the only whitelisted address, is the
        unadvertised NAT proxy 127.0.2.1. The node behind it has
        broadcast_rpc_address=127.0.1.1, so system.local returns
        rpc_address=127.0.1.1 rather than the 127.0.2.1 we connected to.

        The driver must keep the original contact point endpoint
        (127.0.2.1) so that the whitelist accepts it. Without the fix the
        driver replaces it with 127.0.1.1 from system.local, the whitelist
        rejects it, and the connection fails with NoHostAvailable.
        """
        cluster = self._whitelisted_cluster(['127.0.2.1'])
        try:
            self._assert_local_row_readable(cluster.connect())
        finally:
            cluster.shutdown()
0 commit comments