Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3963,9 +3963,8 @@

if "tokens" in row and not row.get("tokens"):
log.debug(
"Found a zero-token node - tokens is None (broadcast_rpc: %s, host_id: %s). Ignoring host." %
"Found a zero-token node - tokens is None (broadcast_rpc: %s, host_id: %s). Adding host without tokens." %
(broadcast_rpc, host_id))
return False

return True

Expand Down Expand Up @@ -4340,7 +4339,7 @@
fn, args, kwargs = task
kwargs = dict(kwargs)
future = self._executor.submit(fn, *args, **kwargs)
future.add_done_callback(self._log_if_failed)

Check failure on line 4342 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test libev (3.11)

cannot schedule new futures after shutdown

Check failure on line 4342 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test asyncio (3.11)

cannot schedule new futures after shutdown
else:
self._queue.put_nowait((run_at, i, task))
break
Expand Down
74 changes: 71 additions & 3 deletions tests/unit/test_control_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ def add_or_return_host(self, host):

def update_host(self, host, old_endpoint):
host, created = self.add_or_return_host(host)
self._host_id_by_endpoint[host.endpoint] = host.host_id
self._host_id_by_endpoint.pop(old_endpoint, False)
self._host_id_by_endpoint[host.endpoint] = host.host_id

def all_hosts_items(self):
return list(self.hosts.items())
Expand Down Expand Up @@ -206,6 +206,15 @@ def setUp(self):
self.control_connection._connection = self.connection
self.control_connection._time = self.time

def _assert_zero_token_host_without_token_map_entry(self, endpoint, host_id):
zero_token_host = self.cluster.metadata.get_host(endpoint)
assert zero_token_host is not None
assert zero_token_host.host_id == host_id
assert zero_token_host.datacenter == "dc1"
assert zero_token_host.rack == "rack1"
assert zero_token_host not in self.cluster.metadata.token_map
return zero_token_host

def test_wait_for_schema_agreement(self):
"""
Basic test with all schema versions agreeing
Expand Down Expand Up @@ -321,7 +330,6 @@ def refresh_and_validate_added_hosts():
[None, None, "a", "dc1", "rack1", ["1", "101", "201"], 'uuid1'],
["192.168.1.7", "10.0.0.1", "a", None, "rack1", ["1", "101", "201"], 'uuid2'],
["192.168.1.6", "10.0.0.1", "a", "dc1", None, ["1", "101", "201"], 'uuid3'],
["192.168.1.5", "10.0.0.1", "a", "dc1", "rack1", None, 'uuid4'],
["192.168.1.4", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"], None]]])
refresh_and_validate_added_hosts()

Expand All @@ -335,7 +343,6 @@ def refresh_and_validate_added_hosts():
[None, 9042, None, 7040, "a", "dc1", "rack1", ["2", "102", "202"], "uuid2"],
["192.168.1.5", 9042, "10.0.0.2", 7040, "a", None, "rack1", ["2", "102", "202"], "uuid2"],
["192.168.1.5", 9042, "10.0.0.2", 7040, "a", "dc1", None, ["2", "102", "202"], "uuid2"],
["192.168.1.5", 9042, "10.0.0.2", 7040, "a", "dc1", "rack1", None, "uuid2"],
["192.168.1.5", 9042, "10.0.0.2", 7040, "a", "dc1", "rack1", ["2", "102", "202"], None]]])
refresh_and_validate_added_hosts()

Expand Down Expand Up @@ -411,6 +418,44 @@ def test_refresh_nodes_and_tokens_add_host(self):
assert self.cluster.added_hosts[0].rack == "rack1"
assert self.cluster.added_hosts[0].host_id == "uuid4"

def test_refresh_nodes_and_tokens_adds_zero_token_host_without_token_map_entry(self):
# Zero-token nodes are valid topology members, but they do not own token ranges.
self.connection.peer_results[1].append(
["192.168.1.3", "10.0.0.3", "a", "dc1", "rack1", None, "uuid4"]
)
self.cluster.scheduler.schedule = lambda delay, f, *args, **kwargs: f(*args, **kwargs)

self.control_connection.refresh_node_list_and_token_map()

zero_token_host = self._assert_zero_token_host_without_token_map_entry(
DefaultEndPoint("192.168.1.3"), "uuid4")
assert 1 == len(self.cluster.added_hosts)
assert self.cluster.added_hosts[0] is zero_token_host
assert [] == self.cluster.metadata.removed_hosts

def test_refresh_nodes_and_tokens_adds_empty_token_host_without_token_map_entry(self):
self.connection.peer_results[1].append(
["192.168.1.3", "10.0.0.3", "a", "dc1", "rack1", [], "uuid4"]
)
self.cluster.scheduler.schedule = lambda delay, f, *args, **kwargs: f(*args, **kwargs)

self.control_connection.refresh_node_list_and_token_map()

zero_token_host = self._assert_zero_token_host_without_token_map_entry(
DefaultEndPoint("192.168.1.3"), "uuid4")
assert 1 == len(self.cluster.added_hosts)
assert self.cluster.added_hosts[0] is zero_token_host

def test_refresh_nodes_and_tokens_keeps_zero_token_local_host_without_token_map_entry(self):
self.connection.local_results[1][0][7] = None

self.control_connection.refresh_node_list_and_token_map()

self._assert_zero_token_host_without_token_map_entry(
DefaultEndPoint("192.168.1.0"), "uuid1")
assert [] == self.cluster.added_hosts
assert [] == self.cluster.metadata.removed_hosts

def test_refresh_nodes_and_tokens_remove_host(self):
del self.connection.peer_results[1][1]
self.control_connection.refresh_node_list_and_token_map()
Expand Down Expand Up @@ -589,6 +634,29 @@ def test_refresh_nodes_and_tokens_add_host_detects_port(self):
assert self.cluster.added_hosts[0].datacenter == "dc1"
assert self.cluster.added_hosts[0].rack == "rack1"

def test_refresh_nodes_and_tokens_adds_zero_token_host_from_peers_v2_without_token_map_entry(self):
del self.connection.peer_results[:]
self.connection.peer_results.extend(self.connection.peer_results_v2)
self.connection.peer_results[1].append(
["192.168.1.3", 555, "10.0.0.3", 666, "a", "dc1", "rack1", None, "uuid4"]
)
self.connection.wait_for_responses = Mock(return_value=_node_meta_results(
self.connection.local_results, self.connection.peer_results))
self.cluster.scheduler.schedule = lambda delay, f, *args, **kwargs: f(*args, **kwargs)

self.control_connection.refresh_node_list_and_token_map()

zero_token_host = self._assert_zero_token_host_without_token_map_entry(
DefaultEndPoint("192.168.1.3", 555), "uuid4")
assert 1 == len(self.cluster.added_hosts)
assert self.cluster.added_hosts[0] is zero_token_host
assert zero_token_host.endpoint.port == 555
assert zero_token_host.broadcast_rpc_address == "192.168.1.3"
assert zero_token_host.broadcast_rpc_port == 555
assert zero_token_host.broadcast_address == "10.0.0.3"
assert zero_token_host.broadcast_port == 666
assert [] == self.cluster.metadata.removed_hosts

def test_refresh_nodes_and_tokens_add_host_detects_invalid_port(self):
del self.connection.peer_results[:]
self.connection.peer_results.extend(self.connection.peer_results_v2)
Expand Down
Loading