@@ -160,7 +160,9 @@ def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") ->
160160 """
161161 Check if the `table_schema` is compatible with `other_schema`.
162162
163- Two schemas are considered compatible when they are equal in terms of the Iceberg Schema type.
163+ The schemas are compatible if:
164+ - All fields in `other_schema` are present in `table_schema`. (other_schema <= table_schema)
165+ - All required fields in `table_schema` are present in `other_schema`.
164166
165167 Raises:
166168 ValueError: If the schemas are not compatible.
@@ -169,15 +171,18 @@ def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") ->
169171
170172 name_mapping = table_schema .name_mapping
171173 try :
172- task_schema = pyarrow_to_schema (other_schema , name_mapping = name_mapping )
174+ other_schema = pyarrow_to_schema (other_schema , name_mapping = name_mapping )
173175 except ValueError as e :
174176 other_schema = _pyarrow_to_schema_without_ids (other_schema )
175177 additional_names = set (other_schema .column_names ) - set (table_schema .column_names )
176178 raise ValueError (
177179 f"PyArrow table contains more columns: { ', ' .join (sorted (additional_names ))} . Update the schema first (hint, use union_by_name)."
178180 ) from e
179181
180- if table_schema .as_struct () != task_schema .as_struct ():
182+ missing_table_schema_fields = {field for field in other_schema .fields if field not in table_schema .fields }
183+ required_table_schema_fields = {field for field in table_schema .fields if field .required }
184+ missing_required_fields = {field for field in required_table_schema_fields if field not in other_schema .fields }
185+ if missing_table_schema_fields or missing_required_fields :
181186 from rich .console import Console
182187 from rich .table import Table as RichTable
183188
@@ -190,7 +195,7 @@ def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") ->
190195
191196 for lhs in table_schema .fields :
192197 try :
193- rhs = task_schema .find_field (lhs .field_id )
198+ rhs = other_schema .find_field (lhs .field_id )
194199 rich_table .add_row ("✅" if lhs == rhs else "❌" , str (lhs ), str (rhs ))
195200 except ValueError :
196201 rich_table .add_row ("❌" , str (lhs ), "Missing" )
@@ -482,7 +487,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
482487 f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: { unsupported_partitions } ."
483488 )
484489
485- # _check_schema_compatible(self._table.schema(), other_schema=df.schema)
490+ _check_schema_compatible (self ._table .schema (), other_schema = df .schema )
486491
487492 with self .update_snapshot (snapshot_properties = snapshot_properties ).fast_append () as update_snapshot :
488493 # skip writing data files if the dataframe is empty
@@ -519,7 +524,7 @@ def overwrite(
519524 if len (self ._table .spec ().fields ) > 0 :
520525 raise ValueError ("Cannot write to partitioned tables" )
521526
522- # _check_schema_compatible(self._table.schema(), other_schema=df.schema)
527+ _check_schema_compatible (self ._table .schema (), other_schema = df .schema )
523528
524529 with self .update_snapshot (snapshot_properties = snapshot_properties ).overwrite () as update_snapshot :
525530 # skip writing data files if the dataframe is empty
0 commit comments