Skip to content

Commit f1c1f8f

Browse files
authored
Allow upserting into an empty table (#1699)
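Upserting into an empty table previously failed because the upsert tried to search the (empty) target table for records to update. With this change, an empty target table is treated as having nothing to update, so all incoming rows are inserted.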
1 parent f62f67e commit f1c1f8f

File tree

3 files changed

+43
-1
lines changed

3 files changed

+43
-1
lines changed

pyiceberg/table/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,9 @@ def overwrite(
579579
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
580580
)
581581

582-
self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)
582+
if overwrite_filter != AlwaysFalse():
583+
# Only delete when the filter is != AlwaysFalse
584+
self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)
583585

584586
with self._append_snapshot_producer(snapshot_properties) as append_files:
585587
# skip writing data files if the dataframe is empty

pyiceberg/table/upsert_util.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
6767

6868
non_key_cols = list(all_columns - join_cols_set)
6969

70+
if len(target_table) == 0:
71+
# When the target table is empty, there is nothing to update :)
72+
return source_table.schema.empty_table()
73+
7074
match_expr = functools.reduce(operator.and_, [pc.field(col).isin(target_table.column(col).to_pylist()) for col in join_cols])
7175

7276
matching_source_rows = source_table.filter(match_expr)

tests/table/test_upsert.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,42 @@ def test_upsert_with_identifier_fields(catalog: Catalog) -> None:
372372
assert upd.rows_inserted == 1
373373

374374

375+
def test_upsert_into_empty_table(catalog: Catalog) -> None:
376+
identifier = "default.test_upsert_into_empty_table"
377+
_drop_table(catalog, identifier)
378+
379+
schema = Schema(
380+
NestedField(1, "city", StringType(), required=True),
381+
NestedField(2, "inhabitants", IntegerType(), required=True),
382+
# Mark City as the identifier field, also known as the primary-key
383+
identifier_field_ids=[1],
384+
)
385+
386+
tbl = catalog.create_table(identifier, schema=schema)
387+
388+
arrow_schema = pa.schema(
389+
[
390+
pa.field("city", pa.string(), nullable=False),
391+
pa.field("inhabitants", pa.int32(), nullable=False),
392+
]
393+
)
394+
395+
# Write some data
396+
df = pa.Table.from_pylist(
397+
[
398+
{"city": "Amsterdam", "inhabitants": 921402},
399+
{"city": "San Francisco", "inhabitants": 808988},
400+
{"city": "Drachten", "inhabitants": 45019},
401+
{"city": "Paris", "inhabitants": 2103000},
402+
],
403+
schema=arrow_schema,
404+
)
405+
upd = tbl.upsert(df)
406+
407+
assert upd.rows_updated == 0
408+
assert upd.rows_inserted == 4
409+
410+
375411
def test_create_match_filter_single_condition() -> None:
376412
"""
377413
Test create_match_filter with a composite key where the source yields exactly one unique key.

0 commit comments

Comments
 (0)