Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions airbyte_cdk/sources/file_based/file_based_stream_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import time
from abc import ABC, abstractmethod
from datetime import datetime
from datetime import datetime, timezone
from enum import Enum
from io import IOBase
from os import makedirs, path
Expand All @@ -24,6 +24,7 @@
from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
from airbyte_cdk.sources.file_based.remote_file import RemoteFile, UploadableRemoteFile
from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse


class FileReadMode(Enum):
Expand Down Expand Up @@ -98,14 +99,41 @@ def get_matching_files(
"""
...

def _parse_start_date(self, start_date_str: str) -> datetime:
"""Parse a start_date string, supporting both with and without microseconds.

AbstractFileBasedSpec accepts start_date in multiple formats as described by its
pattern_descriptor: "YYYY-MM-DD, YYYY-MM-DDTHH:mm:ssZ, or YYYY-MM-DDTHH:mm:ss.SSSSSSZ".
The primary format (self.DATE_TIME_FORMAT) includes microseconds, but the spec also
allows the shorter "YYYY-MM-DDTHH:mm:ssZ" variant. This method tries the primary
format first and falls back to the shorter format without microseconds.

Note: this fallback is only relevant for start_date values provided by the user in the
connector configuration. Cursor values persisted in connector state are always formatted
using the default DATE_TIME_FORMAT (with microseconds).
"""
try:
return datetime.strptime(start_date_str, self.DATE_TIME_FORMAT)
except ValueError:
try:
return datetime.strptime(start_date_str, "%Y-%m-%dT%H:%M:%SZ")
except ValueError:
# ab_datetime_parse may return a timezone-aware datetime (e.g. for inputs
# like "2025-01-01T00:00:00+05:30"). We convert to UTC first so the offset
# is applied correctly, then strip tzinfo to produce a naive UTC datetime
# compatible with RemoteFile.last_modified comparisons.
return (
ab_datetime_parse(start_date_str).astimezone(timezone.utc).replace(tzinfo=None)
)

def filter_files_by_globs_and_start_date(
self, files: List[RemoteFile], globs: List[str]
) -> Iterable[RemoteFile]:
"""
Utility method for filtering files based on globs.
"""
start_date = (
datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT)
self._parse_start_date(self.config.start_date)
if self.config and self.config.start_date
else None
)
Expand Down
64 changes: 64 additions & 0 deletions unit_tests/sources/file_based/test_file_based_stream_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,13 @@ def documentation_url(cls) -> AnyUrl:
set(),
id="all_csvs_modified_before_start_date",
),
pytest.param(
["**/*.csv"],
{"start_date": "2023-06-01T03:54:07Z", "streams": []},
{"a.csv", "a/b.csv", "a/c.csv", "a/b/c.csv", "a/c/c.csv", "a/b/c/d.csv"},
set(),
id="all_csvs_start_date_without_microseconds",
),
pytest.param(
["**/*.csv"],
{"start_date": "2023-06-05T03:54:07.000Z", "streams": []},
Expand Down Expand Up @@ -494,6 +501,63 @@ def test_preserve_sub_directories_scenarios(
assert file_paths[AbstractFileBasedStreamReader.FILE_FOLDER] == path.dirname(source_file_path)


@pytest.mark.parametrize(
"start_date_str, expected",
[
pytest.param(
"2025-01-01T00:00:00.000000Z",
datetime(2025, 1, 1, 0, 0, 0),
id="with_microseconds_zero",
),
pytest.param(
"2025-06-15T12:30:45.123456Z",
datetime(2025, 6, 15, 12, 30, 45, 123456),
id="with_microseconds_nonzero",
),
pytest.param(
"2025-01-01T00:00:00Z",
datetime(2025, 1, 1, 0, 0, 0),
id="without_microseconds",
),
pytest.param(
"2025-12-31T23:59:59Z",
datetime(2025, 12, 31, 23, 59, 59),
id="without_microseconds_end_of_day",
),
pytest.param(
"2025-01-01",
datetime(2025, 1, 1, 0, 0, 0),
id="date_only_ab_datetime_parse_fallback",
),
pytest.param(
"2025-01-01T00:00:00+05:30",
datetime(2024, 12, 31, 18, 30, 0),
id="with_timezone_offset_converted_to_utc",
),
],
)
def test_parse_start_date(start_date_str: str, expected: datetime) -> None:
reader = TestStreamReader()
assert reader._parse_start_date(start_date_str) == expected
Comment thread
coderabbitai[bot] marked this conversation as resolved.


def test_parse_start_date_respects_overridden_date_time_format() -> None:
"""Verify that subclasses overriding DATE_TIME_FORMAT are honored by _parse_start_date."""

class CustomFormatReader(TestStreamReader):
DATE_TIME_FORMAT = "custom:%Y/%m/%d %H:%M:%S"

reader = CustomFormatReader()

assert reader._parse_start_date("custom:2025/01/01 00:00:00") == datetime(2025, 1, 1, 0, 0, 0)


def test_parse_start_date_invalid_raises() -> None:
reader = TestStreamReader()
with pytest.raises(ValueError):
reader._parse_start_date("not-a-date")


def test_upload_with_file_transfer_reader():
stream_reader = TestStreamReaderWithDefaultUpload()

Expand Down
Loading