Skip to content

Commit 850f14d

Browse files
thodson-usgs and claude committed
Address Copilot review on chunked CQL filter
- Dedupe on pre-rename feature `id` (always present at that stage) instead of `output_id`, which is the post-rename name and may not be on every OGC service's response.
- Aggregate elapsed time across chunk responses so the returned metadata's query_time reflects the whole operation rather than just the last chunk.
- Drop the redundant `continuous_id` from the fan-out test's mock properties so the assertion exercises the real `id`-based dedup path, and add a separate test that forces cross-chunk duplicate feature ids to prove they collapse to a single row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d4774ac commit 850f14d

2 files changed

Lines changed: 73 additions & 13 deletions

File tree

dataretrieval/waterdata/utils.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -930,28 +930,35 @@ def get_ogc_data(
930930
convert_type = args.pop("convert_type", False)
931931
# Create fresh dictionary of args without any None values
932932
args = {k: v for k, v in args.items() if v is not None}
933-
# Overlapping user OR-clauses are deduplicated by output_id further below.
933+
# Overlapping user OR-clauses are deduplicated by feature id further below.
934934
filter_expr = args.get("filter")
935935
filter_chunks = (
936936
_chunk_cql_or(filter_expr) if isinstance(filter_expr, str) else [None]
937937
)
938938

939939
frames = []
940-
response = None
940+
first_response = None
941+
total_elapsed = None
941942
for chunk in filter_chunks:
942943
chunk_args = args if chunk is None else {**args, "filter": chunk}
943944
req = _construct_api_requests(**chunk_args)
944-
chunk_df, response = _walk_pages(geopd=GEOPANDAS, req=req)
945+
chunk_df, chunk_response = _walk_pages(geopd=GEOPANDAS, req=req)
945946
frames.append(chunk_df)
947+
if first_response is None:
948+
first_response = chunk_response
949+
total_elapsed = chunk_response.elapsed
950+
else:
951+
total_elapsed = total_elapsed + chunk_response.elapsed
946952

947953
if len(frames) == 1:
948954
return_list = frames[0]
949955
else:
950956
return_list = pd.concat(frames, ignore_index=True)
951-
if output_id in return_list.columns:
952-
return_list = return_list.drop_duplicates(
953-
subset=output_id, ignore_index=True
954-
)
957+
# The top-level feature "id" is always present at this stage (the
958+
# rename to output_id happens later in _arrange_cols), so dedup on
959+
# it directly to catch overlapping OR-clauses across chunks.
960+
if "id" in return_list.columns:
961+
return_list = return_list.drop_duplicates(subset="id", ignore_index=True)
955962
# Manage some aspects of the returned dataset
956963
return_list = _deal_with_empty(return_list, properties, service)
957964

@@ -961,8 +968,12 @@ def get_ogc_data(
961968
return_list = _arrange_cols(return_list, properties, output_id)
962969

963970
return_list = _sort_rows(return_list)
964-
# Create metadata object from response
965-
metadata = BaseMetadata(response)
971+
# Create metadata object from the first response. When the filter was
972+
# chunked into multiple sub-requests, query_time reflects the total
973+
# elapsed time across all chunks rather than just the first.
974+
if len(frames) > 1:
975+
first_response.elapsed = total_elapsed
976+
metadata = BaseMetadata(first_response)
966977
return return_list, metadata
967978

968979

tests/waterdata_utils_test.py

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -234,10 +234,7 @@ def respond(request, context):
234234
"type": "Feature",
235235
"id": f"chunk-{call_count['n']}",
236236
"geometry": None,
237-
"properties": {
238-
"continuous_id": f"chunk-{call_count['n']}",
239-
"value": call_count["n"],
240-
},
237+
"properties": {"value": call_count["n"]},
241238
}
242239
],
243240
"links": [],
@@ -260,3 +257,55 @@ def respond(request, context):
260257
for req in requests_mock.request_history:
261258
filter_qs = parse_qs(urlsplit(req.url).query).get("filter", [""])[0]
262259
assert len(filter_qs) <= _CQL_FILTER_CHUNK_LEN
260+
261+
262+
@pytest.mark.skipif(
263+
sys.version_info < (3, 10),
264+
reason="get_continuous requires py>=3.10 (see tests/waterdata_test.py)",
265+
)
266+
def test_long_filter_deduplicates_cross_chunk_overlap(requests_mock):
267+
"""Features returned by multiple chunks (same feature `id`) are
268+
deduplicated in the concatenated result."""
269+
from dataretrieval.waterdata import get_continuous
270+
271+
clause = (
272+
"(time >= '2023-01-{day:02d}T00:00:00Z' "
273+
"AND time <= '2023-01-{day:02d}T00:30:00Z')"
274+
)
275+
expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300))
276+
277+
call_count = {"n": 0}
278+
279+
def respond(request, context):
280+
context.status_code = 200
281+
call_count["n"] += 1
282+
# Every chunk returns the same feature id so dedup should collapse
283+
# the concatenated frame down to a single row.
284+
return {
285+
"type": "FeatureCollection",
286+
"numberReturned": 1,
287+
"features": [
288+
{
289+
"type": "Feature",
290+
"id": "shared-feature",
291+
"geometry": None,
292+
"properties": {"value": 1},
293+
}
294+
],
295+
"links": [],
296+
}
297+
298+
requests_mock.get(OGC_CONTINUOUS_URL, json=respond)
299+
300+
df, _ = get_continuous(
301+
monitoring_location_id="USGS-07374525",
302+
parameter_code="72255",
303+
filter=expr,
304+
filter_lang="cql-text",
305+
)
306+
307+
expected_chunks = _chunk_cql_or(expr)
308+
assert len(expected_chunks) > 1
309+
assert call_count["n"] == len(expected_chunks)
310+
# Even though each chunk returned a feature, dedup by id collapses them.
311+
assert len(df) == 1

0 commit comments

Comments
 (0)