Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
dockerImageTag: 2.1.26
dockerImageTag: 2.1.27
dockerRepository: airbyte/source-github
documentationUrl: https://docs.airbyte.com/integrations/sources/github
erdUrl: https://dbdocs.io/airbyteio/source-github?view=relationships
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.1.26"
version = "2.1.27"
name = "source-github"
description = "Source implementation for GitHub."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Optional, Union

import requests
Expand All @@ -14,6 +15,9 @@
from . import constants


logger = logging.getLogger("airbyte")


GITHUB_DEFAULT_ERROR_MAPPING = DEFAULT_ERROR_MAPPING | {
401: ErrorResolution(
response_action=ResponseAction.RETRY,
Expand Down Expand Up @@ -54,7 +58,14 @@

def is_conflict_with_empty_repository(response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> bool:
if isinstance(response_or_exception, requests.Response) and response_or_exception.status_code == requests.codes.CONFLICT:
response_data = response_or_exception.json()
try:
response_data = response_or_exception.json()
except ValueError:
logger.warning(
"is_conflict_with_empty_repository received non-JSON 409 response (first 50 chars: %r).",
response_or_exception.text[:50],
)
return False
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log warning

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — added a warning log with the first 50 chars of the response body in 02fc609.

return response_data.get("message") == "Git Repository is empty."
return False

Expand All @@ -64,6 +75,10 @@ def is_gone_with_feature_disabled(response_or_exception: Optional[Union[requests
try:
message = (response_or_exception.json().get("message") or "").lower()
except ValueError:
logger.warning(
"is_gone_with_feature_disabled received non-JSON 410 response (first 50 chars: %r).",
response_or_exception.text[:50],
)
return False
return "are disabled" in message or "is disabled" in message
return False
Expand All @@ -74,14 +89,26 @@ def __init__(self, stream: HttpStream, **kwargs): # type: ignore # noqa
self.stream = stream
super().__init__(**kwargs)

def _safe_json_check_graphql_rate_limited(self, response: requests.Response) -> bool:
try:
body = response.json()
except ValueError:
self._logger.warning(
"GraphQL rate-limit check received non-JSON response (HTTP %s, first 50 chars: %r).",
response.status_code,
response.text[:50],
)
return False
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log warning

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — added a warning log with HTTP status and first 50 chars of the response body in 02fc609.

return self.stream.check_graphql_rate_limited(body or {})

def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
if isinstance(response_or_exception, requests.Response):
retry_flag = (
# The GitHub GraphQL API has limitations
# https://docs.github.com/en/graphql/overview/resource-limitations
(
response_or_exception.headers.get("X-RateLimit-Resource") == "graphql"
and self.stream.check_graphql_rate_limited(response_or_exception.json())
and self._safe_json_check_graphql_rate_limited(response_or_exception)
)
# Rate limit HTTP headers
# https://docs.github.com/en/rest/overview/resources-in-the-rest-api#rate-limit-http-headers
Expand Down Expand Up @@ -162,6 +189,13 @@ def interpret_response(self, response_or_exception: Optional[Union[requests.Resp


class GitHubGraphQLErrorHandler(GithubStreamABCErrorHandler):
def _safe_json_get_errors(self, response: requests.Response) -> bool:
try:
body = response.json()
except ValueError:
return False
return bool((body or {}).get("errors"))

def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
if isinstance(response_or_exception, requests.Response):
if response_or_exception.status_code in (requests.codes.BAD_GATEWAY, requests.codes.GATEWAY_TIMEOUT):
Expand All @@ -176,7 +210,7 @@ def interpret_response(self, response_or_exception: Optional[Union[requests.Resp
constants.DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM if self.stream.large_stream else constants.DEFAULT_PAGE_SIZE
)

if response_or_exception.json().get("errors"):
if self._safe_json_get_errors(response_or_exception):
return ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,39 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str,

return record

def _safe_json_list(self, response: requests.Response, key: Optional[str] = None) -> Optional[list]:
"""Parse JSON from `response` and return a list, or ``None`` on failure.

When `key` is provided the body is expected to be a dict and the list is
extracted via ``body[key]``. When `key` is ``None`` the body itself must
be a list. On any parse/validation failure a warning is logged and
``None`` is returned so callers can short-circuit gracefully.
"""
try:
body = response.json()
except ValueError:
self.logger.warning(
"`%s` received non-JSON response (HTTP %s, first 50 chars: %r).",
self.name,
response.status_code,
response.text[:50],
)
return None
if key is not None:
items = (body or {}).get(key)
else:
items = body
if not isinstance(items, list):
self.logger.warning(
"`%s` response has unexpected structure (HTTP %s, key=%r, got %s).",
self.name,
response.status_code,
key,
type(items).__name__,
)
return None
return items

def parse_response(
self,
response: requests.Response,
Expand Down Expand Up @@ -1595,8 +1628,10 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"repos/{stream_slice['repository']}/actions/workflows"

def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:
response = response.json().get("workflows")
for record in response:
items = self._safe_json_list(response, key="workflows")
if items is None:
return
for record in items:
yield self.transform(record=record, stream_slice=stream_slice)

def convert_cursor_value(self, value):
Expand All @@ -1620,8 +1655,10 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"repos/{stream_slice['repository']}/actions/runs"

def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:
response = response.json().get("workflow_runs")
for record in response:
items = self._safe_json_list(response, key="workflow_runs")
if items is None:
return
for record in items:
yield record

def read_records(
Expand Down Expand Up @@ -1699,7 +1736,10 @@ def parse_response(
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
for record in response.json()["jobs"]:
items = self._safe_json_list(response, key="jobs")
if items is None:
return
for record in items:
if record.get(self.cursor_field):
yield self.transform(record=record, stream_slice=stream_slice)

Expand Down Expand Up @@ -1885,8 +1925,11 @@ def parse_response(
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
events_list = response.json()
record = {"repository": stream_slice["repository"], "issue_number": stream_slice["number"]}
events_list = self._safe_json_list(response)
if events_list is None:
yield record
return
for event in events_list:
record[event["event"]] = event
yield record
Loading
Loading