Skip to content

Commit 37e2d2a

Browse files
thodson-usgsclaude
andcommitted
Add optional parallel chunk processing via ThreadPoolExecutor
Draft. Adds `max_workers` to `@multi_value_chunked`; when set > 1 sub-requests run concurrently in a ``ThreadPoolExecutor``. Default (``None``) keeps the existing sequential behavior, including the mid-call quota guard and ``QuotaExhausted.partial_frame`` resumability. Parallel mode forfeits both: workers can race past the floor before any one observes the crossing, and any chunk failure discards completed-but-uncollected results. Benchmarked on 671 CAMELS sites x ``get_field_measurements`` (337,808 rows, ~14 paginated chunks): - sync: 82.1s +/- 12.1s - workers=2: 100.2s (unstable; one run truncated by 429) - workers=4: 51.5s +/- 2.8s (~1.6x; one run lightly truncated) - workers=8: crashed on 429 mid-flight Open question for follow-up: under parallelism we observe partial data being returned silently when one chunk hits a paginated 429 -- ``_walk_pages`` returns the rows it already has rather than surfacing the truncation. This pre-existed but parallelism makes it easier to trigger. Investigate as part of any future merge. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 526a7e4 commit 37e2d2a

1 file changed

Lines changed: 32 additions & 2 deletions

File tree

dataretrieval/waterdata/chunking.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
from __future__ import annotations
3737

38+
import concurrent.futures
3839
import functools
3940
import itertools
4041
import math
@@ -339,6 +340,7 @@ def multi_value_chunked(
339340
url_limit: int | None = None,
340341
max_chunks: int | None = None,
341342
quota_safety_floor: int | None = None,
343+
max_workers: int | None = None,
342344
) -> Callable[[_FetchOnce], _FetchOnce]:
343345
"""Decorator that splits multi-value list params across sub-requests
344346
so each URL fits ``url_limit`` bytes (defaults to
@@ -362,6 +364,14 @@ def multi_value_chunked(
362364
response)`` — so the two decorators compose cleanly. The planner is
363365
filter-aware so it doesn't raise prematurely when the inner filter
364366
chunker would have shrunk the per-sub-request URL on its own.
367+
368+
``max_workers`` (default ``None``, sequential) runs sub-requests in a
369+
``ThreadPoolExecutor`` when ``> 1``. Parallel mode forfeits the mid-call
370+
quota guard and ``QuotaExhausted.partial_frame`` resumability — workers
371+
race past the floor before any one observes the crossing, and any
372+
chunk failure discards completed-but-uncollected results. Empirically
373+
helpful only on pagination-bound workloads, where shared per-IP rate
374+
limits cap useful parallelism well below host CPU count.
365375
"""
366376

367377
def decorator(fetch_once: _FetchOnce) -> _FetchOnce:
@@ -385,10 +395,30 @@ def wrapper(
385395

386396
keys = list(plan)
387397
total = math.prod(len(plan[k]) for k in keys)
398+
sub_args_list = [
399+
{**args, **dict(zip(keys, combo))}
400+
for combo in itertools.product(*(plan[k] for k in keys))
401+
]
402+
403+
if max_workers is not None and max_workers > 1:
404+
# Parallel mode forfeits the mid-call quota guard and
405+
# ``QuotaExhausted.partial_frame`` resumability: workers race
406+
# past the floor before any one observes the crossing, and a
407+
# failed chunk discards completed-but-uncollected results.
408+
with concurrent.futures.ThreadPoolExecutor(
409+
max_workers=max_workers
410+
) as executor:
411+
results = list(executor.map(fetch_once, sub_args_list))
412+
frames = [r[0] for r in results]
413+
responses = [r[1] for r in results]
414+
return (
415+
_combine_chunk_frames(frames),
416+
_combine_chunk_responses(responses),
417+
)
418+
388419
frames: list[pd.DataFrame] = []
389420
responses: list[requests.Response] = []
390-
for i, combo in enumerate(itertools.product(*(plan[k] for k in keys))):
391-
sub_args = {**args, **dict(zip(keys, combo))}
421+
for i, sub_args in enumerate(sub_args_list):
392422
frame, response = fetch_once(sub_args)
393423
frames.append(frame)
394424
responses.append(response)

0 commit comments

Comments
 (0)