Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1669,6 +1669,12 @@ definitions:
items:
type: integer
default: [429]
path_for_status_code:
title: Path to look for the status code in the response
description: When the status code is not a HTTP status code but a code in the HTTP response.
type: array
items:
type: string
additionalProperties: true
FixedWindowCallRatePolicy:
title: Fixed Window Call Rate Policy
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -2215,6 +2213,11 @@ class Config:
description="List of HTTP status codes that indicate a rate limit has been hit.",
title="Status Codes for Rate Limit Hit",
)
path_for_status_code: Optional[List[str]] = Field(
None,
description="When the status code is not a HTTP status code but a code in the HTTP response.",
title="Path to look for the status code in the response",
)


class ZipfileDecoder(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4153,6 +4153,7 @@ def create_http_api_budget(
ratelimit_reset_header=model.ratelimit_reset_header or "ratelimit-reset",
ratelimit_remaining_header=model.ratelimit_remaining_header or "ratelimit-remaining",
status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit or [429],
path_for_status_code=model.path_for_status_code,
)

def create_fixed_window_call_rate_policy(
Expand Down
19 changes: 16 additions & 3 deletions airbyte_cdk/sources/streams/call_rate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
import abc
import dataclasses
import datetime
import functools
import logging
import re
import time
from datetime import timedelta
from threading import RLock
from typing import TYPE_CHECKING, Any, Mapping, Optional
from typing import TYPE_CHECKING, Any, List, Mapping, Optional
from urllib import parse

import requests
Expand Down Expand Up @@ -634,7 +635,8 @@ def __init__(
self,
ratelimit_reset_header: str = "ratelimit-reset",
ratelimit_remaining_header: str = "ratelimit-remaining",
status_codes_for_ratelimit_hit: list[int] = [429],
status_codes_for_ratelimit_hit: List[int] = [429],
path_for_status_code: Optional[List[str]] = None,
**kwargs: Any,
):
"""Constructor
Expand All @@ -646,6 +648,7 @@ def __init__(
self._ratelimit_reset_header = ratelimit_reset_header
self._ratelimit_remaining_header = ratelimit_remaining_header
self._status_codes_for_ratelimit_hit = status_codes_for_ratelimit_hit
self._path_for_status_code = path_for_status_code
super().__init__(**kwargs)

def update_from_response(self, request: Any, response: Any) -> None:
Expand All @@ -671,7 +674,17 @@ def get_calls_left_from_response(self, response: requests.Response) -> Optional[
if response.headers.get(self._ratelimit_remaining_header):
return int(response.headers[self._ratelimit_remaining_header])

if response.status_code in self._status_codes_for_ratelimit_hit:
if self._path_for_status_code:
try:
if (
functools.reduce(lambda a, b: a[b], self._path_for_status_code, response.json())
in self._status_codes_for_ratelimit_hit
):
return 0
except KeyError:
# the status is not present in the response so we will assume that we're not being rate limited
pass
elif response.status_code in self._status_codes_for_ratelimit_hit:
return 0
Comment on lines +677 to 688
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don’t skip HTTP-status fallback when a JSON path is configured; harden JSON/path handling.

With elif, a configured path_for_status_code suppresses the HTTP status check; a missing/unevaluable path could cause us to ignore a 429. Also, only KeyError is handled; JSON decoding or type/index errors may surface. Proposed minimal fix:

-        if self._path_for_status_code:
-            try:
-                if (
-                    functools.reduce(lambda a, b: a[b], self._path_for_status_code, response.json())
-                    in self._status_codes_for_ratelimit_hit
-                ):
-                    return 0
-            except KeyError:
-                # the status is not present in the response so we will assume that we're not being rate limited
-                pass
-        elif response.status_code in self._status_codes_for_ratelimit_hit:
+        if self._path_for_status_code:
+            try:
+                body = response.json()
+                value = functools.reduce(lambda a, b: a[b], self._path_for_status_code, body)
+                # normalize common stringified codes, e.g. "429"
+                if isinstance(value, str) and value.isdigit():
+                    value = int(value)
+                if value in self._status_codes_for_ratelimit_hit:
+                    return 0
+            except (KeyError, TypeError, ValueError, json.JSONDecodeError):
+                # path missing or body not JSON; fall back to HTTP status check below
+                pass
+        if response.status_code in self._status_codes_for_ratelimit_hit:
             return 0

And add the import near the top:

+import json

Optionally, do you want to support numeric list indices in the path (e.g., ["errors", "0", "code"]) by treating numeric tokens as indexes? It would be easy to add if helpful. What do you think?

🤖 Prompt for AI Agents
In airbyte_cdk/sources/streams/call_rate.py around lines 677-688, the current
code puts the plain HTTP status_code check in an elif, so it is skipped whenever
path_for_status_code is configured, and it only catches KeyError, so JSON
decode, type, and index errors propagate to the caller. Change the logic so the
HTTP-status check is always evaluated (make it a separate if after the JSON-path
branch, not an elif). Broaden the exception handling around path evaluation to
catch KeyError, TypeError, IndexError, ValueError, and json.JSONDecodeError
(add import json at the top), and on any of these exceptions fall back to
evaluating response.status_code against _status_codes_for_ratelimit_hit.
Optionally, if you want numeric path tokens to select list indices, convert
tokens that are numeric strings to ints before indexing.


return None
Expand Down
Loading
Loading