Skip to content

Commit 22baf60

Browse files
[python] Fix duplicate _ROW_ID when file exceeds read batch size (#7626)
When a data file contains more than 1024 rows, upsert_by_arrow_with_key fails with: `ValueError: Input data contains duplicate _ROW_ID values`. This PR fixes the above issue by advancing `first_row_id` in `DataFileBatchReader._assign_row_tracking` after each batch, so that row IDs assigned to subsequent batches continue from where the previous batch ended.
1 parent 6a8167f commit 22baf60

File tree

2 files changed

+41
-0
lines changed

2 files changed

+41
-0
lines changed

paimon-python/pypaimon/read/reader/data_file_batch_reader.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch:
211211
idx = self.system_fields[SpecialFields.ROW_ID.name]
212212
# Create a new array that fills with computed row IDs
213213
arrays[idx] = pa.array(range(self.first_row_id, self.first_row_id + record_batch.num_rows), type=pa.int64())
214+
self.first_row_id += record_batch.num_rows
214215

215216
# Handle _SEQUENCE_NUMBER field
216217
if SpecialFields.SEQUENCE_NUMBER.name in self.system_fields.keys():

paimon-python/pypaimon/tests/data_evolution_test.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1593,3 +1593,43 @@ def test_vortex_with_slice(self):
15931593
sliced = rb.new_read().to_arrow(scan.plan().splits())
15941594
self.assertEqual(sliced.num_rows, 3)
15951595
self.assertEqual(sorted(sliced.column('id').to_pylist()), [2, 3, 4])
1596+
1597+
def test_large_file_read(self):
1598+
pa_schema = pa.schema([
1599+
('id', pa.int32()),
1600+
('name', pa.string()),
1601+
])
1602+
schema = Schema.from_pyarrow_schema(pa_schema, options={
1603+
'row-tracking.enabled': 'true',
1604+
'data-evolution.enabled': 'true',
1605+
})
1606+
self.catalog.create_table('default.test_large_file_row_id', schema, False)
1607+
table = self.catalog.get_table('default.test_large_file_row_id')
1608+
1609+
# Write >1024 rows in a single file
1610+
num_rows = 2000
1611+
data = pa.Table.from_pydict({
1612+
'id': list(range(num_rows)),
1613+
'name': [f'name_{i}' for i in range(num_rows)],
1614+
}, schema=pa_schema)
1615+
1616+
wb = table.new_batch_write_builder()
1617+
tw = wb.new_write()
1618+
tc = wb.new_commit()
1619+
tw.write_arrow(data)
1620+
tc.commit(tw.prepare_commit())
1621+
tw.close()
1622+
tc.close()
1623+
1624+
update_ids = list(range(0, 1500))
1625+
upsert_data = pa.Table.from_pydict({
1626+
'id': update_ids,
1627+
'name': [f'upsert_{i}' for i in update_ids],
1628+
}, schema=pa_schema)
1629+
1630+
wb = table.new_batch_write_builder()
1631+
tu = wb.new_update()
1632+
msgs = tu.upsert_by_arrow_with_key(upsert_data, ['id'])
1633+
tc = wb.new_commit()
1634+
tc.commit(msgs)
1635+
tc.close()

0 commit comments

Comments
 (0)