Write support #41
**pyiceberg/io/pyarrow.py**
```diff
@@ -103,7 +103,11 @@
     OutputFile,
     OutputStream,
 )
-from pyiceberg.manifest import DataFile, FileFormat
+from pyiceberg.manifest import (
+    DataFile,
+    DataFileContent,
+    FileFormat,
+)
 from pyiceberg.schema import (
     PartnerAccessor,
     PreOrderSchemaVisitor,
```
```diff
@@ -117,8 +121,9 @@
     visit,
     visit_with_partner,
 )
+from pyiceberg.table import WriteTask, _generate_datafile_filename
 from pyiceberg.transforms import TruncateTransform
-from pyiceberg.typedef import EMPTY_DICT, Properties
+from pyiceberg.typedef import EMPTY_DICT, Properties, Record
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
```
```diff
@@ -1445,16 +1450,13 @@ def parquet_path_to_id_mapping(
 def fill_parquet_file_metadata(
     df: DataFile,
     parquet_metadata: pq.FileMetaData,
-    file_size: int,
     stats_columns: Dict[int, StatisticsCollector],
     parquet_column_mapping: Dict[str, int],
 ) -> None:
     """
     Compute and fill the following fields of the DataFile object.
 
-    - file_format
     - record_count
-    - file_size_in_bytes
     - column_sizes
     - value_counts
     - null_value_counts
```
```diff
@@ -1466,9 +1468,6 @@ def fill_parquet_file_metadata(
     Args:
         df (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled.
         parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object.
-        file_size (int): The total compressed file size cannot be retrieved from the metadata and hence has to
-            be passed here. Depending on the kind of file system and pyarrow library call used, different
-            ways to obtain this value might be appropriate.
         stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
             set the mode for column metrics collection
     """
```
```diff
@@ -1565,13 +1564,54 @@ def fill_parquet_file_metadata(
         del upper_bounds[field_id]
         del null_value_counts[field_id]
 
-    df.file_format = FileFormat.PARQUET
     df.record_count = parquet_metadata.num_rows
-    df.file_size_in_bytes = file_size
     df.column_sizes = column_sizes
     df.value_counts = value_counts
     df.null_value_counts = null_value_counts
     df.nan_value_counts = nan_value_counts
     df.lower_bounds = lower_bounds
     df.upper_bounds = upper_bounds
     df.split_offsets = split_offsets
+
+
+def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
+    task = next(tasks)
+
+    try:
+        _ = next(tasks)
+        # If there are more tasks, raise an exception
+        raise ValueError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
+    except StopIteration:
+        pass
+
+    df = task.df
+
+    file_path = f'{table.location()}/data/{_generate_datafile_filename("parquet")}'
```
**Contributor:** Uniqueness is a combination of the task ID, the write-specific UUID, and the file counter for the task. The partition ordinal is used to preserve locality in file names. It would be nice to expose some of these options in the `WriteTask`, like a unique UUID, unique task ID, and part ordinal. Then the counter would be handled locally.

**Author:** Keep in mind that we write a single file in the first iteration. I've moved the uuid and task-id to the `WriteTask`.
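A minimal sketch of that naming scheme, assuming hypothetical `WriteTask` fields (`write_uuid`, `task_id`, `file_count`) that this PR does not necessarily define:

```python
import uuid
from dataclasses import dataclass


@dataclass
class SketchWriteTask:
    """Hypothetical WriteTask fields that would make file names unique."""

    write_uuid: uuid.UUID  # one UUID per write operation
    task_id: int           # unique per parallel task
    file_count: int = 0    # counter handled locally, per task

    def generate_data_file_filename(self, extension: str) -> str:
        # Uniqueness = task ID + write-specific UUID + per-task file counter.
        self.file_count += 1
        return f"{self.task_id:05d}-{self.write_uuid}-{self.file_count}.{extension}"


task = SketchWriteTask(write_uuid=uuid.uuid4(), task_id=0)
print(task.generate_data_file_filename("parquet"))  # 00000-<uuid>-1.parquet
```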
```diff
+    file_schema = schema_to_pyarrow(table.schema())
+
```
**Contributor:** Hi Fokko! I am working with @syun64 to test out the impending write feature. During the test, we realized the field IDs are not being set in the written Parquet file (the schema dumps illustrating the broken and fixed behavior are omitted here). We feel it is a peculiar issue with `pyarrow.parquet.ParquetWriter`, where the field IDs need to be defined in the metadata of the `pyarrow.schema` under a particular key like `"PARQUET:field_id"` instead of `"field_id"`.

**Author:** Thanks @jqin61 for testing this, as it is paramount that the field-IDs are written properly. I'm able to reproduce this locally, and after changing the metadata key the IDs are written correctly. Thanks for flagging this!

**Contributor:** If I have [...], should I expect [...]?

**Author:** Hey @robtandy, thanks for chiming in here. I think the PyArrow-to-schema conversion should also include the field-id metadata. When you create a new table, it should re-assign the field-ids if they are missing.
```diff
+    collected_metrics: List[pq.FileMetaData] = []
+    fo = table.io.new_output(file_path)
+    with fo.create() as fos:
```
*Fokko marked a conversation on this line as resolved (outdated).*
```diff
+        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
+            writer.write_table(df)
+
```
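For context, a self-contained illustration of the `metadata_collector` pattern used above — once the writer closes, the list holds a `pq.FileMetaData` for the written file, which is what feeds the statistics gathering below (the path is illustrative):

```python
from typing import List

import pyarrow as pa
import pyarrow.parquet as pq

arrow_table = pa.table({"id": [1, 2, 3]})
collected_metrics: List[pq.FileMetaData] = []

with pq.ParquetWriter("/tmp/metrics.parquet", schema=arrow_table.schema, metadata_collector=collected_metrics) as writer:
    writer.write_table(arrow_table)

# The collector is populated when the writer closes,
# so no second pass over the file is needed for statistics.
print(collected_metrics[0].num_rows, collected_metrics[0].num_row_groups)
```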
```diff
+    df = DataFile(
```
**Contributor:** It's confusing that `df` is reused here: first for the pyarrow table being written, then for the `DataFile` metadata object.

**Author:** I agree, we should not do that 👍
```diff
+        content=DataFileContent.DATA,
+        file_path=file_path,
+        file_format=FileFormat.PARQUET,
+        partition=Record(),
+        record_count=len(df),
```
*Fokko marked a conversation on this line as resolved (outdated).*
```diff
+        file_size_in_bytes=len(fo),
```
**Contributor:** This should also come from the write if possible, so we don't have an S3 request here.
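One way to satisfy this without an extra object-store request — a sketch, not necessarily what the PR ends up doing — is to read the stream position after the Parquet writer closes but before the output stream does:

```python
import pyarrow as pa
import pyarrow.parquet as pq

arrow_table = pa.table({"id": [1, 2, 3]})
sink = pa.BufferOutputStream()  # stands in for the table's output stream

with pq.ParquetWriter(sink, schema=arrow_table.schema) as writer:
    writer.write_table(arrow_table)

# The writer is closed, so the position equals the final file size;
# no follow-up length lookup against S3 is needed.
file_size_in_bytes = sink.tell()
print(file_size_in_bytes)
```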
```diff
+        # Just copy these from the table for now
+        sort_order_id=table.sort_order().order_id,
```
**Contributor:** This isn't the sort order of the table at the time the file was written; it is the sort that was used to order the records in the data file. If we can't guarantee that the records are in this order, we should not apply this metadata.

**Contributor:** This should come from the caller (write task?) and default to `None`.
```diff
+        spec_id=table.spec().spec_id,
```
**Contributor:** Since this is an unpartitioned write, we need to ensure that this is the unpartitioned spec in the table.

**Author:** We check if the partition spec is empty.

**Contributor:** Wouldn't it be easy to just pass the spec ID and partition tuple (an empty `Record`)?
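The author's check is not shown in this thread; a minimal sketch of what such a guard could look like, assuming it only needs `PartitionSpec.fields`:

```python
from pyiceberg.partitioning import PartitionSpec


def _check_unpartitioned(spec: PartitionSpec) -> None:
    """Hypothetical guard: an unpartitioned spec has no partition fields."""
    if len(spec.fields) > 0:
        raise ValueError("Only unpartitioned writes are supported")
```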
```diff
+        equality_ids=table.schema().identifier_field_ids,
```
**Contributor:** Data files are never written with equality IDs; those only apply to equality delete files.

**Author:** Ah, I messed up here, I thought it was referring to the identifier fields of the schema.
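To make the distinction concrete, a hedged sketch (the helper is hypothetical) built on the `DataFileContent` enum this PR touches:

```python
from typing import List, Optional

from pyiceberg.manifest import DataFileContent


def equality_ids_for(content: DataFileContent, ids: Optional[List[int]]) -> Optional[List[int]]:
    """Hypothetical helper: only equality delete files carry equality IDs."""
    if content == DataFileContent.EQUALITY_DELETES:
        return ids
    return None  # data files (and position deletes) leave the field unset


print(equality_ids_for(DataFileContent.DATA, [1]))              # None
print(equality_ids_for(DataFileContent.EQUALITY_DELETES, [1]))  # [1]
```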
```diff
+        key_metadata=None,
+    )
+    fill_parquet_file_metadata(
+        df=df,
+        parquet_metadata=collected_metrics[0],
```
*rdblue marked a conversation on this line as resolved.*
```diff
+        stats_columns=compute_statistics_plan(table.schema(), table.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+    )
+    return iter([df])
```
**pyiceberg/manifest.py**
```diff
@@ -37,7 +37,7 @@
 from pyiceberg.io import FileIO, InputFile, OutputFile
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema
-from pyiceberg.typedef import Record
+from pyiceberg.typedef import EMPTY_DICT, Record
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
```
```diff
@@ -60,6 +60,8 @@
 DEFAULT_BLOCK_SIZE = 67108864  # 64 * 1024 * 1024
 DEFAULT_READ_VERSION: Literal[2] = 2
 
+INITIAL_SEQUENCE_NUMBER = 0
+
 
 class DataFileContent(int, Enum):
     DATA = 0
```
```diff
@@ -504,7 +506,7 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition
     NestedField(517, "content", IntegerType(), required=False, initial_default=ManifestContent.DATA),
     NestedField(515, "sequence_number", LongType(), required=False, initial_default=0),
     NestedField(516, "min_sequence_number", LongType(), required=False, initial_default=0),
-    NestedField(503, "added_snapshot_id", LongType(), required=False),
+    NestedField(503, "added_snapshot_id", LongType(), required=True),
     NestedField(504, "added_files_count", IntegerType(), required=False),
     NestedField(505, "existing_files_count", IntegerType(), required=False),
     NestedField(506, "deleted_files_count", IntegerType(), required=False),
```
```diff
@@ -517,6 +519,7 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition
 MANIFEST_FILE_SCHEMA_STRUCT = MANIFEST_FILE_SCHEMA.as_struct()
 
+
 
 POSITIONAL_DELETE_SCHEMA = Schema(
     NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", IntegerType())
 )
```
```diff
@@ -665,7 +668,9 @@ class ManifestWriter(ABC):
     _min_data_sequence_number: Optional[int]
     _partitions: List[Record]
 
-    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int, meta: Dict[str, str]):
+    def __init__(
+        self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int, meta: Dict[str, str] = EMPTY_DICT
+    ) -> None:
         self.closed = False
         self._spec = spec
         self._schema = schema
```
```diff
@@ -746,7 +751,7 @@ def to_manifest_file(self) -> ManifestFile:
             existing_rows_count=self._existing_rows,
             deleted_rows_count=self._deleted_rows,
             partitions=construct_partition_summaries(self._spec, self._schema, self._partitions),
-            key_metadatas=None,
+            key_metadata=None,
         )
 
     def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
```
```diff
@@ -851,7 +856,7 @@ class ManifestListWriter(ABC):
     _commit_snapshot_id: int
     _writer: AvroOutputFile[ManifestFile]
 
-    def __init__(self, output_file: OutputFile, meta: Dict[str, str]):
+    def __init__(self, output_file: OutputFile, meta: Dict[str, Any]):
         self._output_file = output_file
         self._meta = meta
         self._manifest_files = []
```
```diff
@@ -884,7 +889,7 @@ def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWrite
 
 
 class ManifestListWriterV1(ManifestListWriter):
-    def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: int):
+    def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int]):
         super().__init__(
             output_file, {"snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id), "format-version": "1"}
         )
```
```diff
@@ -897,9 +902,11 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
 
 class ManifestListWriterV2(ManifestListWriter):
     _commit_snapshot_id: int
-    _sequence_number: int
+    _sequence_number: Optional[int]
```
**Contributor:** I think this is required for a manifest list. It is always known when we write the manifest list, because manifest lists are written for every commit attempt (after the sequence number has been updated).

**Author:** I agree, let me check!
```diff
 
-    def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: int, sequence_number: int):
+    def __init__(
+        self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: Optional[int]
+    ):
         super().__init__(
             output_file,
             {
```
```diff
@@ -920,9 +927,9 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
             # To validate this, check that the snapshot id matches the current commit
             if self._commit_snapshot_id != wrapped_manifest_file.added_snapshot_id:
                 raise ValueError(
-                    f"Found unassigned sequence number for a manifest from snapshot: {wrapped_manifest_file.added_snapshot_id}"
+                    f"Found unassigned sequence number for a manifest from snapshot: {self._commit_snapshot_id} != {wrapped_manifest_file.added_snapshot_id}"
                 )
-            wrapped_manifest_file.sequence_number = self._sequence_number
+            wrapped_manifest_file.sequence_number = self._sequence_number or INITIAL_SEQUENCE_NUMBER
```
**Contributor:** I don't think this is correct, because the sequence number should always be passed in.
```diff
 
         if wrapped_manifest_file.min_sequence_number == UNASSIGNED_SEQ:
             if self._commit_snapshot_id != wrapped_manifest_file.added_snapshot_id:
```
```diff
@@ -931,12 +938,16 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
                 )
             # if the min sequence number is not determined, then there was no assigned sequence number for any file
             # written to the wrapped manifest. Replace the unassigned sequence number with the one for this commit
-            wrapped_manifest_file.min_sequence_number = self._sequence_number
+            wrapped_manifest_file.min_sequence_number = self._sequence_number or INITIAL_SEQUENCE_NUMBER
         return wrapped_manifest_file
 
 
 def write_manifest_list(
-    format_version: Literal[1, 2], output_file: OutputFile, snapshot_id: int, parent_snapshot_id: int, sequence_number: int
+    format_version: Literal[1, 2],
+    output_file: OutputFile,
+    snapshot_id: int,
+    parent_snapshot_id: Optional[int],
+    sequence_number: Optional[int],
 ) -> ManifestListWriter:
     if format_version == 1:
         return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id)
```