diff --git a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py index d837ed902..4d71e56c2 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py +++ b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py @@ -201,6 +201,18 @@ def update_jobs_status(self, jobs: Iterable[AsyncJob]) -> None: if job_status == AsyncJobStatus.COMPLETED: self._polling_job_response_by_id[job.api_job_id()] = polling_response + def _refresh_download_url(self, job: AsyncJob) -> None: + """ + Re-polls the API to refresh the stored polling response before downloading. + + Download URLs (e.g. Azure Blob Storage SAS tokens) may expire between + poll-completion and actual download when many concurrent streams delay + record fetching. Re-polling ensures the download URL is still valid. + """ + stream_slice = self._get_create_job_stream_slice(job) + polling_response = self._get_validated_polling_response(stream_slice) + self._polling_job_response_by_id[job.api_job_id()] = polling_response + def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]: """ Fetches records from the given job. @@ -212,6 +224,7 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]: Iterable[Mapping[str, Any]]: A generator that yields records as dictionaries. """ + self._refresh_download_url(job) for download_target in self._get_download_targets(job): job_slice = job.job_parameters() diff --git a/unit_tests/sources/declarative/requesters/test_http_job_repository.py b/unit_tests/sources/declarative/requesters/test_http_job_repository.py index 473c3d99e..aa6f2802b 100644 --- a/unit_tests/sources/declarative/requesters/test_http_job_repository.py +++ b/unit_tests/sources/declarative/requesters/test_http_job_repository.py @@ -205,6 +205,43 @@ def test_given_multiple_urls_when_fetch_records_then_fetch_from_multiple_urls(se assert len(records) == 2 + def test_given_stale_polling_response_when_fetch_records_then_re_polls_for_fresh_url( + self, + ) -> None: + """ + Verifies that fetch_records re-polls the API before downloading to get a fresh download URL. + This prevents failures when download URLs (e.g. Azure SAS tokens) expire between + poll-completion and the actual download, which can happen with many concurrent streams. + """ + stale_url = "https://stale.blob.example.com/report?sv=old-sas-token" + fresh_url = "https://fresh.blob.example.com/report?sv=new-sas-token" + + self._mock_create_response(_A_JOB_ID) + # First poll (during update_jobs_status) returns the stale URL + # Second poll (during fetch_records refresh) returns the fresh URL + self._http_mocker.get( + HttpRequest(url=f"{_EXPORT_URL}/{_A_JOB_ID}"), + [ + HttpResponse( + body=json.dumps({"id": _A_JOB_ID, "status": "ready", "urls": [stale_url]}) + ), + HttpResponse( + body=json.dumps({"id": _A_JOB_ID, "status": "ready", "urls": [fresh_url]}) + ), + ], + ) + # Only the fresh URL should be requested for download + self._http_mocker.get( + HttpRequest(url=fresh_url), + HttpResponse(body=_A_CSV_WITH_ONE_RECORD), + ) + + job = self._repository.start(_ANY_SLICE) + self._repository.update_jobs_status([job]) + records = list(self._repository.fetch_records(job)) + + assert len(records) == 1 + def _mock_create_response(self, job_id: str) -> None: self._http_mocker.post( HttpRequest(url=_EXPORT_URL),