Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ tbl.append(df)

# or

tbl.merge_append(df)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm reluctant to expose this to the public API for a couple of reasons:

  • Unsure if folks know what the impact is between choosing fast- or merge appends.
  • It might also be that we do appends as part of the operation (upserts as an obvious one).
  • Another method to the public API :)

How about having something similar as in Java, to control this using a table property: https://iceberg.apache.org/docs/1.5.2/configuration/#table-behavior-properties

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds great! I am also +1 on letting this be controlled by the config. I made merge_append a separate API to mirror the Java side implementation, which has newAppend and newFastAppend APIs. But it seems better to just make commit.manifest-merge.enabled default to False on the Python side.

I will still keep FastAppend and MergeAppend as separate class, and keep merge_append in UpdateSnapshot class to ensure clarity, although the current MergeAppend is purely FastAppend + manifest merge.

Just curious, why doesn't the Java side's newAppend return a FastAppend impl when commit.manifest-merge.enabled is False? Is it due to some backward compatibility issue?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I think the use-case of the Java library is slightly different, since that's mostly used in query engines.

Is it due to some backward compatibiilty issue?

I think it is for historical reasons, since the fast-append was added later on :)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, I like how you split it out in classes, it is much cleaner now 👍


# or

tbl.overwrite(df)
```

Expand Down
15 changes: 15 additions & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ Iceberg tables support table properties to configure table behavior.
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
| `write.parquet.row-group-limit` | Number of rows | 122880 | The Parquet row group limit |

## Table behavior options

| Key | Options | Default | Description |
| ------------------------------------ | ------------------- | ------- | ----------------------------------------------------------- |
| `commit.manifest.target-size-bytes` | Size in bytes | 8MB | Target size when merging manifest files |
| `commit.manifest.min-count-to-merge` | Number of manifests | 100 | Minimum number of manifests to accumulate before merging |
| `commit.manifest-merge.enabled` | Boolean | True | Controls whether to automatically merge manifests on writes |

<!-- prettier-ignore-start -->

!!! note "Fast append"
PyIceberg defaults to the [fast append](https://iceberg.apache.org/spec/#snapshots), which ignores the `commit.manifest*` properties and does not merge manifests on writes. To make the table merge manifests on writes and respect the `commit.manifest*` properties, use [`merge_append`](api.md#write-support) instead.

<!-- prettier-ignore-end -->

# FileIO

Iceberg works with the concept of a FileIO which is a pluggable module for reading, writing, and deleting files. By default, PyIceberg will try to initialize the FileIO that's suitable for the scheme (`s3://`, `gs://`, etc.) and will use the first one that's installed.
Expand Down
67 changes: 67 additions & 0 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,46 @@ class ManifestEntry(Record):
def __init__(self, *data: Any, **named_data: Any) -> None:
    """Initialize the entry, defaulting ``struct`` to the manifest-entry schema for ``DEFAULT_READ_VERSION``.

    A caller-supplied ``struct`` in ``named_data`` takes precedence over the default,
    since ``named_data`` is unpacked after the default key in the merged dict.
    """
    super().__init__(*data, **{"struct": MANIFEST_ENTRY_SCHEMAS_STRUCT[DEFAULT_READ_VERSION], **named_data})

def _wrap(
    self,
    new_status: ManifestEntryStatus,
    new_snapshot_id: int,
    new_data_sequence_number: Optional[int],
    new_file_sequence_number: Optional[int],
    new_file: DataFile,
) -> ManifestEntry:
    """Overwrite every field of this entry in place and return it.

    Returning ``self`` allows the wrapper instance to be reused and passed
    straight into a writer without allocating a new entry per file.
    """
    field_updates = (
        ("status", new_status),
        ("snapshot_id", new_snapshot_id),
        ("data_sequence_number", new_data_sequence_number),
        ("file_sequence_number", new_file_sequence_number),
        ("data_file", new_file),
    )
    for field_name, new_value in field_updates:
        setattr(self, field_name, new_value)
    return self

