Skip to content

Commit 8a6c55c

Browse files
Copilotmykaul
andcommitted
Refactor paging helper to standalone function per review feedback
- Convert _fetch_all_pages from instance method to module-level function _fetch_remaining_pages - Function no longer takes 'result' parameter - caller fetches first page - This preserves parallel execution of initial queries via wait_for_responses - Add parameter annotations in docstring - Update all call sites in ControlConnection - Update test to import and use the standalone function Addresses @dkropachev's review feedback to make it a standalone function. Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
1 parent 4a715a3 commit 8a6c55c

2 files changed

Lines changed: 46 additions & 37 deletions

File tree

cassandra/cluster.py

Lines changed: 42 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3441,6 +3441,41 @@ def _clear_watcher(conn, expiring_weakref):
34413441
pass
34423442

34433443

3444+
def _fetch_remaining_pages(connection, result, query_msg, timeout):
3445+
"""
3446+
Fetch remaining pages for a paged query result that already has the first page.
3447+
3448+
:param connection: The connection to use for querying
3449+
:param result: The initial result from the first page (must have paging_state if there are more pages)
3450+
:param query_msg: The QueryMessage used for the initial query (will be reused with paging_state)
3451+
:param timeout: Timeout for each query operation
3452+
:return: The result with all parsed_rows combined from all pages
3453+
"""
3454+
if not result or not result.paging_state:
3455+
return result
3456+
3457+
all_rows = list(result.parsed_rows) if result.parsed_rows else []
3458+
3459+
# Save original paging_state to restore later
3460+
original_paging_state = query_msg.paging_state
3461+
3462+
try:
3463+
# Fetch remaining pages
3464+
while result and result.paging_state:
3465+
query_msg.paging_state = result.paging_state
3466+
result = connection.wait_for_response(query_msg, timeout=timeout)
3467+
if result and result.parsed_rows:
3468+
all_rows.extend(result.parsed_rows)
3469+
3470+
# Update the result with all rows
3471+
if result:
3472+
result.parsed_rows = all_rows
3473+
return result
3474+
finally:
3475+
# Restore original paging_state to prevent affecting subsequent uses of this QueryMessage
3476+
query_msg.paging_state = original_paging_state
3477+
3478+
34443479
class ControlConnection(object):
34453480
"""
34463481
Internal
@@ -3662,8 +3697,8 @@ def _try_connect(self, host):
36623697
# Fetch all pages if there are more results
36633698
# Note: system.local always has exactly 1 row, so it will never have additional pages
36643699
# system.peers might have multiple pages for very large clusters (>1000 nodes)
3665-
peers_result = self._fetch_all_pages(connection, peers_result, peers_query, self._timeout)
3666-
local_result = self._fetch_all_pages(connection, local_result, local_query, self._timeout)
3700+
peers_result = _fetch_remaining_pages(connection, peers_result, peers_query, self._timeout)
3701+
local_result = _fetch_remaining_pages(connection, local_result, local_query, self._timeout)
36673702

36683703
shared_results = (peers_result, local_result)
36693704
self._refresh_node_list_and_token_map(connection, preloaded_results=shared_results)
@@ -3725,34 +3760,6 @@ def _submit(self, *args, **kwargs):
37253760
pass
37263761
return None
37273762

3728-
def _fetch_all_pages(self, connection, result, query_msg, timeout):
3729-
"""
3730-
Fetch all pages for a paged query result.
3731-
Returns the result with all parsed_rows combined from all pages.
3732-
"""
3733-
if not result or not result.paging_state:
3734-
return result
3735-
3736-
all_rows = list(result.parsed_rows) if result.parsed_rows else []
3737-
3738-
# Save original paging_state to restore later
3739-
original_paging_state = query_msg.paging_state
3740-
3741-
try:
3742-
while result and result.paging_state:
3743-
query_msg.paging_state = result.paging_state
3744-
result = connection.wait_for_response(query_msg, timeout=timeout)
3745-
if result and result.parsed_rows:
3746-
all_rows.extend(result.parsed_rows)
3747-
3748-
# Update the result with all rows
3749-
if result:
3750-
result.parsed_rows = all_rows
3751-
return result
3752-
finally:
3753-
# Restore original paging_state to prevent affecting subsequent uses of this QueryMessage
3754-
query_msg.paging_state = original_paging_state
3755-
37563763
def shutdown(self):
37573764
# stop trying to reconnect (if we are)
37583765
with self._reconnection_lock:
@@ -3845,8 +3852,8 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
38453852
# Fetch all pages if there are more results
38463853
# Note: system.local always has exactly 1 row, so it will never have additional pages
38473854
# system.peers might have multiple pages for very large clusters (>1000 nodes)
3848-
peers_result = self._fetch_all_pages(connection, peers_result, peers_query, self._timeout)
3849-
local_result = self._fetch_all_pages(connection, local_result, local_query, self._timeout)
3855+
peers_result = _fetch_remaining_pages(connection, peers_result, peers_query, self._timeout)
3856+
local_result = _fetch_remaining_pages(connection, local_result, local_query, self._timeout)
38503857

38513858
peers_result = dict_factory(peers_result.column_names, peers_result.parsed_rows)
38523859

@@ -3907,7 +3914,7 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
39073914
local_rpc_address_query, timeout=self._timeout, fail_on_error=False)
39083915
if success:
39093916
# Fetch all pages for consistency (system.local table always contains exactly one row, so this is effectively a no-op)
3910-
local_rpc_address_result = self._fetch_all_pages(connection, local_rpc_address_result,
3917+
local_rpc_address_result = _fetch_remaining_pages(connection, local_rpc_address_result,
39113918
local_rpc_address_query, self._timeout)
39123919
row = dict_factory(
39133920
local_rpc_address_result.column_names,
@@ -4154,8 +4161,8 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
41544161
# Fetch all pages if there are more results
41554162
# Note: system.local always has exactly 1 row, so it will never have additional pages
41564163
# system.peers might have multiple pages for very large clusters (>1000 nodes)
4157-
peers_result = self._fetch_all_pages(connection, peers_result, peers_query, timeout)
4158-
local_result = self._fetch_all_pages(connection, local_result, local_query, timeout)
4164+
peers_result = _fetch_remaining_pages(connection, peers_result, peers_query, timeout)
4165+
local_result = _fetch_remaining_pages(connection, local_result, local_query, timeout)
41594166
except OperationTimedOut as timeout:
41604167
log.debug("[control connection] Timed out waiting for "
41614168
"response during schema agreement check: %s", timeout)

tests/unit/test_control_connection.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,8 @@ def test_topology_queries_fetch_all_pages(self):
327327
"""
328328
Test that topology queries fetch all pages when results are paged
329329
"""
330+
from cassandra.cluster import _fetch_remaining_pages
331+
330332
# Create mock connection
331333
mock_connection = MagicMock()
332334
mock_connection.endpoint = DefaultEndPoint("192.168.1.0")
@@ -354,13 +356,13 @@ def test_topology_queries_fetch_all_pages(self):
354356
mock_connection.wait_for_responses.return_value = (first_page, local_result)
355357
mock_connection.wait_for_response.return_value = second_page
356358

357-
# Test _fetch_all_pages
359+
# Test _fetch_remaining_pages
358360
self.control_connection._connection = mock_connection
359361
query_msg = QueryMessage(query="SELECT * FROM system.peers",
360362
consistency_level=ConsistencyLevel.ONE,
361363
fetch_size=self.control_connection._schema_meta_page_size)
362364

363-
result = self.control_connection._fetch_all_pages(mock_connection, first_page, query_msg, timeout=5)
365+
result = _fetch_remaining_pages(mock_connection, first_page, query_msg, timeout=5)
364366

365367
# Verify that both pages were fetched
366368
assert len(result.parsed_rows) == 2

0 commit comments

Comments
 (0)