|
34 | 34 | import functools |
35 | 35 | import itertools |
36 | 36 | import math |
37 | | -from collections.abc import Callable |
| 37 | +from collections.abc import Callable, Iterator |
38 | 38 | from typing import Any |
39 | 39 | from urllib.parse import quote_plus |
40 | 40 |
|
|
98 | 98 | # so a missing/malformed header doesn't trigger spurious aborts. |
99 | 99 | _QUOTA_UNKNOWN = 10**9 |
100 | 100 |
|
| 101 | +# Separators the two chunking dimensions use to compose their atoms |
| 102 | +# into URL-encoded blobs. List dims comma-join values |
| 103 | +# (``site=USGS-A,USGS-B``); filter dims OR-join clauses |
| 104 | +# (``filter=a='1' OR a='2'``). Pinned as constants so the URL-byte |
| 105 | +# sizing helper and the partition logic agree on the join shape. |
| 106 | +_LIST_SEP = "," |
| 107 | +_OR_SEP = " OR " |
| 108 | + |
101 | 109 |
|
102 | 110 | class RequestTooLarge(ValueError): |
103 | 111 | """Raised when a chunked request cannot be issued. Either the URL |
@@ -214,7 +222,7 @@ def _worst_case_args( |
214 | 222 | in by the caller).""" |
215 | 223 | out = dict(base_args) |
216 | 224 | for k, chunks in list_plan.items(): |
217 | | - out[k] = max(chunks, key=lambda c: _joined_url_bytes(c, ",")) |
| 225 | + out[k] = max(chunks, key=lambda c: _joined_url_bytes(c, _LIST_SEP)) |
218 | 226 | return out |
219 | 227 |
|
220 | 228 |
|
@@ -282,7 +290,9 @@ def _plan_list_chunks( |
282 | 290 | if len(chunk) > 1 |
283 | 291 | ) |
284 | 292 | biggest = max( |
285 | | - splittable, key=lambda t: _joined_url_bytes(t[2], ","), default=None |
| 293 | + splittable, |
| 294 | + key=lambda t: _joined_url_bytes(t[2], _LIST_SEP), |
| 295 | + default=None, |
286 | 296 | ) |
287 | 297 | if biggest is None: |
288 | 298 | raise RequestTooLarge( |
@@ -320,6 +330,32 @@ def _filter_chunk_counts(n_clauses: int) -> list[int]: |
320 | 330 | return counts |
321 | 331 |
|
322 | 332 |
|
| 333 | +def _filter_candidates( |
| 334 | + clauses: list[str], original_filter: str | None |
| 335 | +) -> Iterator[tuple[list[str | None], str | None]]: |
| 336 | + """Yield ``(filter_chunks, worst_filter)`` for each candidate filter |
| 337 | + chunk count. ``filter_chunks`` is the list of OR-joined sub-filters |
| 338 | + the wrapper will substitute one at a time; ``worst_filter`` is the |
| 339 | + longest URL-encoded chunk, used by the planner to size the list dims |
| 340 | + against the most demanding sub-request. |
| 341 | +
|
| 342 | + Falls through to a single ``(filter_chunks=[original_filter], None)`` |
| 343 | + candidate when the filter has no top-level OR splits (single clause, |
| 344 | + cql-json, missing filter): the wrapper still iterates that one |
| 345 | + "chunk" but the planner can skip substituting a filter into the |
| 346 | + URL probe.""" |
| 347 | + if len(clauses) < 2: |
| 348 | + chunks: list[str | None] = ( |
| 349 | + [original_filter] if original_filter is not None else [None] |
| 350 | + ) |
| 351 | + yield chunks, None |
| 352 | + return |
| 353 | + for k in _filter_chunk_counts(len(clauses)): |
| 354 | + groups = _partition_clauses(clauses, k) |
| 355 | + worst = max(groups, key=lambda g: _joined_url_bytes(g, _OR_SEP)) |
| 356 | + yield [_OR_SEP.join(g) for g in groups], _OR_SEP.join(worst) |
| 357 | + |
| 358 | + |
323 | 359 | def _plan_joint( |
324 | 360 | args: dict[str, Any], |
325 | 361 | build_request: Callable[..., Any], |
@@ -358,42 +394,30 @@ def _plan_joint( |
358 | 394 | if _is_chunkable(filter_expr, args.get("filter_lang")): |
359 | 395 | _check_numeric_filter_pitfall(filter_expr) |
360 | 396 | clauses = _split_top_level_or(filter_expr) |
361 | | - # Filter is chunkable only when there are ≥2 top-level OR clauses; |
362 | | - # a single clause can't be split losslessly. |
363 | | - filter_chunkable = len(clauses) >= 2 |
364 | 397 |
|
365 | | - if not _chunkable_params(args) and not filter_chunkable: |
| 398 | + if not _chunkable_params(args) and len(clauses) < 2: |
366 | 399 | return None |
367 | 400 | if _request_bytes(build_request(**args)) <= url_limit: |
368 | 401 | return None |
369 | 402 |
|
370 | | - candidate_counts = _filter_chunk_counts(len(clauses)) if filter_chunkable else [1] |
371 | 403 | best: tuple[int, dict[str, list[list[Any]]], list[str | None]] | None = None |
372 | 404 | last_error: RequestTooLarge | None = None |
373 | 405 |
|
374 | | - for k in candidate_counts: |
375 | | - if filter_chunkable: |
376 | | - groups = _partition_clauses(clauses, k) |
377 | | - worst_group = max(groups, key=lambda g: _joined_url_bytes(g, " OR ")) |
378 | | - filter_chunks: list[str | None] = [" OR ".join(g) for g in groups] |
379 | | - plan_args = {**args, "filter": " OR ".join(worst_group)} |
380 | | - else: |
381 | | - filter_chunks = [filter_expr] if filter_expr is not None else [None] |
382 | | - plan_args = args |
383 | | - |
384 | | - per_filter_cap = max(1, max_chunks // k) |
| 406 | + for filter_chunks, worst_filter in _filter_candidates(clauses, filter_expr): |
| 407 | + k = len(filter_chunks) |
| 408 | + plan_args = args if worst_filter is None else {**args, "filter": worst_filter} |
385 | 409 | try: |
386 | 410 | list_plan = _plan_list_chunks( |
387 | | - plan_args, build_request, url_limit, per_filter_cap |
| 411 | + plan_args, build_request, url_limit, max(1, max_chunks // k) |
388 | 412 | ) |
389 | 413 | except RequestTooLarge as exc: |
390 | 414 | last_error = exc |
391 | 415 | continue |
392 | 416 | if list_plan is None: |
393 | 417 | list_plan = {} |
394 | | - # When there are no list dims to shrink, ``_plan_list_chunks`` |
395 | | - # returns ``None`` regardless of whether the request actually |
396 | | - # fits. Filter chunking alone has to close the gap — verify it. |
| 418 | + # ``_plan_list_chunks`` returns ``None`` when no list dims are |
| 419 | + # chunkable, regardless of whether the request actually fits. |
| 420 | + # Filter chunking alone has to close the gap — verify it. |
397 | 421 | if not list_plan and _request_bytes(build_request(**plan_args)) > url_limit: |
398 | 422 | continue |
399 | 423 | total = _plan_total(list_plan, k) |
@@ -425,6 +449,34 @@ def _read_remaining(response: requests.Response) -> int: |
425 | 449 | return _QUOTA_UNKNOWN |
426 | 450 |
|
427 | 451 |
|
| 452 | +def _iter_sub_args( |
| 453 | + args: dict[str, Any], |
| 454 | + list_plan: dict[str, list[list[Any]]], |
| 455 | + filter_chunks: list[str | None], |
| 456 | +) -> Iterator[dict[str, Any]]: |
| 457 | + """Yield the substituted ``args`` for each sub-request in the joint |
| 458 | + plan, in deterministic order: list-dim cartesian product (insertion |
| 459 | + order, Python 3.7+ guarantee) crossed with filter chunks.""" |
| 460 | + list_keys = list(list_plan) |
| 461 | + list_combos = ( |
| 462 | + itertools.product(*(list_plan[k] for k in list_keys)) if list_plan else [()] |
| 463 | + ) |
| 464 | + for combo in list_combos: |
| 465 | + base = {**args, **dict(zip(list_keys, combo))} |
| 466 | + for filter_chunk in filter_chunks: |
| 467 | + yield base if filter_chunk is None else {**base, "filter": filter_chunk} |
| 468 | + |
| 469 | + |
| 470 | +def _finalize_response( |
| 471 | + responses: list[requests.Response], canonical_url: str |
| 472 | +) -> requests.Response: |
| 473 | + """Aggregate per-sub-request responses and restore the canonical |
| 474 | + URL representing the user's full original query.""" |
| 475 | + combined = _combine_chunk_responses(responses) |
| 476 | + combined.url = canonical_url |
| 477 | + return combined |
| 478 | + |
| 479 | + |
428 | 480 | def multi_value_chunked( |
429 | 481 | *, |
430 | 482 | build_request: Callable[..., Any], |
@@ -481,44 +533,32 @@ def wrapper( |
481 | 533 |
|
482 | 534 | list_plan, filter_chunks = plan |
483 | 535 | canonical_url = build_request(**args).url |
484 | | - |
485 | | - list_keys = list(list_plan) |
486 | | - list_combos = ( |
487 | | - list(itertools.product(*(list_plan[k] for k in list_keys))) |
488 | | - if list_plan |
489 | | - else [()] |
490 | | - ) |
491 | | - total = len(list_combos) * len(filter_chunks) |
| 536 | + total = _plan_total(list_plan, len(filter_chunks)) |
492 | 537 |
|
493 | 538 | frames: list[pd.DataFrame] = [] |
494 | 539 | responses: list[requests.Response] = [] |
495 | | - i = 0 |
496 | | - for combo in list_combos: |
497 | | - for filter_chunk in filter_chunks: |
498 | | - sub_args = dict(args) |
499 | | - sub_args.update(zip(list_keys, combo)) |
500 | | - if filter_chunk is not None: |
501 | | - sub_args["filter"] = filter_chunk |
502 | | - frame, response = fetch_once(sub_args) |
503 | | - frames.append(frame) |
504 | | - responses.append(response) |
505 | | - if i < total - 1: |
506 | | - remaining = _read_remaining(response) |
507 | | - if remaining < floor: |
508 | | - partial = _combine_chunk_responses(responses) |
509 | | - partial.url = canonical_url |
510 | | - raise QuotaExhausted( |
511 | | - partial_frame=_combine_chunk_frames(frames), |
512 | | - partial_response=partial, |
513 | | - completed_chunks=i + 1, |
514 | | - total_chunks=total, |
515 | | - remaining=remaining, |
516 | | - ) |
517 | | - i += 1 |
518 | | - |
519 | | - combined = _combine_chunk_responses(responses) |
520 | | - combined.url = canonical_url |
521 | | - return _combine_chunk_frames(frames), combined |
| 540 | + for i, sub_args in enumerate( |
| 541 | + _iter_sub_args(args, list_plan, filter_chunks) |
| 542 | + ): |
| 543 | + frame, response = fetch_once(sub_args) |
| 544 | + frames.append(frame) |
| 545 | + responses.append(response) |
| 546 | + if i == total - 1: |
| 547 | + continue # last chunk; no next sub-request to gate |
| 548 | + remaining = _read_remaining(response) |
| 549 | + if remaining < floor: |
| 550 | + raise QuotaExhausted( |
| 551 | + partial_frame=_combine_chunk_frames(frames), |
| 552 | + partial_response=_finalize_response(responses, canonical_url), |
| 553 | + completed_chunks=i + 1, |
| 554 | + total_chunks=total, |
| 555 | + remaining=remaining, |
| 556 | + ) |
| 557 | + |
| 558 | + return ( |
| 559 | + _combine_chunk_frames(frames), |
| 560 | + _finalize_response(responses, canonical_url), |
| 561 | + ) |
522 | 562 |
|
523 | 563 | return wrapper # type: ignore[return-value] |
524 | 564 |
|
|
0 commit comments