Skip to content

Commit 19a7083

Browse files
devin-ai-integration[bot], bot_apk, agarctfi, ZaneHyatt, and AB
authored
fix: validate empty CSV column names and improve mismatch error messages (#1010)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: bot_apk <apk@cognition.ai>
Co-authored-by: Alfredo Garcia <alfredo.garcia@hallmark.edu>
Co-authored-by: Zane Hyatt <zane.hyatt@airbyte.io>
1 parent d3d1346 commit 19a7083

3 files changed

Lines changed: 160 additions & 31 deletions

File tree

airbyte_cdk/sources/file_based/exceptions.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -24,8 +24,12 @@ class FileBasedSourceError(Enum):
2424
)
2525
ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy."
2626
ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = "Only one of options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discover' or 'Use First Found File For Schema Discover' can be provided at the same time."
27-
ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
28-
ERROR_PARSING_RECORD_MISMATCHED_ROWS = "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
27+
ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = (
28+
"CSV data row contains more columns than the header row defines."
29+
)
30+
ERROR_PARSING_RECORD_MISMATCHED_ROWS = (
31+
"CSV data row contains fewer columns than the header row defines."
32+
)
2933
STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema."
3034
NULL_VALUE_IN_SCHEMA = "Error during schema inference: no type was detected for key."
3135
UNRECOGNIZED_TYPE = "Error during schema inference: unrecognized type."

airbyte_cdk/sources/file_based/file_types/csv_parser.py

