Skip to content

Commit 4817b00

Browse files
committed
Allow upserting into an empty table
1 parent 4e9c66d commit 4817b00

File tree

3 files changed

+48
-2
lines changed

3 files changed

+48
-2
lines changed

pyiceberg/table/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,9 @@ def overwrite(
579579
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
580580
)
581581

582-
self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)
582+
if overwrite_filter != AlwaysFalse():
583+
# Only delete when the filter is != AlwaysFalse
584+
self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)
583585

584586
with self._append_snapshot_producer(snapshot_properties) as append_files:
585587
# skip writing data files if the dataframe is empty

pyiceberg/table/upsert_util.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@ def create_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpre
3636
unique_keys = df.select(join_cols).group_by(join_cols).aggregate([])
3737

3838
if len(join_cols) == 1:
39-
return In(join_cols[0], unique_keys[0].to_pylist())
39+
keys = unique_keys[0].to_pylist()
40+
if len(keys) > 0:
41+
return In(join_cols[0], keys)
42+
else:
43+
return AlwaysFalse()
4044
else:
4145
filters: List[BooleanExpression] = [
4246
cast(BooleanExpression, And(*[EqualTo(col, row[col]) for col in join_cols])) for row in unique_keys.to_pylist()
@@ -67,6 +71,10 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
6771

6872
non_key_cols = list(all_columns - join_cols_set)
6973

74+
if len(target_table) == 0:
75+
# When the target table is empty, there is nothing to update :)
76+
return source_table.schema.empty_table()
77+
7078
match_expr = functools.reduce(operator.and_, [pc.field(col).isin(target_table.column(col).to_pylist()) for col in join_cols])
7179

7280
matching_source_rows = source_table.filter(match_expr)

tests/table/test_upsert.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,42 @@ def test_upsert_with_identifier_fields(catalog: Catalog) -> None:
371371
assert upd.rows_inserted == 1
372372

373373

374+
def test_upsert_into_empty_table(catalog: Catalog) -> None:
375+
identifier = "default.test_upsert_into_empty_table"
376+
_drop_table(catalog, identifier)
377+
378+
schema = Schema(
379+
NestedField(1, "city", StringType(), required=True),
380+
NestedField(2, "inhabitants", IntegerType(), required=True),
381+
# Mark City as the identifier field, also known as the primary-key
382+
identifier_field_ids=[1],
383+
)
384+
385+
tbl = catalog.create_table(identifier, schema=schema)
386+
387+
arrow_schema = pa.schema(
388+
[
389+
pa.field("city", pa.string(), nullable=False),
390+
pa.field("inhabitants", pa.int32(), nullable=False),
391+
]
392+
)
393+
394+
# Write some data
395+
df = pa.Table.from_pylist(
396+
[
397+
{"city": "Amsterdam", "inhabitants": 921402},
398+
{"city": "San Francisco", "inhabitants": 808988},
399+
{"city": "Drachten", "inhabitants": 45019},
400+
{"city": "Paris", "inhabitants": 2103000},
401+
],
402+
schema=arrow_schema,
403+
)
404+
upd = tbl.upsert(df)
405+
406+
assert upd.rows_updated == 0
407+
assert upd.rows_inserted == 4
408+
409+
374410
def test_create_match_filter_single_condition() -> None:
375411
"""
376412
Test create_match_filter with a composite key where the source yields exactly one unique key.

0 commit comments

Comments
 (0)