diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index 7443dccd6..f39d2860c 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -5,7 +5,7 @@ import logging import time from abc import ABC, abstractmethod -from datetime import datetime +from datetime import datetime, timezone from enum import Enum from io import IOBase from os import makedirs, path @@ -24,6 +24,7 @@ from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError from airbyte_cdk.sources.file_based.file_record_data import FileRecordData from airbyte_cdk.sources.file_based.remote_file import RemoteFile, UploadableRemoteFile +from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse class FileReadMode(Enum): @@ -98,6 +99,33 @@ def get_matching_files( """ ... + def _parse_start_date(self, start_date_str: str) -> datetime: + """Parse a start_date string, supporting both with and without microseconds. + + AbstractFileBasedSpec accepts start_date in multiple formats as described by its + pattern_descriptor: "YYYY-MM-DD, YYYY-MM-DDTHH:mm:ssZ, or YYYY-MM-DDTHH:mm:ss.SSSSSSZ". + The primary format (self.DATE_TIME_FORMAT) includes microseconds, but the spec also + allows the shorter "YYYY-MM-DDTHH:mm:ssZ" variant. This method tries the primary + format first and falls back to the shorter format without microseconds. + + Note: this fallback is only relevant for start_date values provided by the user in the + connector configuration. Cursor values persisted in connector state are always formatted + using the default DATE_TIME_FORMAT (with microseconds). + """ + try: + return datetime.strptime(start_date_str, self.DATE_TIME_FORMAT) + except ValueError: + try: + return datetime.strptime(start_date_str, "%Y-%m-%dT%H:%M:%SZ") + except ValueError: + # ab_datetime_parse may return a timezone-aware datetime (e.g. for inputs + # like "2025-01-01T00:00:00+05:30"). We convert to UTC first so the offset + # is applied correctly, then strip tzinfo to produce a naive UTC datetime + # compatible with RemoteFile.last_modified comparisons. + return ( + ab_datetime_parse(start_date_str).astimezone(timezone.utc).replace(tzinfo=None) + ) + def filter_files_by_globs_and_start_date( self, files: List[RemoteFile], globs: List[str] ) -> Iterable[RemoteFile]: @@ -105,7 +133,7 @@ def filter_files_by_globs_and_start_date( Utility method for filtering files based on globs. """ start_date = ( - datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT) + self._parse_start_date(self.config.start_date) if self.config and self.config.start_date else None ) diff --git a/unit_tests/sources/file_based/test_file_based_stream_reader.py b/unit_tests/sources/file_based/test_file_based_stream_reader.py index 13fa1025c..a8b8e7752 100644 --- a/unit_tests/sources/file_based/test_file_based_stream_reader.py +++ b/unit_tests/sources/file_based/test_file_based_stream_reader.py @@ -401,6 +401,13 @@ def documentation_url(cls) -> AnyUrl: set(), id="all_csvs_modified_before_start_date", ), + pytest.param( + ["**/*.csv"], + {"start_date": "2023-06-01T03:54:07Z", "streams": []}, + {"a.csv", "a/b.csv", "a/c.csv", "a/b/c.csv", "a/c/c.csv", "a/b/c/d.csv"}, + set(), + id="all_csvs_start_date_without_microseconds", + ), pytest.param( ["**/*.csv"], {"start_date": "2023-06-05T03:54:07.000Z", "streams": []}, @@ -494,6 +501,63 @@ def test_preserve_sub_directories_scenarios( assert file_paths[AbstractFileBasedStreamReader.FILE_FOLDER] == path.dirname(source_file_path) +@pytest.mark.parametrize( + "start_date_str, expected", + [ + pytest.param( + "2025-01-01T00:00:00.000000Z", + datetime(2025, 1, 1, 0, 0, 0), + id="with_microseconds_zero", + ), + pytest.param( + "2025-06-15T12:30:45.123456Z", + datetime(2025, 6, 15, 12, 30, 45, 123456), + id="with_microseconds_nonzero", + ), + pytest.param( + "2025-01-01T00:00:00Z", + datetime(2025, 1, 1, 0, 0, 0), + id="without_microseconds", + ), + pytest.param( + "2025-12-31T23:59:59Z", + datetime(2025, 12, 31, 23, 59, 59), + id="without_microseconds_end_of_day", + ), + pytest.param( + "2025-01-01", + datetime(2025, 1, 1, 0, 0, 0), + id="date_only_ab_datetime_parse_fallback", + ), + pytest.param( + "2025-01-01T00:00:00+05:30", + datetime(2024, 12, 31, 18, 30, 0), + id="with_timezone_offset_converted_to_utc", + ), + ], +) +def test_parse_start_date(start_date_str: str, expected: datetime) -> None: + reader = TestStreamReader() + assert reader._parse_start_date(start_date_str) == expected + + +def test_parse_start_date_respects_overridden_date_time_format() -> None: + """Verify that subclasses overriding DATE_TIME_FORMAT are honored by _parse_start_date.""" + + class CustomFormatReader(TestStreamReader): + DATE_TIME_FORMAT = "custom:%Y/%m/%d %H:%M:%S" + + reader = CustomFormatReader() + + assert reader._parse_start_date("custom:2025/01/01 00:00:00") == datetime(2025, 1, 1, 0, 0, 0) + + +def test_parse_start_date_invalid_raises() -> None: + reader = TestStreamReader() + with pytest.raises(ValueError): + reader._parse_start_date("not-a-date") + + def test_upload_with_file_transfer_reader(): stream_reader = TestStreamReaderWithDefaultUpload()