@@ -20,7 +20,7 @@
 from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
 
 from pyiceberg.conversions import from_bytes
-from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
+from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.table.snapshots import Snapshot, ancestors_of
 from pyiceberg.types import PrimitiveType
@@ -523,7 +523,62 @@ def history(self) -> "pa.Table":
 
         return pa.Table.from_pylist(history, schema=history_schema)
 
-    def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
+    def _files_by_manifest(
+        self, manifest_list: ManifestFile, data_file_filter: Optional[Set[DataFileContent]] = None
+    ) -> List[Dict[str, Any]]:
+        files: list[dict[str, Any]] = []
+        schema = self.tbl.metadata.schema()
+        io = self.tbl.io
+
+        for manifest_entry in manifest_list.fetch_manifest_entry(io):
+            data_file = manifest_entry.data_file
+            if data_file_filter and data_file.content not in data_file_filter:
+                continue
+            column_sizes = data_file.column_sizes or {}
+            value_counts = data_file.value_counts or {}
+            null_value_counts = data_file.null_value_counts or {}
+            nan_value_counts = data_file.nan_value_counts or {}
+            lower_bounds = data_file.lower_bounds or {}
+            upper_bounds = data_file.upper_bounds or {}
+            readable_metrics = {
+                schema.find_column_name(field.field_id): {
+                    "column_size": column_sizes.get(field.field_id),
+                    "value_count": value_counts.get(field.field_id),
+                    "null_value_count": null_value_counts.get(field.field_id),
+                    "nan_value_count": nan_value_counts.get(field.field_id),
+                    "lower_bound": from_bytes(field.field_type, lower_bound)
+                    if (lower_bound := lower_bounds.get(field.field_id))
+                    else None,
+                    "upper_bound": from_bytes(field.field_type, upper_bound)
+                    if (upper_bound := upper_bounds.get(field.field_id))
+                    else None,
+                }
+                for field in self.tbl.metadata.schema().fields
+            }
+            files.append(
+                {
+                    "content": data_file.content,
+                    "file_path": data_file.file_path,
+                    "file_format": data_file.file_format,
+                    "spec_id": data_file.spec_id,
+                    "record_count": data_file.record_count,
+                    "file_size_in_bytes": data_file.file_size_in_bytes,
+                    "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
+                    "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
+                    "null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None,
+                    "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
+                    "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
+                    "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
+                    "key_metadata": data_file.key_metadata,
+                    "split_offsets": data_file.split_offsets,
+                    "equality_ids": data_file.equality_ids,
+                    "sort_order_id": data_file.sort_order_id,
+                    "readable_metrics": readable_metrics,
+                }
+            )
+        return files
+
+    def _get_files_schema(self) -> "pa.Schema":
         import pyarrow as pa
 
         from pyiceberg.io.pyarrow import schema_to_pyarrow
@@ -570,70 +625,27 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                 pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True),
             ]
         )
+        return files_schema
+
+    def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
+        import pyarrow as pa
 
         files: list[dict[str, Any]] = []
 
         if not snapshot_id and not self.tbl.metadata.current_snapshot():
             return pa.Table.from_pylist(
                 files,
-                schema=files_schema,
+                schema=self._get_files_schema(),
             )
         snapshot = self._get_snapshot(snapshot_id)
 
         io = self.tbl.io
         for manifest_list in snapshot.manifests(io):
-            for manifest_entry in manifest_list.fetch_manifest_entry(io):
-                data_file = manifest_entry.data_file
-                if data_file_filter and data_file.content not in data_file_filter:
-                    continue
-                column_sizes = data_file.column_sizes or {}
-                value_counts = data_file.value_counts or {}
-                null_value_counts = data_file.null_value_counts or {}
-                nan_value_counts = data_file.nan_value_counts or {}
-                lower_bounds = data_file.lower_bounds or {}
-                upper_bounds = data_file.upper_bounds or {}
-                readable_metrics = {
-                    schema.find_column_name(field.field_id): {
-                        "column_size": column_sizes.get(field.field_id),
-                        "value_count": value_counts.get(field.field_id),
-                        "null_value_count": null_value_counts.get(field.field_id),
-                        "nan_value_count": nan_value_counts.get(field.field_id),
-                        "lower_bound": from_bytes(field.field_type, lower_bound)
-                        if (lower_bound := lower_bounds.get(field.field_id))
-                        else None,
-                        "upper_bound": from_bytes(field.field_type, upper_bound)
-                        if (upper_bound := upper_bounds.get(field.field_id))
-                        else None,
-                    }
-                    for field in self.tbl.metadata.schema().fields
-                }
-                files.append(
-                    {
-                        "content": data_file.content,
-                        "file_path": data_file.file_path,
-                        "file_format": data_file.file_format,
-                        "spec_id": data_file.spec_id,
-                        "record_count": data_file.record_count,
-                        "file_size_in_bytes": data_file.file_size_in_bytes,
-                        "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
-                        "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
-                        "null_value_counts": dict(data_file.null_value_counts)
-                        if data_file.null_value_counts is not None
-                        else None,
-                        "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
-                        "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
-                        "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
-                        "key_metadata": data_file.key_metadata,
-                        "split_offsets": data_file.split_offsets,
-                        "equality_ids": data_file.equality_ids,
-                        "sort_order_id": data_file.sort_order_id,
-                        "readable_metrics": readable_metrics,
-                    }
-                )
+            files.extend(self._files_by_manifest(manifest_list, data_file_filter))
 
         return pa.Table.from_pylist(
             files,
-            schema=files_schema,
+            schema=self._get_files_schema(),
         )
 
     def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
@@ -657,3 +669,35 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def _all_files(self, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
+        import pyarrow as pa
+
+        snapshots = self.tbl.snapshots()
+        if not snapshots:
+            return pa.Table.from_pylist([], schema=self._get_files_schema())
+
+        executor = ExecutorFactory.get_or_create()
+        all_manifest_files_by_snapshot: Iterator[List[ManifestFile]] = executor.map(
+            lambda args: args[0].manifests(self.tbl.io), [(snapshot,) for snapshot in snapshots]
+        )
+        all_manifest_files = list(
+            {(manifest.manifest_path, manifest) for manifest_list in all_manifest_files_by_snapshot for manifest in manifest_list}
+        )
+        all_files_by_manifest: Iterator[List[Dict[str, Any]]] = executor.map(
+            lambda args: self._files_by_manifest(*args), [(manifest, data_file_filter) for _, manifest in all_manifest_files]
+        )
+        all_files_list = [file for files in all_files_by_manifest for file in files]
+        return pa.Table.from_pylist(
+            all_files_list,
+            schema=self._get_files_schema(),
+        )
+
+    def all_files(self) -> "pa.Table":
+        return self._all_files()
+
+    def all_data_files(self) -> "pa.Table":
+        return self._all_files({DataFileContent.DATA})
+
+    def all_delete_files(self) -> "pa.Table":
+        return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
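
For context, a minimal usage sketch of the new inspect entry points added in this diff (all_files, all_data_files, all_delete_files), assuming the standard table.inspect accessor; the catalog name and table identifier below are placeholders, not part of this change:

from pyiceberg.catalog import load_catalog

# Hypothetical catalog/table names; substitute your own configuration.
catalog = load_catalog("default")
table = catalog.load_table("examples.events")

# Files referenced by any snapshot in the table's history,
# deduplicated across snapshots by manifest path.
all_files = table.inspect.all_files()

# The same scan restricted to data files or to delete files only.
data_files = table.inspect.all_data_files()
delete_files = table.inspect.all_delete_files()

# Each call returns a PyArrow table built from _get_files_schema().
print(all_files.column("file_path").to_pylist())

Because the per-manifest work is farmed out through ExecutorFactory and manifests are deduplicated by path before reading, each manifest file is fetched and decoded at most once even when many snapshots reference it.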