Skip to content

Commit 5579e29

Browse files
devin-ai-integration[bot] and bot_apk committed
fix(cdk): retry async job download on HTTP 403 with fresh polling URL
When downloading async job results (e.g., Bing Ads reports), the download URL may contain a time-limited SAS token that expires before the download starts. This causes an HTTP 403 that the CDK misclassifies as config_error, preventing retries and killing all running jobs.

This change adds retry logic to AsyncHttpJobRepository.fetch_records that:
1. Catches HTTP 403 errors during the download phase
2. Re-polls the polling endpoint to obtain a fresh download URL
3. Retries the download with the new URL

Resolves airbytehq/airbyte-internal-issues#16096
Related to airbytehq/oncall#11749

Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 0e57414 commit 5579e29

1 file changed

Lines changed: 43 additions & 2 deletions

File tree

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

Lines changed: 43 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,8 @@
2929

3030
LOGGER = logging.getLogger("airbyte")
3131

32+
# Number of extra download attempts allowed after the first one fails with a
# retriable error; fetch_records loops `_MAX_DOWNLOAD_RETRIES + 1` times in total.
_MAX_DOWNLOAD_RETRIES = 1
33+
3234

3335
@dataclass
3436
class AsyncHttpJobRepository(AsyncJobRepository):
@@ -188,7 +190,9 @@ def update_jobs_status(self, jobs: Iterable[AsyncJob]) -> None:
188190
lazy_log(
189191
LOGGER,
190192
logging.DEBUG,
191-
lambda: f"Status of job {job.api_job_id()} changed from {job.status()} to {job_status}",
193+
lambda: (
194+
f"Status of job {job.api_job_id()} changed from {job.status()} to {job_status}"
195+
),
192196
)
193197
else:
194198
lazy_log(
@@ -205,14 +209,35 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
205209
"""
206210
Fetches records from the given job.
207211
212+
If a download fails due to an expired download URL (e.g., HTTP 403 from an expired
213+
SAS token), this method re-polls the polling endpoint to obtain fresh download URLs
214+
and retries the download.
215+
208216
Args:
209217
job (AsyncJob): The job to fetch records from.
210218
211219
Yields:
212220
Iterable[Mapping[str, Any]]: A generator that yields records as dictionaries.
213221
214222
"""
223+
for attempt in range(_MAX_DOWNLOAD_RETRIES + 1):
224+
try:
225+
yield from self._download_records_for_job(job)
226+
return
227+
except AirbyteTracedException as error:
228+
is_last_attempt = attempt >= _MAX_DOWNLOAD_RETRIES
229+
if is_last_attempt or not self._is_retriable_download_error(error):
230+
raise
231+
LOGGER.info(
232+
f"Download failed (attempt {attempt + 1}/{_MAX_DOWNLOAD_RETRIES + 1}), "
233+
f"re-polling for fresh download URLs. Error: {error.internal_message}"
234+
)
235+
self._refresh_polling_response(job)
236+
237+
yield from []
215238

239+
def _download_records_for_job(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
240+
"""Downloads and yields records from all download targets for a job."""
216241
for download_target in self._get_download_targets(job):
217242
job_slice = job.job_parameters()
218243
stream_slice = StreamSlice(
@@ -238,6 +263,20 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
238263

239264
yield from []
240265

266+
def _is_retriable_download_error(self, error: AirbyteTracedException) -> bool:
    """Tell whether a failed download looks like an expired-URL failure.

    An HTTP 403 raised while downloading usually means the pre-signed URL or SAS
    token has expired rather than that access is genuinely denied, so polling again
    for a fresh URL may resolve it. Detection is purely textual: the error's
    ``internal_message`` must contain ``status code '403'``.

    Args:
        error (AirbyteTracedException): The error raised during the download.

    Returns:
        bool: True when ``internal_message`` is set and mentions a 403 status code.
    """
    detail = error.internal_message
    if detail is None:
        # Nothing to inspect, so the failure cannot be classified as retriable.
        return False
    return "status code '403'" in detail
273+
274+
def _refresh_polling_response(self, job: AsyncJob) -> None:
    """Poll the job again and store the new response in place of the cached one.

    The stream slice used to create the job is rebuilt, the polling endpoint is
    queried through ``_get_validated_polling_response``, and the result is stored
    in ``_polling_job_response_by_id`` under the job's API id, so that the cached
    response carries updated download URLs.

    Args:
        job (AsyncJob): The job whose polling response should be refreshed.
    """
    fresh_response = self._get_validated_polling_response(
        self._get_create_job_stream_slice(job)
    )
    self._polling_job_response_by_id[job.api_job_id()] = fresh_response
279+
241280
def abort(self, job: AsyncJob) -> None:
242281
if not self.abort_requester:
243282
return
@@ -340,7 +379,9 @@ def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
340379
lazy_log(
341380
LOGGER,
342381
logging.DEBUG,
343-
lambda: "No download_target_extractor or download_target_requester provided. Will attempt a single download request without a `download_target`.",
382+
lambda: (
383+
"No download_target_extractor or download_target_requester provided. Will attempt a single download request without a `download_target`."
384+
),
344385
)
345386
yield ""
346387
return

0 commit comments

Comments
 (0)