def _wrap_append(self, new_snapshot_id: int, new_data_sequence_number: Optional[int], new_file: DataFile) -> ManifestEntry:
    """Re-point this entry at *new_file* as an ADDED entry.

    No file sequence number is carried over for appended files: it is
    always reset to ``None``.
    """
    return self._wrap(
        new_status=ManifestEntryStatus.ADDED,
        new_snapshot_id=new_snapshot_id,
        new_data_sequence_number=new_data_sequence_number,
        new_file_sequence_number=None,
        new_file=new_file,
    )

def _wrap_delete(
    self,
    new_snapshot_id: int,
    new_data_sequence_number: Optional[int],
    new_file_sequence_number: Optional[int],
    new_file: DataFile,
) -> ManifestEntry:
    """Re-point this entry at *new_file* as a DELETED entry, keeping both sequence numbers."""
    return self._wrap(
        new_status=ManifestEntryStatus.DELETED,
        new_snapshot_id=new_snapshot_id,
        new_data_sequence_number=new_data_sequence_number,
        new_file_sequence_number=new_file_sequence_number,
        new_file=new_file,
    )

def _wrap_existing(
    self,
    new_snapshot_id: int,
    new_data_sequence_number: Optional[int],
    new_file_sequence_number: Optional[int],
    new_file: DataFile,
) -> ManifestEntry:
    """Re-point this entry at *new_file* as an EXISTING entry, keeping both sequence numbers."""
    return self._wrap(
        new_status=ManifestEntryStatus.EXISTING,
        new_snapshot_id=new_snapshot_id,
        new_data_sequence_number=new_data_sequence_number,
        new_file_sequence_number=new_file_sequence_number,
        new_file=new_file,
    )


PARTITION_FIELD_SUMMARY_TYPE = StructType(
NestedField(509, "contains_null", BooleanType(), required=True),
Expand Down Expand Up @@ -654,6 +694,7 @@ class ManifestWriter(ABC):
_deleted_rows: int
_min_data_sequence_number: Optional[int]
_partitions: List[Record]
_reused_entry_wrapper: ManifestEntry

def __init__(
self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int, meta: Dict[str, str] = EMPTY_DICT
Expand All @@ -673,6 +714,7 @@ def __init__(
self._deleted_rows = 0
self._min_data_sequence_number = None
self._partitions = []
self._reused_entry_wrapper = ManifestEntry()

def __enter__(self) -> ManifestWriter:
"""Open the writer."""
Expand Down Expand Up @@ -763,6 +805,31 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
self._writer.write_block([self.prepare_entry(entry)])
return self

def add(self, entry: ManifestEntry) -> ManifestWriter:
    """Append *entry*'s data file as an ADDED entry under this writer's snapshot.

    The incoming data sequence number is kept only when it is assigned
    (non-``None`` and non-negative); otherwise ``None`` is written.
    Reusing ``_reused_entry_wrapper`` avoids allocating a new entry per
    file; this is safe because ``add_entry`` serializes it immediately.
    """
    sequence_number = entry.data_sequence_number
    if sequence_number is None or sequence_number < 0:
        sequence_number = None
    wrapped = self._reused_entry_wrapper._wrap_append(self._snapshot_id, sequence_number, entry.data_file)
    self.add_entry(wrapped)
    return self

def delete(self, entry: ManifestEntry) -> ManifestWriter:
    """Record *entry*'s data file as DELETED, re-stamped with this writer's snapshot id.

    Both sequence numbers are carried over unchanged from the incoming entry.
    """
    wrapped = self._reused_entry_wrapper._wrap_delete(
        self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
    )
    self.add_entry(wrapped)
    return self

def existing(self, entry: ManifestEntry) -> ManifestWriter:
    """Record *entry*'s data file as EXISTING, re-stamped with this writer's snapshot id.

    Both sequence numbers are carried over unchanged from the incoming entry.
    """
    wrapped = self._reused_entry_wrapper._wrap_existing(
        self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
    )
    self.add_entry(wrapped)
    return self


class ManifestWriterV1(ManifestWriter):
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
Expand Down
Loading