Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions airbyte_cdk/sources/declarative/requesters/http_job_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,18 @@ def update_jobs_status(self, jobs: Iterable[AsyncJob]) -> None:
if job_status == AsyncJobStatus.COMPLETED:
self._polling_job_response_by_id[job.api_job_id()] = polling_response

def _refresh_download_url(self, job: AsyncJob) -> None:
    """
    Re-poll the job's status endpoint and replace the cached polling response.

    Download URLs (for example Azure Blob Storage SAS tokens) may expire in the
    window between poll-completion and the actual download — e.g. when many
    concurrent streams delay record fetching — so a fresh polling response is
    fetched right before downloading to keep the download URL valid.
    """
    fresh_response = self._get_validated_polling_response(
        self._get_create_job_stream_slice(job)
    )
    self._polling_job_response_by_id[job.api_job_id()] = fresh_response

def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
"""
Fetches records from the given job.
Expand All @@ -212,6 +224,7 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
Iterable[Mapping[str, Any]]: A generator that yields records as dictionaries.

"""
self._refresh_download_url(job)

for download_target in self._get_download_targets(job):
job_slice = job.job_parameters()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,43 @@ def test_given_multiple_urls_when_fetch_records_then_fetch_from_multiple_urls(se

assert len(records) == 2

def test_given_stale_polling_response_when_fetch_records_then_re_polls_for_fresh_url(
    self,
) -> None:
    """
    fetch_records must re-poll the API for a fresh download URL before downloading.

    This guards against download-URL expiry (e.g. Azure SAS tokens) in the window
    between poll-completion and the actual download, which can happen when many
    concurrent streams delay record fetching.
    """
    expired_target = "https://stale.blob.example.com/report?sv=old-sas-token"
    valid_target = "https://fresh.blob.example.com/report?sv=new-sas-token"

    self._mock_create_response(_A_JOB_ID)
    # Poll responses in order: the status check (update_jobs_status) sees the
    # stale URL; the pre-download refresh inside fetch_records sees the fresh one.
    polling_bodies = [
        json.dumps({"id": _A_JOB_ID, "status": "ready", "urls": [expired_target]}),
        json.dumps({"id": _A_JOB_ID, "status": "ready", "urls": [valid_target]}),
    ]
    self._http_mocker.get(
        HttpRequest(url=f"{_EXPORT_URL}/{_A_JOB_ID}"),
        [HttpResponse(body=body) for body in polling_bodies],
    )
    # Only the fresh URL is mocked for download — a request to the stale URL
    # would fail the test, proving the refresh happened.
    self._http_mocker.get(
        HttpRequest(url=valid_target),
        HttpResponse(body=_A_CSV_WITH_ONE_RECORD),
    )

    job = self._repository.start(_ANY_SLICE)
    self._repository.update_jobs_status([job])
    fetched = list(self._repository.fetch_records(job))

    assert len(fetched) == 1

def _mock_create_response(self, job_id: str) -> None:
self._http_mocker.post(
HttpRequest(url=_EXPORT_URL),
Expand Down
Loading