Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ def append(self, df: pa.Table) -> None:

# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(self, df=df)
data_files = _dataframe_to_data_files(self, write_uuid=merge.commit_uuid, df=df)
for data_file in data_files:
merge.append_data_file(data_file)

Expand Down Expand Up @@ -1032,7 +1032,7 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T

# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(self, df=df)
data_files = _dataframe_to_data_files(self, write_uuid=merge.commit_uuid, df=df)
for data_file in data_files:
merge.append_data_file(data_file)

Expand Down Expand Up @@ -2331,13 +2331,12 @@ def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int,
return f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro'


def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
def _dataframe_to_data_files(table: Table, write_uuid: uuid.UUID, df: pa.Table) -> Iterable[DataFile]:
from pyiceberg.io.pyarrow import write_file

if len(table.spec().fields) > 0:
raise ValueError("Cannot write to partitioned tables")

write_uuid = uuid.uuid4()
counter = itertools.count(0)

# This is an iter, so we don't have to materialize everything every time
Expand All @@ -2346,12 +2345,12 @@ def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:


class _MergingSnapshotProducer:
commit_uuid: uuid.UUID
_operation: Operation
_table: Table
_snapshot_id: int
_parent_snapshot_id: Optional[int]
_added_data_files: List[DataFile]
_commit_uuid: uuid.UUID

def __init__(self, operation: Operation, table: Table) -> None:
self._operation = operation
Expand All @@ -2360,7 +2359,7 @@ def __init__(self, operation: Operation, table: Table) -> None:
# Since we only support the main branch for now
self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None
self._added_data_files = []
self._commit_uuid = uuid.uuid4()
self.commit_uuid = uuid.uuid4()

def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer:
self._added_data_files.append(data_file)
Expand Down Expand Up @@ -2405,7 +2404,7 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
def _manifests(self) -> List[ManifestFile]:
def _write_added_manifest() -> List[ManifestFile]:
if self._added_data_files:
output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid)
output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self.commit_uuid)
with write_manifest(
format_version=self._table.format_version,
spec=self._table.spec(),
Expand All @@ -2431,7 +2430,7 @@ def _write_delete_manifest() -> List[ManifestFile]:
# Check if we need to mark the files as deleted
deleted_entries = self._deleted_entries()
if deleted_entries:
output_file_location = _new_manifest_path(location=self._table.location(), num=1, commit_uuid=self._commit_uuid)
output_file_location = _new_manifest_path(location=self._table.location(), num=1, commit_uuid=self.commit_uuid)
with write_manifest(
format_version=self._table.format_version,
spec=self._table.spec(),
Expand Down Expand Up @@ -2495,7 +2494,7 @@ def commit(self) -> Snapshot:
summary = self._summary()

manifest_list_file_path = _generate_manifest_list_path(
location=self._table.location(), snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self._commit_uuid
location=self._table.location(), snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self.commit_uuid
)
with write_manifest_list(
format_version=self._table.metadata.format_version,
Expand Down