@@ -161,7 +161,9 @@ def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") ->
161161 """
162162 Check if the `table_schema` is compatible with `other_schema`.
163163
164- Two schemas are considered compatible when they are equal in terms of the Iceberg Schema type.
164+ The schemas are compatible if:
165+ - All fields in `other_schema` are present in `table_schema`. (other_schema <= table_schema)
166+ - All required fields in `table_schema` are present in `other_schema`.
165167
166168 Raises:
167169 ValueError: If the schemas are not compatible.
@@ -170,15 +172,18 @@ def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") ->
170172
171173 name_mapping = table_schema .name_mapping
172174 try :
173- task_schema = pyarrow_to_schema (other_schema , name_mapping = name_mapping )
175+ other_schema = pyarrow_to_schema (other_schema , name_mapping = name_mapping )
174176 except ValueError as e :
175177 other_schema = _pyarrow_to_schema_without_ids (other_schema )
176178 additional_names = set (other_schema .column_names ) - set (table_schema .column_names )
177179 raise ValueError (
178180 f"PyArrow table contains more columns: { ', ' .join (sorted (additional_names ))} . Update the schema first (hint, use union_by_name)."
179181 ) from e
180182
181- if table_schema .as_struct () != task_schema .as_struct ():
183+ missing_table_schema_fields = {field for field in other_schema .fields if field not in table_schema .fields }
184+ required_table_schema_fields = {field for field in table_schema .fields if field .required }
185+ missing_required_fields = {field for field in required_table_schema_fields if field not in other_schema .fields }
186+ if missing_table_schema_fields or missing_required_fields :
182187 from rich .console import Console
183188 from rich .table import Table as RichTable
184189
@@ -191,7 +196,7 @@ def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") ->
191196
192197 for lhs in table_schema .fields :
193198 try :
194- rhs = task_schema .find_field (lhs .field_id )
199+ rhs = other_schema .find_field (lhs .field_id )
195200 rich_table .add_row ("✅" if lhs == rhs else "❌" , str (lhs ), str (rhs ))
196201 except ValueError :
197202 rich_table .add_row ("❌" , str (lhs ), "Missing" )
@@ -483,7 +488,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
483488 f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: { unsupported_partitions } ."
484489 )
485490
486- # _check_schema_compatible(self._table.schema(), other_schema=df.schema)
491+ _check_schema_compatible (self ._table .schema (), other_schema = df .schema )
487492
488493 with self .update_snapshot (snapshot_properties = snapshot_properties ).fast_append () as update_snapshot :
489494 # skip writing data files if the dataframe is empty
@@ -520,7 +525,7 @@ def overwrite(
520525 if len (self ._table .spec ().fields ) > 0 :
521526 raise ValueError ("Cannot write to partitioned tables" )
522527
523- # _check_schema_compatible(self._table.schema(), other_schema=df.schema)
528+ _check_schema_compatible (self ._table .schema (), other_schema = df .schema )
524529
525530 with self .update_snapshot (snapshot_properties = snapshot_properties ).overwrite () as update_snapshot :
526531 # skip writing data files if the dataframe is empty
0 commit comments