Lines changed: 32 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -65,24 +65,26 @@ def read_data(
6565
doublequote=config_format.double_quote,
6666
quoting=csv.QUOTE_MINIMAL,
6767
)
68-
with stream_reader.open_file(file, file_read_mode, config_format.encoding, logger) as fp:
69-
try:
70-
headers = self._get_headers(fp, config_format, dialect_name)
71-
except UnicodeError:
72-
raise AirbyteTracedException(
73-
message=f"{FileBasedSourceError.ENCODING_ERROR.value} Expected encoding: {config_format.encoding}",
68+
try:
69+
with stream_reader.open_file(
70+
file, file_read_mode, config_format.encoding, logger
71+
) as fp:
72+
try:
73+
headers = self._get_headers(fp, config_format, dialect_name)
74+
except UnicodeError:
75+
raise AirbyteTracedException(
76+
message=f"{FileBasedSourceError.ENCODING_ERROR.value} Expected encoding: {config_format.encoding}",
77+
)
78+
79+
rows_to_skip = (
80+
config_format.skip_rows_before_header
81+
+ (1 if config_format.header_definition.has_header_row() else 0)
82+
+ config_format.skip_rows_after_header
7483
)
84+
self._skip_rows(fp, rows_to_skip)
85+
lineno += rows_to_skip
7586

76-
rows_to_skip = (
77-
config_format.skip_rows_before_header
78-
+ (1 if config_format.header_definition.has_header_row() else 0)
79-
+ config_format.skip_rows_after_header
80-
)
81-
self._skip_rows(fp, rows_to_skip)
82-
lineno += rows_to_skip
83-
84-
reader = csv.DictReader(fp, dialect=dialect_name, fieldnames=headers) # type: ignore
85-
try:
87+
reader = csv.DictReader(fp, dialect=dialect_name, fieldnames=headers) # type: ignore
8688
for row in reader:
8789
lineno += 1
8890

@@ -111,14 +113,11 @@ def read_data(
111113
lineno=lineno,
112114
)
113115
yield row
114-
finally:
115-
# due to RecordParseError or GeneratorExit
116-
csv.unregister_dialect(dialect_name)
116+
finally:
117+
csv.unregister_dialect(dialect_name)
117118

118119
def _get_headers(self, fp: IOBase, config_format: CsvFormat, dialect_name: str) -> List[str]:
119-
"""
120-
Assumes the fp is pointing to the beginning of the files and will reset it as such
121-
"""
120+
"""Assumes the fp is pointing to the beginning of the files and will reset it as such."""
122121
# Note that this method assumes the dialect has already been registered if we're parsing the headers
123122
if isinstance(config_format.header_definition, CsvHeaderUserProvided):
124123
return config_format.header_definition.column_names
@@ -134,6 +133,14 @@ def _get_headers(self, fp: IOBase, config_format: CsvFormat, dialect_name: str)
134133
reader = csv.reader(fp, dialect=dialect_name) # type: ignore
135134
headers = list(next(reader))
136135

136+
empty_count = sum(1 for h in headers if not h or h.isspace())
137+
if empty_count:
138+
raise AirbyteTracedException(
139+
message="CSV header row contains empty column name(s). Remove trailing delimiters or empty columns from the header row.",
140+
internal_message=f"Found {empty_count} empty/whitespace-only column name(s) in header: {headers}",
141+
failure_type=FailureType.config_error,
142+
)
143+
137144
fp.seek(0)
138145
return headers
139146

@@ -227,7 +234,7 @@ def parse_records(
227234
logger: logging.Logger,
228235
discovered_schema: Optional[Mapping[str, SchemaType]],
229236
) -> Iterable[Dict[str, Any]]:
230-
line_no = 0
237+
data_generator = None
231238
try:
232239
config_format = _extract_format(config)
233240
if discovered_schema:
@@ -244,19 +251,15 @@ def parse_records(
244251
config, file, stream_reader, logger, self.file_read_mode
245252
)
246253
for row in data_generator:
247-
line_no += 1
248254
yield CsvParser._to_nullable(
249255
cast_fn(row),
250256
deduped_property_types,
251257
config_format.null_values,
252258
config_format.strings_can_be_null,
253259
)
254-
except RecordParseError as parse_err:
255-
raise RecordParseError(
256-
FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line_no
257-
) from parse_err
258260
finally:
259-
data_generator.close()
261+
if data_generator is not None:
262+
data_generator.close()
260263

261264
@property
262265
def file_read_mode(self) -> FileReadMode:

unit_tests/sources/file_based/file_types/test_csv_parser.py

Lines changed: 122 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@
1111
from typing import Any, Dict, Generator, List, Set
1212
from unittest import TestCase, mock
1313
from unittest.mock import Mock
14+
from uuid import uuid4
1415

1516
import pytest
1617

@@ -785,3 +786,124 @@ def test_encoding_is_passed_to_stream_reader() -> None:
785786
mock.call().__exit__(None, None, None),
786787
]
787788
)
789+
790+
791+
@pytest.mark.parametrize(
792+
"header_row, expected_empty_count",
793+
[
794+
pytest.param("col1,col2,col3,,,", 3, id="trailing_empty_columns"),
795+
pytest.param("col1,,col3", 1, id="middle_empty_column"),
796+
pytest.param(",col2,col3", 1, id="leading_empty_column"),
797+
pytest.param("col1,col2, ", 1, id="whitespace_only_column"),
798+
],
799+
)
800+
def test_get_headers_raises_on_empty_column_names(
801+
header_row: str, expected_empty_count: int
802+
) -> None:
803+
csv_reader = _CsvReader()
804+
config_format = CsvFormat()
805+
fp = io.StringIO(header_row)
806+
807+
dialect_name = f"test_{uuid4()}"
808+
csv.register_dialect(
809+
dialect_name,
810+
delimiter=config_format.delimiter,
811+
quotechar=config_format.quote_char,
812+
escapechar=config_format.escape_char,
813+
doublequote=config_format.double_quote,
814+
quoting=csv.QUOTE_MINIMAL,
815+
)
816+
817+
try:
818+
with pytest.raises(AirbyteTracedException) as exc_info:
819+
csv_reader._get_headers(fp, config_format, dialect_name)
820+
821+
assert exc_info.value.failure_type == FailureType.config_error
822+
assert "empty column name" in exc_info.value.message
823+
assert f"{expected_empty_count} empty" in exc_info.value.internal_message
824+
finally:
825+
csv.unregister_dialect(dialect_name)
826+
827+
828+
def test_get_headers_accepts_valid_headers() -> None:
829+
csv_reader = _CsvReader()
830+
config_format = CsvFormat()
831+
fp = io.StringIO("col1,col2,col3")
832+
833+
dialect_name = f"test_{uuid4()}"
834+
csv.register_dialect(
835+
dialect_name,
836+
delimiter=config_format.delimiter,
837+
quotechar=config_format.quote_char,
838+
escapechar=config_format.escape_char,
839+
doublequote=config_format.double_quote,
840+
quoting=csv.QUOTE_MINIMAL,
841+
)
842+
843+
try:
844+
headers = csv_reader._get_headers(fp, config_format, dialect_name)
845+
assert headers == ["col1", "col2", "col3"]
846+
finally:
847+
csv.unregister_dialect(dialect_name)
848+
849+
850+
def test_read_data_raises_on_empty_column_names() -> None:
851+
config_format = CsvFormat()
852+
config = Mock()
853+
config.name = "config_name"
854+
config.format = config_format
855+
856+
file = RemoteFile(uri="test.csv", last_modified=datetime.now())
857+
stream_reader = Mock(spec=AbstractFileBasedStreamReader)
858+
logger = Mock(spec=logging.Logger)
859+
csv_reader = _CsvReader()
860+
861+
stream_reader.open_file.return_value = (
862+
CsvFileBuilder().with_data(["col1,col2,col3,,,", "v1,v2,v3,v4,v5,v6"]).build()
863+
)
864+
865+
with pytest.raises(AirbyteTracedException) as exc_info:
866+
list(
867+
csv_reader.read_data(
868+
config,
869+
file,
870+
stream_reader,
871+
logger,
872+
FileReadMode.READ,
873+
)
874+
)
875+
876+
assert exc_info.value.failure_type == FailureType.config_error
877+
assert "empty column name" in exc_info.value.message
878+
879+
880+
def test_parse_records_preserves_mismatch_error_detail() -> None:
881+
config_format = CsvFormat()
882+
config = FileBasedStreamConfig(
883+
name="test",
884+
validation_policy="Emit Record",
885+
file_type="csv",
886+
format=config_format,
887+
)
888+
889+
file = RemoteFile(uri="test.csv", last_modified=datetime.now())
890+
stream_reader = Mock()
891+
mock_obj = stream_reader.open_file.return_value
892+
mock_obj.__enter__ = Mock(return_value=io.StringIO("header\ntoo many values,value,value,value"))
893+
mock_obj.__exit__ = Mock(return_value=None)
894+
895+
parser = CsvParser()
896+
897+
with pytest.raises(RecordParseError) as exc_info:
898+
list(
899+
parser.parse_records(
900+
config,
901+
file,
902+
stream_reader,
903+
logger,
904+
{"properties": {"header": {"type": "string"}}},
905+
)
906+
)
907+
908+
error_msg = str(exc_info.value)
909+
assert "more columns than the header" in error_msg

0 commit comments

Comments (0)