
Commit 27f1735

devin-ai-integration[bot] and bot_apk committed
feat: add skip_rows_before_header and skip_rows_after_header to CsvDecoder
Add skip_rows_before_header and skip_rows_after_header optional integer properties (default: 0) to the declarative CsvDecoder schema and parser. This enables Connector Builder users to parse CSV/TSV responses that contain metadata lines before or after the header row.

The implementation follows the same pattern already used by file-based connectors (CsvFormat in csv_format.py) and mirrors the approach used when set_values_to_none was added to CsvDecoder.

Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 7f41401 commit 27f1735
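To make the intended behavior concrete, here is a minimal standard-library sketch of the same idea outside the CDK. It assumes a TSV response with two metadata lines above the header and one separator record below it; `read_with_skips` and the sample report are hypothetical illustrations, not part of the CDK API. Leading lines are discarded from the text stream, `csv.DictReader` then consumes the header itself, and the first N data records are dropped.

```python
import csv
import io
from typing import Dict, Iterator, List


def read_with_skips(
    text: str,
    delimiter: str = ",",
    skip_rows_before_header: int = 0,
    skip_rows_after_header: int = 0,
) -> Iterator[Dict[str, str]]:
    """Hypothetical helper that mirrors the skip logic added to CsvParser."""
    stream = io.StringIO(text)
    # Discard leading metadata lines before csv ever sees the stream.
    for _ in range(skip_rows_before_header):
        stream.readline()
    reader = csv.DictReader(stream, delimiter=delimiter)
    # DictReader reads the header row itself; drop the next N records.
    for index, row in enumerate(reader):
        if index < skip_rows_after_header:
            continue
        yield row


# Hypothetical report: two metadata lines, a header, a separator row, then data.
report = (
    "Report: daily export\n"
    "Generated by: example tool\n"
    "id\tname\n"
    "---\t---\n"
    "1\talice\n"
    "2\tbob\n"
)
rows: List[Dict[str, str]] = list(
    read_with_skips(report, delimiter="\t", skip_rows_before_header=2, skip_rows_after_header=1)
)
print(rows)  # [{'id': '1', 'name': 'alice'}, {'id': '2', 'name': 'bob'}]
```

With both values left at their default of 0, the helper behaves like a plain `csv.DictReader`, which matches the backward-compatible defaults in the schema.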


4 files changed (+125 -66 lines changed)


airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 10 additions & 0 deletions
@@ -3886,6 +3886,16 @@ definitions:
         type: array
         items:
           type: string
+      skip_rows_before_header:
+        title: Skip Rows Before Header
+        description: The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.
+        type: integer
+        default: 0
+      skip_rows_after_header:
+        title: Skip Rows After Header
+        description: The number of rows to skip after the header row.
+        type: integer
+        default: 0
   AsyncJobStatusMap:
     description: Matches the api job status to Async Job Status.
     type: object
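The schema description ties the setting to a 1-based header position: a header on the 3rd row means skipping 2 rows. The sketch below spells out that arithmetic and checks which field names `csv.DictReader` picks up after the skip. Both helper names and the sample text are hypothetical and exist only for illustration; the skipping itself uses `readline()`, which is what the parser change in this commit does.

```python
import csv
import io
from typing import Optional, Sequence


def skip_count_for_header_row(header_row_number: int) -> int:
    """Hypothetical helper: convert a 1-based header position into skip_rows_before_header."""
    if header_row_number < 1:
        raise ValueError("header_row_number is 1-based and must be >= 1")
    return header_row_number - 1


def header_after_skipping(text: str, skip_rows_before_header: int) -> Optional[Sequence[str]]:
    """Return the field names csv.DictReader sees after skipping leading lines."""
    stream = io.StringIO(text)
    for _ in range(skip_rows_before_header):
        stream.readline()
    return csv.DictReader(stream).fieldnames


# Header sits on the 3rd line, below two lines of metadata.
sample = "Exported by example tool\nColumns follow\nid,name\n1,alice\n"
skip = skip_count_for_header_row(3)
print(skip)  # 2
print(header_after_skipping(sample, skip))  # ['id', 'name']
```

Off by one in either direction, `DictReader` would treat a metadata line or the first data line as the header, so the 1-based to 0-based conversion is the main thing users need to get right.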

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py

Lines changed: 17 additions & 6 deletions
@@ -104,24 +104,35 @@ class CsvParser(Parser):
     encoding: Optional[str] = "utf-8"
     delimiter: Optional[str] = ","
     set_values_to_none: Optional[List[str]] = None
+    skip_rows_before_header: int = 0
+    skip_rows_after_header: int = 0
 
     def _get_delimiter(self) -> Optional[str]:
-        """
-        Get delimiter from the configuration. Check for the escape character and decode it.
-        """
+        """Get delimiter from the configuration. Check for the escape character and decode it."""
         if self.delimiter is not None:
             if self.delimiter.startswith("\\"):
                 self.delimiter = self.delimiter.encode("utf-8").decode("unicode_escape")
 
         return self.delimiter
 
+    @staticmethod
+    def _skip_rows(text_data: TextIOWrapper, rows_to_skip: int) -> None:
+        """Skip a specified number of rows from the current position in the text stream."""
+        for _ in range(rows_to_skip):
+            text_data.readline()
+
     def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
-        """
-        Parse CSV data from decompressed bytes.
-        """
+        """Parse CSV data from decompressed bytes."""
         text_data = TextIOWrapper(data, encoding=self.encoding)  # type: ignore
+        self._skip_rows(text_data, self.skip_rows_before_header)
         reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",")
+        # After DictReader reads the header (first row it encounters), skip additional rows
+        # We need to handle skip_rows_after_header by consuming rows from the reader
+        skipped_after_header = 0
         for row in reader:
+            if skipped_after_header < self.skip_rows_after_header:
+                skipped_after_header += 1
+                continue
             if self.set_values_to_none:
                 row = {k: (None if v in self.set_values_to_none else v) for k, v in row.items()}
             yield row
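One detail of this implementation worth spelling out: `_skip_rows` advances the stream with `readline()`, so `skip_rows_before_header` counts physical lines, while `skip_rows_after_header` counts records produced by iterating `csv.DictReader`. The two differ only when a quoted field contains an embedded newline, which is presumably uncommon in metadata preambles. The sketch below, using made-up sample data, shows the difference with the standard library alone.

```python
import csv
import io

# The second data record's "note" field is quoted and contains an embedded newline.
text = 'id,note\n1,plain\n2,"line one\nline two"\n3,plain\n'

# readline()/readlines() (the mechanism behind _skip_rows) count physical lines: 5 here.
physical_lines = io.StringIO(text).readlines()

# Iterating csv.DictReader (the mechanism behind skip_rows_after_header) counts records: 3 here.
records = list(csv.DictReader(io.StringIO(text)))

print(len(physical_lines), len(records))  # 5 3
print(repr(records[1]["note"]))  # 'line one\nline two'
```

For a typical preamble of single-line metadata the two counts coincide, so the line-based skip before the header and the record-based skip after it agree with the schema descriptions.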
