Skip to content

Commit 0d74e74

Browse files
thodson-usgsclaude
andcommitted
Split get_ogc_data into phase helpers
Pull the chunked fan-out, frame combining, and metadata aggregation out of ``get_ogc_data`` into four private helpers so the top-level function reads as a short recipe rather than a 70-line procedure. Behaviour is unchanged (all 32 PR-related tests still pass); each helper docstring captures the non-obvious *why* of its phase: - ``_plan_filter_chunks`` decide how to fan out - ``_fetch_chunks`` one request per chunk, pure I/O loop - ``_combine_chunk_frames`` concat, drop empties to preserve GeoDataFrame type, dedup by feature id - ``_aggregate_response_metadata`` first response + summed elapsed The top-of-``get_ogc_data`` arg normalization stays inline — it's short and has a subtle ordering requirement (capture ``properties`` before the id-switch) that extraction would hide. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 443c875 commit 0d74e74

1 file changed

Lines changed: 76 additions & 48 deletions

File tree

dataretrieval/waterdata/utils.py

Lines changed: 76 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -984,79 +984,107 @@ def get_ogc_data(
984984
- Applies column cleanup and reordering based on service and properties.
985985
"""
986986
args = args.copy()
987-
# Add service as an argument
988987
args["service"] = service
989-
# Switch the input id to "id" if needed
990988
args = _switch_arg_id(args, id_name=output_id, service=service)
989+
# Capture `properties` before the id-switch so post-processing sees
990+
# the user-facing names, not the wire-format ones.
991991
properties = args.get("properties")
992-
# Switch properties id to "id" if needed
993992
args["properties"] = _switch_properties_id(
994993
properties, id_name=output_id, service=service
995994
)
996995
convert_type = args.pop("convert_type", False)
997-
# Create fresh dictionary of args without any None values
998996
args = {k: v for k, v in args.items() if v is not None}
999-
# Only cql-text filters can be safely chunked by splitting top-level OR
1000-
# chains. For cql-json (or unknown languages), pass through unchanged.
1001-
# Overlapping user OR-clauses are deduplicated by feature id further below.
997+
998+
chunks = _plan_filter_chunks(args)
999+
frames, responses = _fetch_chunks(args, chunks)
1000+
1001+
return_list = _combine_chunk_frames(frames)
1002+
return_list = _deal_with_empty(return_list, properties, service)
1003+
if convert_type:
1004+
return_list = _type_cols(return_list)
1005+
return_list = _arrange_cols(return_list, properties, output_id)
1006+
return_list = _sort_rows(return_list)
1007+
1008+
return return_list, _aggregate_response_metadata(responses)
1009+
1010+
1011+
def _plan_filter_chunks(args: dict[str, Any]) -> list[str | None]:
1012+
"""Decide how to fan ``args["filter"]`` out across HTTP calls.
1013+
1014+
Returns one entry per request to send. A ``None`` entry means "send
1015+
``args`` as-is" — either there is no filter, or the filter language
1016+
is not one we can safely split (only cql-text top-level ``OR``
1017+
chains are chunkable). Otherwise each string entry is a chunked
1018+
cql-text expression that replaces ``args["filter"]`` for its
1019+
sub-request. Overlapping user OR-clauses are deduplicated by feature
1020+
id later in ``_combine_chunk_frames``.
1021+
"""
10021022
filter_expr = args.get("filter")
10031023
filter_lang = args.get("filter_lang")
1004-
should_chunk_filter = (
1024+
chunkable = (
10051025
isinstance(filter_expr, str)
10061026
and filter_expr
10071027
and filter_lang in {None, "cql-text"}
10081028
)
1009-
if should_chunk_filter:
1010-
raw_budget = _effective_filter_budget(args, filter_expr)
1011-
filter_chunks = _chunk_cql_or(filter_expr, max_len=raw_budget)
1012-
else:
1013-
filter_chunks = [None]
1014-
1015-
frames = []
1016-
responses = []
1017-
for chunk in filter_chunks:
1029+
if not chunkable:
1030+
return [None]
1031+
raw_budget = _effective_filter_budget(args, filter_expr)
1032+
return _chunk_cql_or(filter_expr, max_len=raw_budget)
1033+
1034+
1035+
def _fetch_chunks(
1036+
args: dict[str, Any], chunks: list[str | None]
1037+
) -> tuple[list[pd.DataFrame], list[requests.Response]]:
1038+
"""Send one request per chunk; return the per-chunk frames and responses."""
1039+
frames: list[pd.DataFrame] = []
1040+
responses: list[requests.Response] = []
1041+
for chunk in chunks:
10181042
chunk_args = args if chunk is None else {**args, "filter": chunk}
10191043
req = _construct_api_requests(**chunk_args)
1020-
chunk_df, chunk_response = _walk_pages(geopd=GEOPANDAS, req=req)
1021-
frames.append(chunk_df)
1022-
responses.append(chunk_response)
1023-
1024-
# Drop empty frames before concat — `_get_resp_data` returns a plain
1025-
# ``pd.DataFrame()`` on empty responses, which can downgrade a concat
1026-
# of real GeoDataFrames back to a plain DataFrame (losing geometry/
1027-
# CRS). Empty frames contribute no rows, so discarding them is safe.
1028-
non_empty = [f for f in frames if not f.empty]
1029-
if not non_empty:
1030-
return_list = pd.DataFrame()
1031-
elif len(non_empty) == 1:
1032-
return_list = non_empty[0]
1033-
else:
1034-
return_list = pd.concat(non_empty, ignore_index=True)
1035-
# The top-level feature "id" is always present at this stage (the
1036-
# rename to output_id happens later in _arrange_cols), so dedup on
1037-
# it directly to catch overlapping OR-clauses across chunks.
1038-
if "id" in return_list.columns:
1039-
return_list = return_list.drop_duplicates(subset="id", ignore_index=True)
1040-
# Manage some aspects of the returned dataset
1041-
return_list = _deal_with_empty(return_list, properties, service)
1044+
frame, response = _walk_pages(geopd=GEOPANDAS, req=req)
1045+
frames.append(frame)
1046+
responses.append(response)
1047+
return frames, responses
10421048

1043-
if convert_type:
1044-
return_list = _type_cols(return_list)
10451049

1046-
return_list = _arrange_cols(return_list, properties, output_id)
1050+
def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame:
1051+
"""Concatenate per-chunk frames, handling the edge cases.
10471052
1048-
return_list = _sort_rows(return_list)
1049-
# Use the first response for URL/headers; when the filter was chunked,
1050-
# aggregate elapsed time across all chunks so ``query_time`` reflects
1051-
# the full operation rather than just the first sub-request.
1053+
Drops empty frames before concat — ``_get_resp_data`` returns a
1054+
plain ``pd.DataFrame()`` on empty responses, which would downgrade
1055+
a concat of real GeoDataFrames back to a plain DataFrame and strip
1056+
geometry/CRS. Also dedups on the pre-rename feature ``id`` so
1057+
overlapping user-supplied OR-clauses don't produce duplicate rows
1058+
across chunks.
1059+
"""
1060+
non_empty = [f for f in frames if not f.empty]
1061+
if not non_empty:
1062+
return pd.DataFrame()
1063+
if len(non_empty) == 1:
1064+
return non_empty[0]
1065+
combined = pd.concat(non_empty, ignore_index=True)
1066+
if "id" in combined.columns:
1067+
combined = combined.drop_duplicates(subset="id", ignore_index=True)
1068+
return combined
1069+
1070+
1071+
def _aggregate_response_metadata(
1072+
responses: list[requests.Response],
1073+
) -> BaseMetadata:
1074+
"""Build metadata from the first response, summing elapsed across chunks.
1075+
1076+
The first response's URL and headers are the representative ones to
1077+
return. When the filter was fanned across multiple chunks, replace
1078+
its elapsed with the sum so ``query_time`` reflects the full
1079+
operation rather than just the first sub-request.
1080+
"""
10521081
metadata_response = responses[0]
10531082
if len(responses) > 1:
10541083
metadata_response.elapsed = sum(
10551084
(r.elapsed for r in responses[1:]),
10561085
start=metadata_response.elapsed,
10571086
)
1058-
metadata = BaseMetadata(metadata_response)
1059-
return return_list, metadata
1087+
return BaseMetadata(metadata_response)
10601088

10611089

10621090
def _handle_stats_nesting(

0 commit comments

Comments
 (0)