Skip to content

Commit 53c696f

Browse files
devin-ai-integration[bot]agarctfidarynaishchenkooctavia-squidington-iiigithub-code-quality[bot]
authored
feat(file-based): Add Calamine-first with Openpyxl fallback for Excel parser (#850)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Alfredo Garcia <alfredo.garcia@hallmark.edu> Co-authored-by: darynaishchenko <darina.ishchenko17@gmail.com> Co-authored-by: octavia-squidington-iii <contact@airbyte.com> Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> Co-authored-by: Alfredo Garcia <169859663+agarctfi@users.noreply.github.com>
1 parent a67a6cf commit 53c696f

File tree

6 files changed

+207
-19
lines changed

6 files changed

+207
-19
lines changed

airbyte_cdk/sources/file_based/exceptions.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ class RecordParseError(BaseFileBasedSourceError):
9292
pass
9393

9494

95+
class ExcelCalamineParsingError(BaseFileBasedSourceError):
96+
"""Raised when Calamine engine fails to parse an Excel file."""
97+
98+
pass
99+
100+
95101
class SchemaInferenceError(BaseFileBasedSourceError):
96102
pass
97103

airbyte_cdk/sources/file_based/file_types/excel_parser.py

Lines changed: 87 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44

55
import logging
6+
import warnings
67
from io import IOBase
78
from pathlib import Path
89
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple, Union
@@ -17,6 +18,7 @@
1718
)
1819
from airbyte_cdk.sources.file_based.exceptions import (
1920
ConfigValidationError,
21+
ExcelCalamineParsingError,
2022
FileBasedSourceError,
2123
RecordParseError,
2224
)
@@ -64,7 +66,7 @@ async def infer_schema(
6466
fields: Dict[str, str] = {}
6567

6668
with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
67-
df = self.open_and_parse_file(fp)
69+
df = self.open_and_parse_file(fp, logger, file)
6870
for column, df_type in df.dtypes.items():
6971
# Choose the broadest data type if the column's data type differs in dataframes
7072
prev_frame_column_type = fields.get(column) # type: ignore [call-overload]
@@ -92,7 +94,7 @@ def parse_records(
9294
discovered_schema: Optional[Mapping[str, SchemaType]] = None,
9395
) -> Iterable[Dict[str, Any]]:
9496
"""
95-
Parses records from an Excel file based on the provided configuration.
97+
Parses records from an Excel file with fallback error handling.
9698
9799
Args:
98100
config (FileBasedStreamConfig): Configuration for the file-based stream.
@@ -111,7 +113,7 @@ def parse_records(
111113
try:
112114
# Open and parse the file using the stream reader
113115
with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
114-
df = self.open_and_parse_file(fp)
116+
df = self.open_and_parse_file(fp, logger, file)
115117
# Yield records as dictionaries
116118
# DataFrame.to_dict() method returns datetime values in pandas.Timestamp values, which are not serializable by orjson
117119
# DataFrame.to_json() returns string with datetime values serialized to iso8601 with microseconds to align with pydantic behavior
@@ -180,15 +182,93 @@ def validate_format(excel_format: BaseModel, logger: logging.Logger) -> None:
180182
logger.info(f"Expected ExcelFormat, got {excel_format}")
181183
raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)
182184

183-
@staticmethod
184-
def open_and_parse_file(fp: Union[IOBase, str, Path]) -> pd.DataFrame:
185+
def _open_and_parse_file_with_calamine(
186+
self,
187+
fp: Union[IOBase, str, Path],
188+
logger: logging.Logger,
189+
file: RemoteFile,
190+
) -> pd.DataFrame:
191+
"""Opens and parses Excel file using Calamine engine.
192+
193+
Args:
194+
fp: File pointer to the Excel file.
195+
logger: Logger for logging information and errors.
196+
file: Remote file information for logging context.
197+
198+
Returns:
199+
pd.DataFrame: Parsed data from the Excel file.
200+
201+
Raises:
202+
ExcelCalamineParsingError: If Calamine fails to parse the file.
203+
"""
204+
try:
205+
return pd.ExcelFile(fp, engine="calamine").parse() # type: ignore [arg-type, call-overload, no-any-return]
206+
except BaseException as exc:
207+
# Calamine engine raises PanicException(child of BaseException) if Calamine fails to parse the file.
208+
# Checking if ValueError in exception arg to know if it was actually an error during parsing due to invalid values in cells.
209+
# Otherwise, raise an exception.
210+
if "ValueError" in str(exc):
211+
logger.warning(
212+
f"Calamine parsing failed for {file.file_uri_for_logging}, falling back to openpyxl: {exc}"
213+
)
214+
raise ExcelCalamineParsingError(
215+
f"Calamine engine failed to parse {file.file_uri_for_logging}",
216+
filename=file.uri,
217+
) from exc
218+
raise exc
219+
220+
def _open_and_parse_file_with_openpyxl(
221+
self,
222+
fp: Union[IOBase, str, Path],
223+
logger: logging.Logger,
224+
file: RemoteFile,
225+
) -> pd.DataFrame:
226+
"""Opens and parses Excel file using Openpyxl engine.
227+
228+
Args:
229+
fp: File pointer to the Excel file.
230+
logger: Logger for logging information and errors.
231+
file: Remote file information for logging context.
232+
233+
Returns:
234+
pd.DataFrame: Parsed data from the Excel file.
185235
"""
186-
Opens and parses the Excel file.
236+
# Some file-like objects are not seekable.
237+
if hasattr(fp, "seek"):
238+
try:
239+
fp.seek(0) # type: ignore [union-attr]
240+
except OSError as exc:
241+
logger.info(
242+
f"Could not rewind stream for {file.file_uri_for_logging}; "
243+
f"proceeding with openpyxl from current position: {exc}"
244+
)
245+
246+
with warnings.catch_warnings(record=True) as warning_records:
247+
warnings.simplefilter("always")
248+
df = pd.ExcelFile(fp, engine="openpyxl").parse() # type: ignore [arg-type, call-overload]
249+
250+
for warning in warning_records:
251+
logger.warning(f"Openpyxl warning for {file.file_uri_for_logging}: {warning.message}")
252+
253+
return df # type: ignore [no-any-return]
254+
255+
def open_and_parse_file(
256+
self,
257+
fp: Union[IOBase, str, Path],
258+
logger: logging.Logger,
259+
file: RemoteFile,
260+
) -> pd.DataFrame:
261+
"""Opens and parses the Excel file with Calamine-first and Openpyxl fallback.
187262
188263
Args:
189264
fp: File pointer to the Excel file.
265+
logger: Logger for logging information and errors.
266+
file: Remote file information for logging context.
190267
191268
Returns:
192269
pd.DataFrame: Parsed data from the Excel file.
193270
"""
194-
return pd.ExcelFile(fp, engine="calamine").parse() # type: ignore [arg-type, call-overload, no-any-return]
271+
try:
272+
return self._open_and_parse_file_with_calamine(fp, logger, file)
273+
except ExcelCalamineParsingError:
274+
return self._open_and_parse_file_with_openpyxl(fp, logger, file)

airbyte_cdk/sources/file_based/remote_file.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ class RemoteFile(BaseModel):
1717
last_modified: datetime
1818
mime_type: Optional[str] = None
1919

20+
@property
21+
def file_uri_for_logging(self) -> str:
22+
"""Returns a user-friendly identifier for logging."""
23+
return self.uri
24+
2025

2126
class UploadableRemoteFile(RemoteFile, ABC):
2227
"""
@@ -49,13 +54,6 @@ def source_file_relative_path(self) -> str:
4954
"""
5055
return self.uri
5156

52-
@property
53-
def file_uri_for_logging(self) -> str:
54-
"""
55-
Returns the URI for the file being logged.
56-
"""
57-
return self.uri
58-
5957
@property
6058
def source_uri(self) -> str:
6159
"""

poetry.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ pdf2image = { version = "1.16.3", optional = true }
7373
pyarrow = { version = "^19.0.0", optional = true }
7474
pytesseract = { version = "0.3.10", optional = true } # Used indirectly by unstructured library
7575
python-calamine = { version = "0.2.3", optional = true } # TODO: Remove if unused
76+
openpyxl = { version = "^3.1.0", optional = true }
7677
python-snappy = { version = "0.7.3", optional = true } # TODO: remove if unused
7778
tiktoken = { version = "0.8.0", optional = true }
7879
nltk = { version = "3.9.1", optional = true }
@@ -120,7 +121,7 @@ deptry = "^0.23.0"
120121
dagger-io = "0.19.0"
121122

122123
[tool.poetry.extras]
123-
file-based = ["avro", "fastavro", "pyarrow", "unstructured", "pdf2image", "pdfminer.six", "unstructured.pytesseract", "pytesseract", "markdown", "python-calamine", "python-snappy"]
124+
file-based = ["avro", "fastavro", "pyarrow", "unstructured", "pdf2image", "pdfminer.six", "unstructured.pytesseract", "pytesseract", "markdown", "python-calamine", "openpyxl", "python-snappy"]
124125
vector-db-based = ["langchain_community", "langchain_core", "langchain_text_splitters", "openai", "cohere", "tiktoken"]
125126
sql = ["sqlalchemy"]
126127
dev = ["pytest"]
@@ -252,6 +253,7 @@ DEP002 = [
252253
"cohere",
253254
"markdown",
254255
"openai",
256+
"openpyxl",
255257
"pdf2image",
256258
"pdfminer.six",
257259
"pytesseract",

unit_tests/sources/file_based/file_types/test_excel_parser.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55

66
import datetime
7+
import warnings
78
from io import BytesIO
89
from unittest.mock import MagicMock, Mock, mock_open, patch
910

@@ -136,3 +137,104 @@ def test_file_read_error(mock_stream_reader, mock_logger, file_config, remote_fi
136137
list(
137138
parser.parse_records(file_config, remote_file, mock_stream_reader, mock_logger)
138139
)
140+
141+
142+
class FakePanic(BaseException):
143+
"""Simulates the PyO3 PanicException which does not inherit from Exception."""
144+
145+
146+
def test_open_and_parse_file_falls_back_to_openpyxl(mock_logger):
147+
parser = ExcelParser()
148+
fp = BytesIO(b"test")
149+
remote_file = RemoteFile(uri="s3://mybucket/test.xlsx", last_modified=datetime.datetime.now())
150+
151+
fallback_df = pd.DataFrame({"a": [1]})
152+
153+
calamine_excel_file = MagicMock()
154+
155+
def calamine_parse_side_effect():
156+
raise FakePanic(
157+
"failed to construct date: PyErr { type: <class 'ValueError'>, value: ValueError('year 20225 is out of range'), traceback: None }"
158+
)
159+
160+
calamine_excel_file.parse.side_effect = calamine_parse_side_effect
161+
162+
openpyxl_excel_file = MagicMock()
163+
164+
def openpyxl_parse_side_effect():
165+
warnings.warn("Cell A146 has invalid date", UserWarning)
166+
return fallback_df
167+
168+
openpyxl_excel_file.parse.side_effect = openpyxl_parse_side_effect
169+
170+
with (
171+
patch("airbyte_cdk.sources.file_based.file_types.excel_parser.pd.ExcelFile") as mock_excel,
172+
):
173+
mock_excel.side_effect = [calamine_excel_file, openpyxl_excel_file]
174+
175+
result = parser.open_and_parse_file(fp, mock_logger, remote_file)
176+
177+
pd.testing.assert_frame_equal(result, fallback_df)
178+
assert mock_logger.warning.call_count == 2
179+
assert "Openpyxl warning" in mock_logger.warning.call_args_list[1].args[0]
180+
181+
182+
def test_open_and_parse_file_does_not_swallow_system_exit(mock_logger):
183+
"""Test that SystemExit is not caught by the BaseException handler.
184+
185+
This test ensures that critical system-level exceptions like SystemExit and KeyboardInterrupt
186+
are not accidentally caught and suppressed by our BaseException handler in the Calamine parsing
187+
method. These exceptions should always propagate up to allow proper program termination.
188+
"""
189+
parser = ExcelParser()
190+
fp = BytesIO(b"test")
191+
remote_file = RemoteFile(uri="s3://mybucket/test.xlsx", last_modified=datetime.datetime.now())
192+
193+
with patch("airbyte_cdk.sources.file_based.file_types.excel_parser.pd.ExcelFile") as mock_excel:
194+
mock_excel.return_value.parse.side_effect = SystemExit()
195+
196+
with pytest.raises(SystemExit):
197+
parser.open_and_parse_file(fp, mock_logger, remote_file)
198+
199+
200+
@pytest.mark.parametrize(
201+
"exc_cls",
202+
[
203+
pytest.param(OSError, id="os-error"),
204+
],
205+
)
206+
def test_openpyxl_logs_info_when_seek_fails(mock_logger, remote_file, exc_cls):
207+
"""Test that openpyxl logs info when seek fails on non-seekable files.
208+
209+
This test ensures that when falling back to openpyxl, if the file pointer
210+
cannot be rewound (seek fails with OSError), an info-level log is emitted
211+
and parsing proceeds from the current position.
212+
"""
213+
parser = ExcelParser()
214+
fallback_df = pd.DataFrame({"a": [1]})
215+
216+
class FakeFP:
217+
"""Fake file-like object with a seek method that raises an exception."""
218+
219+
def __init__(self, exc):
220+
self._exc = exc
221+
222+
def seek(self, *args, **kwargs):
223+
raise self._exc("not seekable")
224+
225+
fp = FakeFP(exc_cls)
226+
227+
openpyxl_excel_file = MagicMock()
228+
openpyxl_excel_file.parse.return_value = fallback_df
229+
230+
with patch("airbyte_cdk.sources.file_based.file_types.excel_parser.pd.ExcelFile") as mock_excel:
231+
mock_excel.return_value = openpyxl_excel_file
232+
233+
result = parser._open_and_parse_file_with_openpyxl(fp, mock_logger, remote_file)
234+
235+
pd.testing.assert_frame_equal(result, fallback_df)
236+
mock_logger.info.assert_called_once()
237+
msg = mock_logger.info.call_args[0][0]
238+
assert "Could not rewind stream" in msg
239+
assert remote_file.file_uri_for_logging in msg
240+
mock_excel.assert_called_once_with(fp, engine="openpyxl")

0 commit comments

Comments
 (0)