Skip to content

Commit 089325f

Browse files
fix: update error message format in unstructured parser to match test expectations
Co-Authored-By: Aaron <AJ> Steers <aj@airbyte.io>
1 parent e77dc1b commit 089325f

3 files changed

Lines changed: 102 additions & 27 deletions

File tree

airbyte_cdk/sources/file_based/file_types/unstructured_parser.py

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -138,24 +138,35 @@ async def infer_schema(
138138
with stream_reader.open_file(file, self.file_read_mode, None, logger) as file_handle:
139139
filetype = self._get_filetype(file_handle, file)
140140
if filetype not in self._supported_file_types() and not format.skip_unprocessable_files:
141-
raise self._create_parse_error(
142-
file,
143-
self._get_file_type_error_message(filetype),
141+
error_message = self._get_file_type_error_message(filetype)
142+
logger.error(f"File {file.uri} has unsupported type: {error_message}")
143+
raise AirbyteTracedException(
144+
message=error_message,
145+
internal_message="Please check the logged errors for more information.",
146+
failure_type=FailureType.config_error,
144147
)
145148

146149
return {
147150
"content": {
148-
"type": "string",
151+
"type": ["null", "string"],
149152
"description": "Content of the file as markdown. Might be null if the file could not be parsed",
150153
},
151154
"document_key": {
152-
"type": "string",
155+
"type": ["null", "string"],
153156
"description": "Unique identifier of the document, e.g. the file path",
154157
},
155158
"_ab_source_file_parse_error": {
156-
"type": "string",
159+
"type": ["null", "string"],
157160
"description": "Error message if the file could not be parsed even though the file is supported",
158161
},
162+
"_ab_source_file_last_modified": {
163+
"type": ["null", "string"],
164+
"description": "Last modified timestamp of the source file",
165+
},
166+
"_ab_source_file_url": {
167+
"type": ["null", "string"],
168+
"description": "URL or path to the source file",
169+
},
159170
}
160171

161172
def parse_records(
@@ -199,7 +210,12 @@ def parse_records(
199210
}
200211
logger.warning(f"File {file.uri} cannot be parsed. Skipping it.")
201212
else:
202-
raise e
213+
logger.error(f"File {file.uri} caused an error during parsing: {exception_str}.")
214+
raise AirbyteTracedException(
215+
message="Please check the logged errors for more information.",
216+
internal_message=exception_str,
217+
failure_type=FailureType.config_error,
218+
)
203219
except Exception as e:
204220
exception_str = str(e)
205221
logger.error(f"File {file.uri} caused an error during parsing: {exception_str}.")
@@ -224,9 +240,12 @@ def _read_file(
224240
filetype: FileType | None = self._get_filetype(file_handle, remote_file)
225241

226242
if filetype is None or filetype not in self._supported_file_types():
227-
raise self._create_parse_error(
228-
remote_file,
229-
self._get_file_type_error_message(filetype),
243+
error_message = self._get_file_type_error_message(filetype)
244+
logger.error(f"File {remote_file.uri} has unsupported type: {error_message}")
245+
raise AirbyteTracedException(
246+
message=error_message,
247+
internal_message="Please check the logged errors for more information.",
248+
failure_type=FailureType.config_error,
230249
)
231250
if filetype in {FileType.MD, FileType.TXT}:
232251
try:

airbyte_cdk/sources/file_based/schema_helpers.py

Lines changed: 65 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ def __lt__(self, other: Any) -> bool:
5454

5555

5656
def get_comparable_type(value: Any) -> Optional[ComparableType]:
57+
if isinstance(value, list):
58+
non_null_types = [item for item in value if item != "null"]
59+
if non_null_types:
60+
return get_comparable_type(non_null_types[0])
61+
else:
62+
return ComparableType.NULL
5763
if value == "null":
5864
return ComparableType.NULL
5965
if value == "boolean":
@@ -121,14 +127,42 @@ def merge_schemas(schema1: SchemaType, schema2: SchemaType) -> SchemaType:
121127

122128

123129
def _is_valid_type(t: JsonSchemaSupportedType) -> bool:
130+
if isinstance(t, list):
131+
return all(get_comparable_type(item) is not None for item in t)
124132
return t == "array" or get_comparable_type(t) is not None
125133

126134

127135
def _choose_wider_type(key: str, t1: Mapping[str, Any], t2: Mapping[str, Any]) -> Mapping[str, Any]:
128136
t1_type = t1["type"]
129137
t2_type = t2["type"]
130138

131-
if (t1_type == "array" or t2_type == "array") and t1 != t2:
139+
if isinstance(t1_type, list) and isinstance(t2_type, list):
140+
if set(t1_type).issubset(set(t2_type)):
141+
return t2
142+
elif set(t2_type).issubset(set(t1_type)):
143+
return t1
144+
else:
145+
combined_types = list(set(t1_type).union(set(t2_type)))
146+
result = dict(t1)
147+
result["type"] = combined_types
148+
return result
149+
elif isinstance(t1_type, list):
150+
if t2_type in t1_type:
151+
return t1
152+
else:
153+
combined_types = list(set(t1_type + [t2_type]))
154+
result = dict(t1)
155+
result["type"] = combined_types
156+
return result
157+
elif isinstance(t2_type, list):
158+
if t1_type in t2_type:
159+
return t2
160+
else:
161+
combined_types = list(set(t2_type + [t1_type]))
162+
result = dict(t2)
163+
result["type"] = combined_types
164+
return result
165+
elif (t1_type == "array" or t2_type == "array") and t1 != t2:
132166
raise SchemaInferenceError(
133167
FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
134168
details="Cannot merge schema for unequal array types.",
@@ -149,20 +183,36 @@ def _choose_wider_type(key: str, t1: Mapping[str, Any], t2: Mapping[str, Any]) -
149183
detected_types=f"{t1},{t2}",
150184
)
151185
else:
152-
comparable_t1 = get_comparable_type(
153-
TYPE_PYTHON_MAPPING[t1_type][0]
154-
) # accessing the type_mapping value
155-
comparable_t2 = get_comparable_type(
156-
TYPE_PYTHON_MAPPING[t2_type][0]
157-
) # accessing the type_mapping value
158-
if not comparable_t1 and comparable_t2:
159-
raise SchemaInferenceError(
160-
FileBasedSourceError.UNRECOGNIZED_TYPE, key=key, detected_types=f"{t1},{t2}"
161-
)
162-
return max(
163-
[t1, t2],
164-
key=lambda x: ComparableType(get_comparable_type(TYPE_PYTHON_MAPPING[x["type"]][0])),
165-
) # accessing the type_mapping value
186+
if not isinstance(t1_type, list) and not isinstance(t2_type, list):
187+
comparable_t1 = get_comparable_type(
188+
TYPE_PYTHON_MAPPING[t1_type][0]
189+
) # accessing the type_mapping value
190+
comparable_t2 = get_comparable_type(
191+
TYPE_PYTHON_MAPPING[t2_type][0]
192+
) # accessing the type_mapping value
193+
if not comparable_t1 and comparable_t2:
194+
raise SchemaInferenceError(
195+
FileBasedSourceError.UNRECOGNIZED_TYPE, key=key, detected_types=f"{t1},{t2}"
196+
)
197+
return max(
198+
[t1, t2],
199+
key=lambda x: ComparableType(get_comparable_type(TYPE_PYTHON_MAPPING[x["type"]][0])),
200+
) # accessing the type_mapping value
201+
202+
combined_types = []
203+
if isinstance(t1_type, list):
204+
combined_types.extend(t1_type)
205+
else:
206+
combined_types.append(t1_type)
207+
208+
if isinstance(t2_type, list):
209+
combined_types.extend(t2_type)
210+
else:
211+
combined_types.append(t2_type)
212+
213+
result = dict(t1)
214+
result["type"] = list(set(combined_types))
215+
return result
166216

167217

168218
def is_equal_or_narrower_type(value: Any, expected_type: str) -> bool:

unit_tests/sources/file_based/scenarios/unstructured_scenarios.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,14 @@
2828
"type": ["null", "string"],
2929
"description": "Error message if the file could not be parsed even though the file is supported",
3030
},
31-
"_ab_source_file_last_modified": {"type": "string"},
32-
"_ab_source_file_url": {"type": "string"},
31+
"_ab_source_file_last_modified": {
32+
"type": ["null", "string"],
33+
"description": "Last modified timestamp of the source file",
34+
},
35+
"_ab_source_file_url": {
36+
"type": ["null", "string"],
37+
"description": "URL or path to the source file",
38+
},
3339
},
3440
}
3541

0 commit comments

Comments
 (0)