@@ -2582,7 +2582,16 @@ def write_orc(task: WriteTask) -> DataFile:
         fo = io.new_output(file_path)
         with fo.create(overwrite=True) as fos:
             orc.write_table(arrow_table, fos)
-        # You may want to add statistics extraction here if needed
+
+        # Extract statistics from the written ORC file
+        orc_file = orc.ORCFile(fo.to_input_file().open())
+        statistics = data_file_statistics_from_orc_metadata(
+            orc_metadata=orc_file,
+            stats_columns=compute_statistics_plan(file_schema, table_metadata.properties),
+            orc_column_mapping=orc_column_to_id_mapping(file_schema),
+            arrow_table=arrow_table,
+        )
+
         data_file = DataFile.from_args(
             content=DataFileContent.DATA,
             file_path=file_path,
@@ -2593,7 +2602,7 @@ def write_orc(task: WriteTask) -> DataFile:
             spec_id=table_metadata.default_spec_id,
             equality_ids=None,
             key_metadata=None,
-            # statistics=... (if you implement ORC stats)
+            **statistics.to_serialized_dict(),
         )
         return data_file
 
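The statistics step reads the file straight back through `pyarrow.orc.ORCFile`. For context, the standalone sketch below (the file path and columns are made up) shows what metadata pyarrow actually exposes for an ORC file: the row count, stripe count, and schema are available, but per-column min/max values, null counts, and byte sizes are not, which is why the helper added further down falls back to defaults for those fields.

```python
# Standalone sketch of the pyarrow ORC metadata the new helper relies on;
# the file path and column names are illustrative only.
import pyarrow as pa
import pyarrow.orc as orc

table = pa.table({"id": [1, 2, 3], "name": ["a", "b", None]})
orc.write_table(table, "/tmp/example.orc")

orc_file = orc.ORCFile("/tmp/example.orc")
print(orc_file.nrows)     # 3 -- row count recorded in the file footer
print(orc_file.nstripes)  # 1 for a file this small
print(orc_file.schema)    # Arrow schema reconstructed from the ORC types
```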
@@ -2891,3 +2900,180 @@ def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Ar
     field_array = arrow_table[path_parts[0]]
     # Navigate into the struct using the remaining path parts
     return pc.struct_field(field_array, path_parts[1:])
+
+
+def data_file_statistics_from_orc_metadata(
+    orc_metadata: "orc.ORCFile",
+    stats_columns: Dict[int, StatisticsCollector],
+    orc_column_mapping: Dict[str, int],
+    arrow_table: Optional[pa.Table] = None,
+) -> DataFileStatistics:
2911+ """
2912+ Compute and return DataFileStatistics that includes the following.
2913+
2914+ - record_count
2915+ - column_sizes
2916+ - value_counts
2917+ - null_value_counts
2918+ - nan_value_counts
2919+ - column_aggregates
2920+ - split_offsets
2921+
2922+ Args:
2923+ orc_metadata (pyarrow.orc.ORCFile): A pyarrow ORC file object.
2924+ stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
2925+ set the mode for column metrics collection
2926+ orc_column_mapping (Dict[str, int]): The mapping of the ORC column name to the field ID
2927+ arrow_table (pa.Table, optional): The original arrow table that was written, used for row count
2928+ """
+    column_sizes: Dict[int, int] = {}
+    value_counts: Dict[int, int] = {}
+    split_offsets: List[int] = []
+
+    null_value_counts: Dict[int, int] = {}
+    nan_value_counts: Dict[int, int] = {}
+
+    col_aggs: Dict[int, StatsAggregator] = {}
+
+    invalidate_col: Set[int] = set()
+
+    # Prefer the row count of the Arrow table that was just written
+    if arrow_table is not None:
+        record_count = arrow_table.num_rows
+    else:
+        # Fall back to the row count recorded in the ORC file footer
+        record_count = orc_metadata.nrows
+
+    # pyarrow exposes ORC statistics at the file level rather than per row
+    # group, so each column is processed against the file-level metadata
+    for col_name, field_id in orc_column_mapping.items():
+        stats_col = stats_columns[field_id]
+
+        # pyarrow does not expose per-column byte sizes for ORC
+        column_sizes[field_id] = 0
+
+        if stats_col.mode == MetricsMode(MetricModeTypes.NONE):
+            continue
+
+        # Gather whatever column statistics the ORC metadata provides
+        try:
+            # pyarrow's ORC metadata is more limited than Parquet's, so the
+            # available values are used and defaults fill in the rest
+
+            # Use the total row count as the value count; per-column value
+            # counts are not exposed for ORC
+            value_counts[field_id] = record_count
+
+            # Null counts are not exposed for ORC either, so default to 0
+            null_value_counts[field_id] = 0
+
+            if stats_col.mode == MetricsMode(MetricModeTypes.COUNTS):
+                continue
+
+            if field_id not in col_aggs:
+                col_aggs[field_id] = StatsAggregator(
+                    stats_col.iceberg_type, _primitive_to_physical(stats_col.iceberg_type), stats_col.mode.length
+                )
+
+            # Min/max values are not available from pyarrow's ORC metadata,
+            # so lower/upper bound aggregation is skipped for ORC files
+
+        except Exception as e:
+            invalidate_col.add(field_id)
+            logger.warning(f"Failed to extract ORC statistics for column {col_name}: {e}")
+
+    # Stripe offsets are not exposed by pyarrow, so record a single split
+    # offset at the start of the file whenever the file contains data
+    split_offsets = [0] if record_count > 0 else []
+
+    # Clean up columns whose statistics could not be extracted
+    for field_id in invalidate_col:
+        col_aggs.pop(field_id, None)
+        null_value_counts.pop(field_id, None)
+
+    return DataFileStatistics(
+        record_count=record_count,
+        column_sizes=column_sizes,
+        value_counts=value_counts,
+        null_value_counts=null_value_counts,
+        nan_value_counts=nan_value_counts,
+        column_aggregates=col_aggs,
+        split_offsets=split_offsets,
+    )
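The returned `DataFileStatistics` is what `write_orc` splats into `DataFile.from_args` via `to_serialized_dict()`. A minimal sketch of that hand-off, assuming `DataFileStatistics` remains importable from `pyiceberg.io.pyarrow` and using made-up field IDs and counts:

```python
# Hedged sketch; the values are illustrative and the import path is assumed.
from pyiceberg.io.pyarrow import DataFileStatistics

stats = DataFileStatistics(
    record_count=3,
    column_sizes={1: 0, 2: 0},
    value_counts={1: 3, 2: 3},
    null_value_counts={1: 0, 2: 1},
    nan_value_counts={},
    column_aggregates={},
    split_offsets=[0],
)

# These become the keyword arguments passed to DataFile.from_args in write_orc
print(stats.to_serialized_dict())
```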
+
+
+class ID2OrcColumn:
+    field_id: int
+    orc_column: str
+
+    def __init__(self, field_id: int, orc_column: str):
+        self.field_id = field_id
+        self.orc_column = orc_column
+
+
+class ID2OrcColumnVisitor(PreOrderSchemaVisitor[List[ID2OrcColumn]]):
+    _field_id: int = 0
+    _path: List[str]
+
+    def __init__(self) -> None:
+        self._path = []
+
+    def schema(self, schema: Schema, struct_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]:
+        return struct_result()
+
+    def struct(self, struct: StructType, field_results: List[Callable[[], List[ID2OrcColumn]]]) -> List[ID2OrcColumn]:
+        return list(itertools.chain(*[result() for result in field_results]))
+
+    def field(self, field: NestedField, field_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]:
+        self._field_id = field.field_id
+        self._path.append(field.name)
+        result = field_result()
+        self._path.pop()
+        return result
+
+    def list(self, list_type: ListType, element_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]:
+        self._field_id = list_type.element_id
+        self._path.append("list.element")
+        result = element_result()
+        self._path.pop()
+        return result
+
+    def map(
+        self,
+        map_type: MapType,
+        key_result: Callable[[], List[ID2OrcColumn]],
+        value_result: Callable[[], List[ID2OrcColumn]],
+    ) -> List[ID2OrcColumn]:
+        self._field_id = map_type.key_id
+        self._path.append("key_value.key")
+        k = key_result()
+        self._path.pop()
+        self._field_id = map_type.value_id
+        self._path.append("key_value.value")
+        v = value_result()
+        self._path.pop()
+        return k + v
+
+    def primitive(self, primitive: PrimitiveType) -> List[ID2OrcColumn]:
+        return [ID2OrcColumn(field_id=self._field_id, orc_column=".".join(self._path))]
+
+
+def orc_column_to_id_mapping(
+    schema: Schema,
+) -> Dict[str, int]:
+    """
+    Create a mapping from ORC column names to Iceberg field IDs.
+
+    Args:
+        schema: The Iceberg schema.
+
+    Returns:
+        A dictionary mapping ORC column names to Iceberg field IDs.
+    """
+    result: Dict[str, int] = {}
+    for pair in pre_order_visit(schema, ID2OrcColumnVisitor()):
+        result[pair.orc_column] = pair.field_id
+    return result
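To make the dotted paths produced by the visitor concrete, here is a hedged usage sketch with a made-up schema; only primitive leaves appear in the mapping, struct members are joined with `.`, and list elements get the `list.element` suffix.

```python
# Hedged sketch: an illustrative schema, not one from the codebase.
from pyiceberg.schema import Schema
from pyiceberg.types import ListType, LongType, NestedField, StringType, StructType

schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(
        field_id=2,
        name="address",
        field_type=StructType(
            NestedField(field_id=3, name="city", field_type=StringType(), required=False),
        ),
        required=False,
    ),
    NestedField(
        field_id=4,
        name="tags",
        field_type=ListType(element_id=5, element_type=StringType(), element_required=False),
        required=False,
    ),
)

print(orc_column_to_id_mapping(schema))
# {'id': 1, 'address.city': 3, 'tags.list.element': 5}
```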