@@ -1761,54 +1761,67 @@ def data_file_statistics_from_parquet_metadata(
17611761
17621762
17631763def write_file (io : FileIO , table_metadata : TableMetadata , tasks : Iterator [WriteTask ]) -> Iterator [DataFile ]:
1764- task = next (tasks )
1765-
1766- try :
1767- _ = next (tasks )
1768- # If there are more tasks, raise an exception
1769- raise NotImplementedError ("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208" )
1770- except StopIteration :
1771- pass
1772-
1773- parquet_writer_kwargs = _get_parquet_writer_kwargs (table_metadata .properties )
1774-
1775- file_path = f'{ table_metadata .location } /data/{ task .generate_data_file_filename ("parquet" )} '
17761764 schema = table_metadata .schema ()
17771765 arrow_file_schema = schema .as_arrow ()
1766+ parquet_writer_kwargs = _get_parquet_writer_kwargs (table_metadata .properties )
17781767
1779- fo = io .new_output (file_path )
17801768 row_group_size = PropertyUtil .property_as_int (
17811769 properties = table_metadata .properties ,
17821770 property_name = TableProperties .PARQUET_ROW_GROUP_SIZE_BYTES ,
17831771 default = TableProperties .PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT ,
17841772 )
1785- with fo .create (overwrite = True ) as fos :
1786- with pq .ParquetWriter (fos , schema = arrow_file_schema , ** parquet_writer_kwargs ) as writer :
1787- writer .write_table (task .df , row_group_size = row_group_size )
1788-
1789- statistics = data_file_statistics_from_parquet_metadata (
1790- parquet_metadata = writer .writer .metadata ,
1791- stats_columns = compute_statistics_plan (schema , table_metadata .properties ),
1792- parquet_column_mapping = parquet_path_to_id_mapping (schema ),
1793- )
1794- data_file = DataFile (
1795- content = DataFileContent .DATA ,
1796- file_path = file_path ,
1797- file_format = FileFormat .PARQUET ,
1798- partition = Record (),
1799- file_size_in_bytes = len (fo ),
1800- # After this has been fixed:
1801- # https://github.com/apache/iceberg-python/issues/271
1802- # sort_order_id=task.sort_order_id,
1803- sort_order_id = None ,
1804- # Just copy these from the table for now
1805- spec_id = table_metadata .default_spec_id ,
1806- equality_ids = None ,
1807- key_metadata = None ,
1808- ** statistics .to_serialized_dict (),
1809- )
18101773
1811- return iter ([data_file ])
1774+ def write_parquet (task : WriteTask ) -> DataFile :
1775+ file_path = f'{ table_metadata .location } /data/{ task .generate_data_file_filename ("parquet" )} '
1776+ fo = io .new_output (file_path )
1777+ with fo .create (overwrite = True ) as fos :
1778+ with pq .ParquetWriter (fos , schema = arrow_file_schema , ** parquet_writer_kwargs ) as writer :
1779+ writer .write (pa .Table .from_batches (task .record_batches ), row_group_size = row_group_size )
1780+
1781+ statistics = data_file_statistics_from_parquet_metadata (
1782+ parquet_metadata = writer .writer .metadata ,
1783+ stats_columns = compute_statistics_plan (schema , table_metadata .properties ),
1784+ parquet_column_mapping = parquet_path_to_id_mapping (schema ),
1785+ )
1786+ data_file = DataFile (
1787+ content = DataFileContent .DATA ,
1788+ file_path = file_path ,
1789+ file_format = FileFormat .PARQUET ,
1790+ partition = Record (),
1791+ file_size_in_bytes = len (fo ),
1792+ # After this has been fixed:
1793+ # https://github.com/apache/iceberg-python/issues/271
1794+ # sort_order_id=task.sort_order_id,
1795+ sort_order_id = None ,
1796+ # Just copy these from the table for now
1797+ spec_id = table_metadata .default_spec_id ,
1798+ equality_ids = None ,
1799+ key_metadata = None ,
1800+ ** statistics .to_serialized_dict (),
1801+ )
1802+
1803+ return data_file
1804+
1805+ executor = ExecutorFactory .get_or_create ()
1806+ data_files = executor .map (write_parquet , tasks )
1807+
1808+ return iter (data_files )
1809+
1810+
1811+ def bin_pack_arrow_table (tbl : pa .Table , target_file_size : int ) -> Iterator [List [pa .RecordBatch ]]:
1812+ from pyiceberg .utils .bin_packing import PackingIterator
1813+
1814+ avg_row_size_bytes = tbl .nbytes / tbl .num_rows
1815+ target_rows_per_file = target_file_size // avg_row_size_bytes
1816+ batches = tbl .to_batches (max_chunksize = target_rows_per_file )
1817+ bin_packed_record_batches = PackingIterator (
1818+ items = batches ,
1819+ target_weight = target_file_size ,
1820+ lookback = len (batches ), # ignore lookback
1821+ weight_func = lambda x : x .nbytes ,
1822+ largest_bin_first = False ,
1823+ )
1824+ return bin_packed_record_batches
18121825
18131826
18141827def parquet_files_to_data_files (io : FileIO , table_metadata : TableMetadata , file_paths : Iterator [str ]) -> Iterator [DataFile ]:
0 commit comments