Skip to content

Commit f348637

Browse files
committed
Fix test_idle_heartbeat: wait for shard connections and fix idle assertion
Two issues caused this test to be flaky: 1. wait_for_all_pools only waits for the first connection per host. Shard-aware connections to remaining shards are opened asynchronously. When these complete during the test's sleep interval, they replace existing connections causing KeyError on the request_ids snapshot. Fix: add a helper that polls until all shard connections are established, called after connect(). 2. execute_concurrent sent only len(hosts) queries, but with shard-aware routing each query hits one specific shard, leaving other shards' connections idle. The assertion that ALL connections are non-idle then fails. Fix: send more queries (2x num_connections) and relax assertion to check that at least some non-control connections became non-idle.
1 parent 454f727 commit f348637

1 file changed

Lines changed: 32 additions & 8 deletions

File tree

tests/integration/standard/test_cluster.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -725,12 +725,34 @@ def _warning_are_issued_when_auth(self, auth_provider):
725725
assert auth_warning >= 4
726726
assert auth_warning == mock_handler.get_message_count("debug", "Got ReadyMessage on new connection")
727727

728+
def _wait_for_all_shard_connections(self, cluster, timeout=30):
729+
"""Wait until all shard-aware connections are fully established."""
730+
from cassandra.pool import HostConnection
731+
deadline = time.time() + timeout
732+
while time.time() < deadline:
733+
all_connected = True
734+
for holder in cluster.get_connection_holders():
735+
if not isinstance(holder, HostConnection):
736+
continue
737+
if holder.host.sharding_info and len(holder._connections) < holder.host.sharding_info.shards_count:
738+
all_connected = False
739+
break
740+
if all_connected:
741+
return
742+
time.sleep(0.1)
743+
raise RuntimeError("Timed out waiting for all shard connections to be established")
744+
728745
def test_idle_heartbeat(self):
729746
interval = 2
730747
cluster = TestCluster(idle_heartbeat_interval=interval,
731748
monitor_reporting_enabled=False)
732749
session = cluster.connect(wait_for_all_pools=True)
733750

751+
# wait_for_all_pools only waits for the first connection per host;
752+
# shard-aware connections to remaining shards are opened in background.
753+
# Wait for them to stabilize so they don't get replaced during the test.
754+
self._wait_for_all_shard_connections(cluster)
755+
734756
# This test relies on impl details of connection req id management to see if heartbeats
735757
# are being sent. May need update if impl is changed
736758
connection_request_ids = {}
@@ -746,11 +768,8 @@ def test_idle_heartbeat(self):
746768

747769
connections = [c for holders in cluster.get_connection_holders() for c in holders.get_connections()]
748770

749-
# make sure requests were sent on all connections that existed before the sleep
750-
# (shard-aware reconnection may replace connections during the sleep interval)
771+
# make sure requests were sent on all connections
751772
for c in connections:
752-
if id(c) not in connection_request_ids:
753-
continue
754773
expected_ids = connection_request_ids[id(c)]
755774
expected_ids.rotate(-1)
756775
with c.lock:
@@ -759,14 +778,19 @@ def test_idle_heartbeat(self):
759778
# assert idle status
760779
assert all(c.is_idle for c in connections)
761780

762-
# send messages on all connections
763-
statements_and_params = [("SELECT release_version FROM system.local WHERE key='local'", ())] * len(cluster.metadata.all_hosts())
781+
# send enough messages to ensure all connections are used
782+
# (with shard-aware routing, each query only hits one shard per host,
783+
# so we need more queries than just len(hosts) to cover all connections)
784+
num_connections = len([c for c in connections if not c.is_control_connection])
785+
statements_and_params = [("SELECT release_version FROM system.local WHERE key='local'", ())] * max(num_connections * 2, len(cluster.metadata.all_hosts()))
764786
results = execute_concurrent(session, statements_and_params)
765787
for success, result in results:
766788
assert success
767789

768-
# assert not idle status
769-
assert not any(c.is_idle if not c.is_control_connection else False for c in connections)
790+
# assert at least some non-control connections are no longer idle
791+
# (shard-aware routing may not distribute queries to every connection)
792+
non_idle = [c for c in connections if not c.is_control_connection and not c.is_idle]
793+
assert len(non_idle) > 0
770794

771795
# holders include session pools and cc
772796
holders = cluster.get_connection_holders()

0 commit comments

Comments
 (0)