2222from abc import abstractmethod
2323from collections import defaultdict
2424from concurrent .futures import Future
25- from dataclasses import dataclass
25+ from dataclasses import dataclass , field
2626from functools import cached_property
2727from typing import TYPE_CHECKING , Any , Callable , Dict , Generic , List , Optional , Set , Tuple
2828
@@ -480,8 +480,8 @@ def _deleted_entries(self) -> List[ManifestEntry]:
480480
481481@dataclass (init = False )
482482class RewriteManifestsResult :
483- rewritten_manifests : List [ManifestFile ]
484- added_manifests : List [ManifestFile ]
483+ rewritten_manifests : List [ManifestFile ] = field ( default_factory = list )
484+ added_manifests : List [ManifestFile ] = field ( default_factory = list )
485485
486486 def __init__ (
487487 self ,
@@ -544,7 +544,11 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile
544544
545545
546546class _RewriteManifests (_SnapshotProducer ["_RewriteManifests" ]):
547+ _table : Table
548+ _spec_id : int
547549 _target_size_bytes : int
550+ _min_count_to_merge : int
551+ _merge_enabled : bool
548552 rewritten_manifests : List [ManifestFile ] = []
549553 added_manifests : List [ManifestFile ] = []
550554 kept_manifests : List [ManifestFile ] = []
@@ -559,10 +563,15 @@ def __init__(
559563 ):
560564 from pyiceberg .table import TableProperties
561565
562- _table : Table
563- _spec : PartitionSpec
564-
565566 super ().__init__ (Operation .REPLACE , transaction , io , snapshot_properties = snapshot_properties )
567+
568+ snapshot = self ._table .current_snapshot ()
569+ if self ._spec_id and self ._spec_id not in self ._table .specs ():
570+ raise ValueError (f"Cannot find spec with id: { self ._spec_id } " )
571+
572+ if not snapshot :
573+ raise ValueError ("Cannot rewrite manifests without a current snapshot" )
574+
566575 self ._target_size_bytes = property_as_int (
567576 self ._transaction .table_metadata .properties ,
568577 TableProperties .MANIFEST_TARGET_SIZE_BYTES ,
@@ -571,6 +580,17 @@ def __init__(
571580 self ._table = table
572581 self ._spec_id = spec_id or table .spec ().spec_id
573582
583+ self ._min_count_to_merge = property_as_int (
584+ self ._transaction .table_metadata .properties ,
585+ TableProperties .MANIFEST_MIN_MERGE_COUNT ,
586+ TableProperties .MANIFEST_MIN_MERGE_COUNT_DEFAULT ,
587+ ) # type: ignore
588+ self ._merge_enabled = property_as_bool (
589+ self ._transaction .table_metadata .properties ,
590+ TableProperties .MANIFEST_MERGE_ENABLED ,
591+ TableProperties .MANIFEST_MERGE_ENABLED_DEFAULT ,
592+ )
593+
574594 def _summary (self , snapshot_properties : Dict [str , str ] = EMPTY_DICT ) -> Summary :
575595 from pyiceberg .table import TableProperties
576596
@@ -583,10 +603,10 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
583603 ssc .set_partition_summary_limit (partition_summary_limit )
584604
585605 props = {
586- "manifests-kept" : str ( len ([])) ,
606+ "manifests-kept" : "0" ,
587607 "manifests-created" : str (len (self .added_manifests )),
588608 "manifests-replaced" : str (len (self .rewritten_manifests )),
589- "entries-processed" : str ( len ([])) ,
609+ "entries-processed" : "0" ,
590610 }
591611 previous_snapshot = (
592612 self ._transaction .table_metadata .snapshot_by_id (self ._parent_snapshot_id )
@@ -601,28 +621,25 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
601621 )
602622
603623 def rewrite_manifests (self ) -> RewriteManifestsResult :
604- data_result = self ._find_matching_manifests (ManifestContent .DATA )
624+ snapshot = self ._table .current_snapshot ()
625+ if not snapshot :
626+ raise ValueError ("Cannot rewrite manifests without a current snapshot" )
627+
628+ data_result = self ._find_matching_manifests (snapshot , ManifestContent .DATA )
605629
606630 self .rewritten_manifests .extend (data_result .rewritten_manifests )
607631 self .added_manifests .extend (data_result .added_manifests )
608632
609- deletes_result = self ._find_matching_manifests (ManifestContent .DELETES )
633+ deletes_result = self ._find_matching_manifests (snapshot , ManifestContent .DELETES )
610634 self .rewritten_manifests .extend (deletes_result .rewritten_manifests )
611635 self .added_manifests .extend (deletes_result .added_manifests )
612636
613- if not self .rewritten_manifests :
637+ if len ( self .rewritten_manifests ) == 0 :
614638 return RewriteManifestsResult (rewritten_manifests = [], added_manifests = [])
615639
616640 return RewriteManifestsResult (rewritten_manifests = self .rewritten_manifests , added_manifests = self .added_manifests )
617641
618- def _find_matching_manifests (self , content : ManifestContent ) -> RewriteManifestsResult :
619- snapshot = self ._table .current_snapshot ()
620- if self ._spec_id and self ._spec_id not in self ._table .specs ():
621- raise ValueError (f"Cannot find spec with id: { self ._spec_id } " )
622-
623- if not snapshot :
624- raise ValueError ("Cannot rewrite manifests without a current snapshot" )
625-
642+ def _find_matching_manifests (self , snapshot : Snapshot , content : ManifestContent ) -> RewriteManifestsResult :
626643 manifests = [
627644 manifest
628645 for manifest in snapshot .manifests (io = self ._io )
@@ -631,8 +648,8 @@ def _find_matching_manifests(self, content: ManifestContent) -> RewriteManifests
631648
632649 data_manifest_merge_manager = _ManifestMergeManager (
633650 target_size_bytes = self ._target_size_bytes ,
634- min_count_to_merge = 2 ,
635- merge_enabled = True ,
651+ min_count_to_merge = self . _min_count_to_merge ,
652+ merge_enabled = self . _merge_enabled ,
636653 snapshot_producer = self ,
637654 )
638655 new_manifests = data_manifest_merge_manager .merge_manifests (manifests = manifests )
@@ -668,10 +685,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
668685 return [self ._copy_manifest_file (manifest , self .snapshot_id ) for manifest in self .added_manifests ]
669686
670687 def _deleted_entries (self ) -> List [ManifestEntry ]:
671- """To determine if we need to record any deleted manifest entries.
672-
673- In case of an append, nothing is deleted.
674- """
688+ """To determine if we need to record any deleted manifest entries."""
675689 return []
676690
677691
0 commit comments