Skip to content

Commit b4f7d25

Browse files
devin-ai-integration[bot]bot_apk
andauthored
fix(source-github): make parse_response and error handlers defensive against unexpected response shapes (#77685)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: bot_apk <apk@cognition.ai>
1 parent 42d27a7 commit b4f7d25

6 files changed

Lines changed: 274 additions & 13 deletions

File tree

airbyte-integrations/connectors/source-github/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
13-
dockerImageTag: 2.1.26
13+
dockerImageTag: 2.1.27
1414
dockerRepository: airbyte/source-github
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/github
1616
erdUrl: https://dbdocs.io/airbyteio/source-github?view=relationships

airbyte-integrations/connectors/source-github/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "2.1.26"
6+
version = "2.1.27"
77
name = "source-github"
88
description = "Source implementation for GitHub."
99
authors = [ "Airbyte <contact@airbyte.io>",]

airbyte-integrations/connectors/source-github/source_github/errors_handlers.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5+
import logging
56
from typing import Optional, Union
67

78
import requests
@@ -14,6 +15,9 @@
1415
from . import constants
1516

1617

18+
logger = logging.getLogger("airbyte")
19+
20+
1721
GITHUB_DEFAULT_ERROR_MAPPING = DEFAULT_ERROR_MAPPING | {
1822
401: ErrorResolution(
1923
response_action=ResponseAction.RETRY,
@@ -54,7 +58,14 @@
5458

5559
def is_conflict_with_empty_repository(response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> bool:
5660
if isinstance(response_or_exception, requests.Response) and response_or_exception.status_code == requests.codes.CONFLICT:
57-
response_data = response_or_exception.json()
61+
try:
62+
response_data = response_or_exception.json()
63+
except ValueError:
64+
logger.warning(
65+
"is_conflict_with_empty_repository received non-JSON 409 response (first 50 chars: %r).",
66+
response_or_exception.text[:50],
67+
)
68+
return False
5869
return response_data.get("message") == "Git Repository is empty."
5970
return False
6071

@@ -64,6 +75,10 @@ def is_gone_with_feature_disabled(response_or_exception: Optional[Union[requests
6475
try:
6576
message = (response_or_exception.json().get("message") or "").lower()
6677
except ValueError:
78+
logger.warning(
79+
"is_gone_with_feature_disabled received non-JSON 410 response (first 50 chars: %r).",
80+
response_or_exception.text[:50],
81+
)
6782
return False
6883
return "are disabled" in message or "is disabled" in message
6984
return False
@@ -74,14 +89,26 @@ def __init__(self, stream: HttpStream, **kwargs): # type: ignore # noqa
7489
self.stream = stream
7590
super().__init__(**kwargs)
7691

92+
def _safe_json_check_graphql_rate_limited(self, response: requests.Response) -> bool:
93+
try:
94+
body = response.json()
95+
except ValueError:
96+
self._logger.warning(
97+
"GraphQL rate-limit check received non-JSON response (HTTP %s, first 50 chars: %r).",
98+
response.status_code,
99+
response.text[:50],
100+
)
101+
return False
102+
return self.stream.check_graphql_rate_limited(body or {})
103+
77104
def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
78105
if isinstance(response_or_exception, requests.Response):
79106
retry_flag = (
80107
# The GitHub GraphQL API has limitations
81108
# https://docs.github.com/en/graphql/overview/resource-limitations
82109
(
83110
response_or_exception.headers.get("X-RateLimit-Resource") == "graphql"
84-
and self.stream.check_graphql_rate_limited(response_or_exception.json())
111+
and self._safe_json_check_graphql_rate_limited(response_or_exception)
85112
)
86113
# Rate limit HTTP headers
87114
# https://docs.github.com/en/rest/overview/resources-in-the-rest-api#rate-limit-http-headers
@@ -162,6 +189,13 @@ def interpret_response(self, response_or_exception: Optional[Union[requests.Resp
162189

163190

164191
class GitHubGraphQLErrorHandler(GithubStreamABCErrorHandler):
192+
def _safe_json_get_errors(self, response: requests.Response) -> bool:
193+
try:
194+
body = response.json()
195+
except ValueError:
196+
return False
197+
return bool((body or {}).get("errors"))
198+
165199
def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
166200
if isinstance(response_or_exception, requests.Response):
167201
if response_or_exception.status_code in (requests.codes.BAD_GATEWAY, requests.codes.GATEWAY_TIMEOUT):
@@ -176,7 +210,7 @@ def interpret_response(self, response_or_exception: Optional[Union[requests.Resp
176210
constants.DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM if self.stream.large_stream else constants.DEFAULT_PAGE_SIZE
177211
)
178212

179-
if response_or_exception.json().get("errors"):
213+
if self._safe_json_get_errors(response_or_exception):
180214
return ErrorResolution(
181215
response_action=ResponseAction.RETRY,
182216
failure_type=FailureType.transient_error,

airbyte-integrations/connectors/source-github/source_github/streams.py

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,39 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str,
251251

252252
return record
253253

254+
def _safe_json_list(self, response: requests.Response, key: Optional[str] = None) -> Optional[list]:
255+
"""Parse JSON from `response` and return a list, or ``None`` on failure.
256+
257+
When `key` is provided the body is expected to be a dict and the list is
258+
extracted via ``body[key]``. When `key` is ``None`` the body itself must
259+
be a list. On any parse/validation failure a warning is logged and
260+
``None`` is returned so callers can short-circuit gracefully.
261+
"""
262+
try:
263+
body = response.json()
264+
except ValueError:
265+
self.logger.warning(
266+
"`%s` received non-JSON response (HTTP %s, first 50 chars: %r).",
267+
self.name,
268+
response.status_code,
269+
response.text[:50],
270+
)
271+
return None
272+
if key is not None:
273+
items = (body or {}).get(key)
274+
else:
275+
items = body
276+
if not isinstance(items, list):
277+
self.logger.warning(
278+
"`%s` response has unexpected structure (HTTP %s, key=%r, got %s).",
279+
self.name,
280+
response.status_code,
281+
key,
282+
type(items).__name__,
283+
)
284+
return None
285+
return items
286+
254287
def parse_response(
255288
self,
256289
response: requests.Response,
@@ -1595,8 +1628,10 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
15951628
return f"repos/{stream_slice['repository']}/actions/workflows"
15961629

15971630
def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:
1598-
response = response.json().get("workflows")
1599-
for record in response:
1631+
items = self._safe_json_list(response, key="workflows")
1632+
if items is None:
1633+
return
1634+
for record in items:
16001635
yield self.transform(record=record, stream_slice=stream_slice)
16011636

16021637
def convert_cursor_value(self, value):
@@ -1620,8 +1655,10 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
16201655
return f"repos/{stream_slice['repository']}/actions/runs"
16211656

16221657
def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:
1623-
response = response.json().get("workflow_runs")
1624-
for record in response:
1658+
items = self._safe_json_list(response, key="workflow_runs")
1659+
if items is None:
1660+
return
1661+
for record in items:
16251662
yield record
16261663

16271664
def read_records(
@@ -1699,7 +1736,10 @@ def parse_response(
16991736
stream_slice: Mapping[str, Any] = None,
17001737
next_page_token: Mapping[str, Any] = None,
17011738
) -> Iterable[Mapping]:
1702-
for record in response.json()["jobs"]:
1739+
items = self._safe_json_list(response, key="jobs")
1740+
if items is None:
1741+
return
1742+
for record in items:
17031743
if record.get(self.cursor_field):
17041744
yield self.transform(record=record, stream_slice=stream_slice)
17051745

@@ -1885,8 +1925,11 @@ def parse_response(
18851925
stream_slice: Mapping[str, Any] = None,
18861926
next_page_token: Mapping[str, Any] = None,
18871927
) -> Iterable[Mapping]:
1888-
events_list = response.json()
18891928
record = {"repository": stream_slice["repository"], "issue_number": stream_slice["number"]}
1929+
events_list = self._safe_json_list(response)
1930+
if events_list is None:
1931+
yield record
1932+
return
18901933
for event in events_list:
18911934
record[event["event"]] = event
18921935
yield record

0 commit comments

Comments
 (0)