Skip to content

Commit 6028727

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
fix: initialize FixedWindowCallRatePolicy next_reset_ts to one period instead of 10 days
Previously, the initial reset timestamp was set to 10 days from now. If the API did not return a ratelimit-reset header (e.g. only retry-after), this value was never updated, causing the rate limiter to sleep for ~10 days on a 429 response instead of respecting the configured period. This caused a deadlock where the heartbeat monitor would kill the process after 1.5 hours of no records emitted. Now the initial reset timestamp defaults to now + period (e.g. 1 minute), so the window resets promptly even when the API does not provide reset headers. Also updated the FixedWindowCallRatePolicy component description to document the default reset behavior when no ratelimit-reset header is present. Resolves: airbytehq/oncall#11924 Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 69cd63d commit 6028727

3 files changed

Lines changed: 45 additions & 38 deletions

File tree

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1718,7 +1718,12 @@ definitions:
17181718
additionalProperties: true
17191719
FixedWindowCallRatePolicy:
17201720
title: Fixed Window Call Rate Policy
1721-
description: A policy that allows a fixed number of calls within a specific time window.
1721+
description: >
1722+
A policy that allows a fixed number of calls within a specific time window.
1723+
The initial rate limit window resets after one `period`. If the API returns a
1724+
`ratelimit-reset` response header (as configured on the parent `HTTPAPIBudget`),
1725+
subsequent windows will align to the server-provided reset timestamp. When the
1726+
header is absent, the window resets based on the configured `period`.
17221727
type: object
17231728
required:
17241729
- type

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,6 @@ class AuthFlowType(Enum):
1818
oauth1_0 = "oauth1.0"
1919

2020

21-
class ScopesJoinStrategy(Enum):
22-
space = "space"
23-
comma = "comma"
24-
plus = "plus"
25-
26-
2721
class BasicHttpAuthenticator(BaseModel):
2822
type: Literal["BasicHttpAuthenticator"]
2923
username: str = Field(
@@ -488,7 +482,7 @@ class Config:
488482
)
489483
weight: Optional[Union[int, str]] = Field(
490484
None,
491-
description="The weight of a request matching this matcher when acquiring a call from the rate limiter. Different endpoints can consume different amounts from a shared budget by specifying different weights. If not set, each request counts as 1.",
485+
description="The weight of a request matching this matcher when acquiring a call from the rate limiter. Different endpoints can consume different amounts from a shared budget by specifying different weights. If not set, each request counts as 1.\n",
492486
title="Weight",
493487
)
494488

@@ -828,22 +822,32 @@ class NoPagination(BaseModel):
828822
type: Literal["NoPagination"]
829823

830824

831-
class State(BaseModel):
825+
class Scope(BaseModel):
832826
class Config:
833827
extra = Extra.allow
834828

835-
min: int
836-
max: int
829+
scope: str = Field(..., description="The OAuth scope string to request from the provider.")
837830

838831

839-
class OAuthScope(BaseModel):
832+
class OptionalScope(BaseModel):
840833
class Config:
841834
extra = Extra.allow
842835

843-
scope: str = Field(
844-
...,
845-
description="The OAuth scope string to request from the provider.",
846-
)
836+
scope: str = Field(..., description="The OAuth scope string to request from the provider.")
837+
838+
839+
class ScopesJoinStrategy(Enum):
840+
space = "space"
841+
comma = "comma"
842+
plus = "plus"
843+
844+
845+
class State(BaseModel):
846+
class Config:
847+
extra = Extra.allow
848+
849+
min: int
850+
max: int
847851

848852

