Skip to content

Commit 3a276a5

Browse files
Merge main branch and regenerate poetry.lock
Resolves merge conflicts with main branch while preserving the langchain-core version bump to ^1.2.5 (resolves to 1.2.6) to address CVE-2025-68664. Co-Authored-By: unknown <>
2 parents aa6c244 + 80bcfc5 commit 3a276a5

9 files changed

Lines changed: 252 additions & 51 deletions

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2110,11 +2110,11 @@ def create_default_stream(
21102110
name=stream_name,
21112111
json_schema=schema_loader.get_json_schema,
21122112
primary_key=get_primary_key_from_stream(primary_key),
2113-
cursor_field=concurrent_cursor.cursor_field
2114-
if hasattr(concurrent_cursor, "cursor_field")
2115-
else CursorField(
2116-
cursor_field_key=""
2117-
), # FIXME we should have the cursor field as part of the interface of cursor,
2113+
cursor_field=(
2114+
concurrent_cursor.cursor_field
2115+
if hasattr(concurrent_cursor, "cursor_field")
2116+
else None
2117+
),
21182118
logger=logging.getLogger(f"airbyte.{stream_name}"),
21192119
cursor=concurrent_cursor,
21202120
supports_file_transfer=hasattr(model, "file_uploader") and bool(model.file_uploader),
@@ -4339,8 +4339,14 @@ def _get_catalog_defined_cursor_field(
43394339
configured_stream = self._stream_name_to_configured_stream.get(stream_name)
43404340

43414341
# Depending on the operation being performed, there may not be a configured stream yet. In this
4342-
# case we return None which will then use the default cursor field defined on the cursor model
4343-
if not configured_stream or not configured_stream.cursor_field:
4342+
# case we return None which will then use the default cursor field defined on the cursor model.
4343+
# We also treat cursor_field: [""] (list with empty string) as no cursor field, since this can
4344+
# occur when the platform serializes "no cursor configured" streams incorrectly.
4345+
if (
4346+
not configured_stream
4347+
or not configured_stream.cursor_field
4348+
or not configured_stream.cursor_field[0]
4349+
):
43444350
return None
43454351
elif len(configured_stream.cursor_field) > 1:
43464352
raise ValueError(

airbyte_cdk/sources/file_based/exceptions.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ class RecordParseError(BaseFileBasedSourceError):
9292
pass
9393

9494

95+
class ExcelCalamineParsingError(BaseFileBasedSourceError):
96+
"""Raised when Calamine engine fails to parse an Excel file."""
97+
98+
pass
99+
100+
95101
class SchemaInferenceError(BaseFileBasedSourceError):
96102
pass
97103

airbyte_cdk/sources/file_based/file_types/excel_parser.py

Lines changed: 87 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44

55
import logging
6+
import warnings
67
from io import IOBase
78
from pathlib import Path
89
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple, Union
@@ -17,6 +18,7 @@
1718
)
1819
from airbyte_cdk.sources.file_based.exceptions import (
1920
ConfigValidationError,
21+
ExcelCalamineParsingError,
2022
FileBasedSourceError,
2123
RecordParseError,
2224
)
@@ -64,7 +66,7 @@ async def infer_schema(
6466
fields: Dict[str, str] = {}
6567

6668
with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
67-
df = self.open_and_parse_file(fp)
69+
df = self.open_and_parse_file(fp, logger, file)
6870
for column, df_type in df.dtypes.items():
6971
# Choose the broadest data type if the column's data type differs in dataframes
7072
prev_frame_column_type = fields.get(column) # type: ignore [call-overload]
@@ -92,7 +94,7 @@ def parse_records(
9294
discovered_schema: Optional[Mapping[str, SchemaType]] = None,
9395
) -> Iterable[Dict[str, Any]]:
9496
"""
95-
Parses records from an Excel file based on the provided configuration.
97+
Parses records from an Excel file with fallback error handling.
9698
9799
Args:
98100
config (FileBasedStreamConfig): Configuration for the file-based stream.
@@ -111,7 +113,7 @@ def parse_records(
111113
try:
112114
# Open and parse the file using the stream reader
113115
with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
114-
df = self.open_and_parse_file(fp)
116+
df = self.open_and_parse_file(fp, logger, file)
115117
# Yield records as dictionaries
116118
# DataFrame.to_dict() method returns datetime values in pandas.Timestamp values, which are not serializable by orjson
117119
# DataFrame.to_json() returns string with datetime values serialized to iso8601 with microseconds to align with pydantic behavior
@@ -180,15 +182,93 @@ def validate_format(excel_format: BaseModel, logger: logging.Logger) -> None:
180182
logger.info(f"Expected ExcelFormat, got {excel_format}")
181183
raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)
182184

183-
@staticmethod
184-
def open_and_parse_file(fp: Union[IOBase, str, Path]) -> pd.DataFrame:
185+
def _open_and_parse_file_with_calamine(
186+
self,
187+
fp: Union[IOBase, str, Path],
188+
logger: logging.Logger,
189+
file: RemoteFile,
190+
) -> pd.DataFrame:
191+
"""Opens and parses Excel file using Calamine engine.
192+
193+
Args:
194+
fp: File pointer to the Excel file.
195+
logger: Logger for logging information and errors.
196+
file: Remote file information for logging context.
197+
198+
Returns:
199+
pd.DataFrame: Parsed data from the Excel file.
200+
201+
Raises:
202+
ExcelCalamineParsingError: If Calamine fails to parse the file.
203+
"""
204+
try:
205+
return pd.ExcelFile(fp, engine="calamine").parse() # type: ignore [arg-type, call-overload, no-any-return]
206+
except BaseException as exc:
207+
# The Calamine engine raises PanicException (a subclass of BaseException) if it fails to parse the file.
208+
# We check whether "ValueError" appears in the exception message to determine whether the failure was a parsing error caused by invalid cell values.
209+
# Otherwise, raise an exception.
210+
if "ValueError" in str(exc):
211+
logger.warning(
212+
f"Calamine parsing failed for {file.file_uri_for_logging}, falling back to openpyxl: {exc}"
213+
)
214+
raise ExcelCalamineParsingError(
215+
f"Calamine engine failed to parse {file.file_uri_for_logging}",
216+
filename=file.uri,
217+
) from exc
218+
raise exc
219+
220+
def _open_and_parse_file_with_openpyxl(
221+
self,
222+
fp: Union[IOBase, str, Path],
223+
logger: logging.Logger,
224+
file: RemoteFile,
225+
) -> pd.DataFrame:
226+
"""Opens and parses Excel file using Openpyxl engine.
227+
228+
Args:
229+
fp: File pointer to the Excel file.
230+
logger: Logger for logging information and errors.
231+
file: Remote file information for logging context.
232+
233+
Returns:
234+
pd.DataFrame: Parsed data from the Excel file.
185235
"""
186-
Opens and parses the Excel file.
236+
# Some file-like objects are not seekable.
237+
if hasattr(fp, "seek"):
238+
try:
239+
fp.seek(0) # type: ignore [union-attr]
240+
except OSError as exc:
241+
logger.info(
242+
f"Could not rewind stream for {file.file_uri_for_logging}; "
243+
f"proceeding with openpyxl from current position: {exc}"
244+
)
245+
246+
with warnings.catch_warnings(record=True) as warning_records:
247+
warnings.simplefilter("always")
248+
df = pd.ExcelFile(fp, engine="openpyxl").parse() # type: ignore [arg-type, call-overload]
249+
250+
for warning in warning_records:
251+
logger.warning(f"Openpyxl warning for {file.file_uri_for_logging}: {warning.message}")
252+
253+
return df # type: ignore [no-any-return]
254+
255+
def open_and_parse_file(
256+
self,
257+
fp: Union[IOBase, str, Path],
258+
logger: logging.Logger,
259+
file: RemoteFile,
260+
) -> pd.DataFrame:
261+
"""Opens and parses the Excel file with Calamine-first and Openpyxl fallback.
187262
188263
Args:
189264
fp: File pointer to the Excel file.
265+
logger: Logger for logging information and errors.
266+
file: Remote file information for logging context.
190267
191268
Returns:
192269
pd.DataFrame: Parsed data from the Excel file.
193270
"""
194-
return pd.ExcelFile(fp, engine="calamine").parse() # type: ignore [arg-type, call-overload, no-any-return]
271+
try:
272+
return self._open_and_parse_file_with_calamine(fp, logger, file)
273+
except ExcelCalamineParsingError:
274+
return self._open_and_parse_file_with_openpyxl(fp, logger, file)

airbyte_cdk/sources/file_based/remote_file.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ class RemoteFile(BaseModel):
1717
last_modified: datetime
1818
mime_type: Optional[str] = None
1919

20+
@property
21+
def file_uri_for_logging(self) -> str:
22+
"""Returns a user-friendly identifier for logging."""
23+
return self.uri
24+
2025

2126
class UploadableRemoteFile(RemoteFile, ABC):
2227
"""
@@ -49,13 +54,6 @@ def source_file_relative_path(self) -> str:
4954
"""
5055
return self.uri
5156

52-
@property
53-
def file_uri_for_logging(self) -> str:
54-
"""
55-
Returns the URI for the file being logged.
56-
"""
57-
return self.uri
58-
5957
@property
6058
def source_uri(self) -> str:
6159
"""

airbyte_cdk/sources/streams/concurrent/helpers.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ def get_cursor_field_from_stream(stream: Stream) -> Optional[str]:
3434
raise ValueError(
3535
f"Nested cursor fields are not supported. Got {stream.cursor_field} for {stream.name}"
3636
)
37-
elif len(stream.cursor_field) == 0:
37+
elif len(stream.cursor_field) == 0 or not stream.cursor_field[0]:
38+
# Treat cursor_field: [""] (list with empty string) as no cursor field
3839
return None
3940
else:
4041
return stream.cursor_field[0]

poetry.lock

Lines changed: 28 additions & 28 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)