849853
class OauthConnectorInputSpecification(BaseModel):
@@ -865,17 +869,13 @@ class Config:
865869
examples=["user:read user:read_orders workspaces:read"],
866870
title="Scopes",
867871
)
868-
# NOTE: scopes, optional_scopes, and scopes_join_strategy are processed by the
869-
# platform OAuth handler (DeclarativeOAuthSpecHandler.kt), not by the CDK runtime.
870-
# The CDK schema defines the manifest contract; the platform reads these fields
871-
# during the OAuth consent flow to build the authorization URL.
872-
scopes: Optional[List[OAuthScope]] = Field(
872+
scopes: Optional[List[Scope]] = Field(
873873
None,
874874
description="List of OAuth scope objects. When present, takes precedence over the `scope` string property.\nThe scope values are joined using the `scopes_join_strategy` (default: space) before being\nsent to the OAuth provider.",
875875
examples=[[{"scope": "user:read"}, {"scope": "user:write"}]],
876876
title="Scopes",
877877
)
878-
optional_scopes: Optional[List[OAuthScope]] = Field(
878+
optional_scopes: Optional[List[OptionalScope]] = Field(
879879
None,
880880
description="Optional OAuth scope objects that may or may not be granted.",
881881
examples=[[{"scope": "admin:read"}]],
@@ -1250,6 +1250,10 @@ class AsyncJobStatusMap(BaseModel):
12501250
timeout: List[str]
12511251

12521252

1253+
class BlockSimultaneousSyncsAction(BaseModel):
1254+
type: Literal["BlockSimultaneousSyncsAction"]
1255+
1256+
12531257
class ValueType(Enum):
12541258
string = "string"
12551259
number = "number"
@@ -2401,7 +2405,7 @@ class Config:
24012405
api_budget: Optional[HTTPAPIBudget] = None
24022406
stream_groups: Optional[Dict[str, StreamGroup]] = Field(
24032407
None,
2404-
description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.",
2408+
description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.\n",
24052409
title="Stream Groups",
24062410
)
24072411
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
@@ -2441,7 +2445,7 @@ class Config:
24412445
api_budget: Optional[HTTPAPIBudget] = None
24422446
stream_groups: Optional[Dict[str, StreamGroup]] = Field(
24432447
None,
2444-
description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.",
2448+
description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.\n",
24452449
title="Stream Groups",
24462450
)
24472451
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
@@ -2937,7 +2941,7 @@ class StateDelegatingStream(BaseModel):
29372941
)
29382942
api_retention_period: Optional[str] = Field(
29392943
None,
2940-
description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n * **PT1H**: 1 hour\n * **P1D**: 1 day\n * **P1W**: 1 week\n * **P1M**: 1 month\n * **P1Y**: 1 year\n * **P30D**: 30 days\n",
2944+
description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n* **PT1H**: 1 hour\n* **P1D**: 1 day\n* **P1W**: 1 week\n* **P1M**: 1 month\n* **P1Y**: 1 year\n* **P30D**: 30 days\n",
29412945
examples=["P30D", "P90D", "P1Y"],
29422946
title="API Retention Period",
29432947
)
@@ -3111,20 +3115,14 @@ class AsyncRetriever(BaseModel):
31113115
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
31123116

31133117

3114-
class BlockSimultaneousSyncsAction(BaseModel):
3115-
type: Literal["BlockSimultaneousSyncsAction"]
3116-
3117-
31183118
class StreamGroup(BaseModel):
3119-
streams: List[str] = Field(
3119+
streams: List[DeclarativeStream] = Field(
31203120
...,
3121-
description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").',
3121+
description="List of references to streams that belong to this group.\n",
31223122
title="Streams",
31233123
)
31243124
action: BlockSimultaneousSyncsAction = Field(
3125-
...,
3126-
description="The action to apply to streams in this group.",
3127-
title="Action",
3125+
..., description="The action to apply to streams in this group.", title="Action"
31283126
)
31293127

31303128

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4307,11 +4307,15 @@ def create_fixed_window_call_rate_policy(
43074307
for matcher in model.matchers
43084308
]
43094309

4310-
# Set the initial reset timestamp to 10 days from now.
4311-
# This value will be updated by the first request.
4310+
period = parse_duration(model.period)
4311+
4312+
# Set the initial reset timestamp to one period from now.
4313+
# If the API returns a ratelimit-reset header, this value will be updated
4314+
# by the first response. Otherwise, the window will reset after one period,
4315+
# preventing a deadlock when the header is absent.
43124316
return FixedWindowCallRatePolicy(
4313-
next_reset_ts=datetime.datetime.now() + datetime.timedelta(days=10),
4314-
period=parse_duration(model.period),
4317+
next_reset_ts=datetime.datetime.now() + period,
4318+
period=period,
43154319
call_limit=model.call_limit,
43164320
matchers=matchers,
43174321
)

0 commit comments

Comments
 (0)