9595 HDFS_KERB_TICKET ,
9696 HDFS_PORT ,
9797 HDFS_USER ,
98+ PYARROW_USE_LARGE_TYPES_ON_READ ,
9899 S3_ACCESS_KEY_ID ,
99100 S3_CONNECT_TIMEOUT ,
100101 S3_ENDPOINT ,
158159from pyiceberg .utils .config import Config
159160from pyiceberg .utils .datetime import millis_to_datetime
160161from pyiceberg .utils .deprecated import deprecated
161- from pyiceberg .utils .properties import get_first_property_value , property_as_int
162+ from pyiceberg .utils .properties import get_first_property_value , property_as_bool , property_as_int
162163from pyiceberg .utils .singleton import Singleton
163164from pyiceberg .utils .truncate import truncate_upper_bound_binary_string , truncate_upper_bound_text_string
164165
@@ -835,6 +836,10 @@ def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema:
835836 return visit_pyarrow (schema , _ConvertToLargeTypes ())
836837
837838
def _pyarrow_schema_ensure_small_types(schema: pa.Schema) -> pa.Schema:
    """Return a copy of *schema* with large Arrow types narrowed to their small variants."""
    small_type_visitor = _ConvertToSmallTypes()
    return visit_pyarrow(schema, small_type_visitor)


838843@singledispatch
839844def visit_pyarrow (obj : Union [pa .DataType , pa .Schema ], visitor : PyArrowSchemaVisitor [T ]) -> T :
840845 """Apply a pyarrow schema visitor to any point within a schema.
@@ -876,7 +881,6 @@ def _(obj: Union[pa.ListType, pa.LargeListType, pa.FixedSizeListType], visitor:
876881 visitor .before_list_element (obj .value_field )
877882 result = visit_pyarrow (obj .value_type , visitor )
878883 visitor .after_list_element (obj .value_field )
879-
880884 return visitor .list (obj , result )
881885
882886
@@ -1145,6 +1149,30 @@ def primitive(self, primitive: pa.DataType) -> pa.DataType:
11451149 return primitive
11461150
11471151
class _ConvertToSmallTypes(PyArrowSchemaVisitor[Union[pa.DataType, pa.Schema]]):
    """Schema visitor that rewrites large Arrow types to their small counterparts.

    Inverse of ``_ConvertToLargeTypes``: ``large_string`` becomes ``string``,
    ``large_binary`` becomes ``binary``, and every list is rebuilt with
    ``pa.list_`` (so ``large_list`` collapses to ``list``).
    """

    def schema(self, schema: pa.Schema, struct_result: pa.StructType) -> pa.Schema:
        return pa.schema(struct_result)

    def struct(self, struct: pa.StructType, field_results: List[pa.Field]) -> pa.StructType:
        return pa.struct(field_results)

    def field(self, field: pa.Field, field_result: pa.DataType) -> pa.Field:
        # Swap only the type; name, nullability and metadata are preserved.
        return field.with_type(field_result)

    def list(self, list_type: pa.ListType, element_result: pa.DataType) -> pa.DataType:
        # Rebuilding with pa.list_ narrows large_list to list as a side effect.
        return pa.list_(element_result)

    def map(self, map_type: pa.MapType, key_result: pa.DataType, value_result: pa.DataType) -> pa.DataType:
        return pa.map_(key_result, value_result)

    def primitive(self, primitive: pa.DataType) -> pa.DataType:
        if primitive.equals(pa.large_string()):
            return pa.string()
        if primitive.equals(pa.large_binary()):
            return pa.binary()
        return primitive


11481176class _ConvertToIcebergWithoutIDs (_ConvertToIceberg ):
11491177 """
11501178 Converts PyArrowSchema to Iceberg Schema with all -1 ids.
@@ -1169,6 +1197,7 @@ def _task_to_record_batches(
11691197 positional_deletes : Optional [List [ChunkedArray ]],
11701198 case_sensitive : bool ,
11711199 name_mapping : Optional [NameMapping ] = None ,
1200+ use_large_types : bool = True ,
11721201) -> Iterator [pa .RecordBatch ]:
11731202 _ , _ , path = PyArrowFileIO .parse_location (task .file .file_path )
11741203 arrow_format = ds .ParquetFileFormat (pre_buffer = True , buffer_size = (ONE_MEGABYTE * 8 ))
@@ -1197,7 +1226,9 @@ def _task_to_record_batches(
11971226 # https://github.com/apache/arrow/issues/41884
11981227 # https://github.com/apache/arrow/issues/43183
11991228 # Would be good to remove this later on
1200- schema = _pyarrow_schema_ensure_large_types (physical_schema ),
1229+ schema = _pyarrow_schema_ensure_large_types (physical_schema )
1230+ if use_large_types
1231+ else (_pyarrow_schema_ensure_small_types (physical_schema )),
12011232 # This will push down the query to Arrow.
12021233 # But in case there are positional deletes, we have to apply them first
12031234 filter = pyarrow_filter if not positional_deletes else None ,
@@ -1219,7 +1250,9 @@ def _task_to_record_batches(
12191250 arrow_table = pa .Table .from_batches ([batch ])
12201251 arrow_table = arrow_table .filter (pyarrow_filter )
12211252 batch = arrow_table .to_batches ()[0 ]
1222- yield _to_requested_schema (projected_schema , file_project_schema , batch , downcast_ns_timestamp_to_us = True )
1253+ yield _to_requested_schema (
1254+ projected_schema , file_project_schema , batch , downcast_ns_timestamp_to_us = True , use_large_types = use_large_types
1255+ )
12231256 current_index += len (batch )
12241257
12251258
@@ -1232,10 +1265,19 @@ def _task_to_table(
12321265 positional_deletes : Optional [List [ChunkedArray ]],
12331266 case_sensitive : bool ,
12341267 name_mapping : Optional [NameMapping ] = None ,
1268+ use_large_types : bool = True ,
12351269) -> Optional [pa .Table ]:
12361270 batches = list (
12371271 _task_to_record_batches (
1238- fs , task , bound_row_filter , projected_schema , projected_field_ids , positional_deletes , case_sensitive , name_mapping
1272+ fs ,
1273+ task ,
1274+ bound_row_filter ,
1275+ projected_schema ,
1276+ projected_field_ids ,
1277+ positional_deletes ,
1278+ case_sensitive ,
1279+ name_mapping ,
1280+ use_large_types ,
12391281 )
12401282 )
12411283
@@ -1303,6 +1345,8 @@ def project_table(
13031345 # When FsSpec is not installed
13041346 raise ValueError (f"Expected PyArrowFileIO or FsspecFileIO, got: { io } " ) from e
13051347
1348+ use_large_types = property_as_bool (io .properties , PYARROW_USE_LARGE_TYPES_ON_READ , True )
1349+
13061350 bound_row_filter = bind (table_metadata .schema (), row_filter , case_sensitive = case_sensitive )
13071351
13081352 projected_field_ids = {
@@ -1322,6 +1366,7 @@ def project_table(
13221366 deletes_per_file .get (task .file .file_path ),
13231367 case_sensitive ,
13241368 table_metadata .name_mapping (),
1369+ use_large_types ,
13251370 )
13261371 for task in tasks
13271372 ]
@@ -1394,6 +1439,8 @@ def project_batches(
13941439 # When FsSpec is not installed
13951440 raise ValueError (f"Expected PyArrowFileIO or FsspecFileIO, got: { io } " ) from e
13961441
1442+ use_large_types = property_as_bool (io .properties , PYARROW_USE_LARGE_TYPES_ON_READ , True )
1443+
13971444 bound_row_filter = bind (table_metadata .schema (), row_filter , case_sensitive = case_sensitive )
13981445
13991446 projected_field_ids = {
@@ -1414,6 +1461,7 @@ def project_batches(
14141461 deletes_per_file .get (task .file .file_path ),
14151462 case_sensitive ,
14161463 table_metadata .name_mapping (),
1464+ use_large_types ,
14171465 )
14181466 for batch in batches :
14191467 if limit is not None :
@@ -1447,12 +1495,13 @@ def _to_requested_schema(
14471495 batch : pa .RecordBatch ,
14481496 downcast_ns_timestamp_to_us : bool = False ,
14491497 include_field_ids : bool = False ,
1498+ use_large_types : bool = True ,
14501499) -> pa .RecordBatch :
14511500 # We could re-use some of these visitors
14521501 struct_array = visit_with_partner (
14531502 requested_schema ,
14541503 batch ,
1455- ArrowProjectionVisitor (file_schema , downcast_ns_timestamp_to_us , include_field_ids ),
1504+ ArrowProjectionVisitor (file_schema , downcast_ns_timestamp_to_us , include_field_ids , use_large_types ),
14561505 ArrowAccessor (file_schema ),
14571506 )
14581507 return pa .RecordBatch .from_struct_array (struct_array )
@@ -1462,20 +1511,31 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
14621511 _file_schema : Schema
14631512 _include_field_ids : bool
14641513 _downcast_ns_timestamp_to_us : bool
1514+ _use_large_types : bool
14651515
1466- def __init__ (self , file_schema : Schema , downcast_ns_timestamp_to_us : bool = False , include_field_ids : bool = False ) -> None :
1516+ def __init__ (
1517+ self ,
1518+ file_schema : Schema ,
1519+ downcast_ns_timestamp_to_us : bool = False ,
1520+ include_field_ids : bool = False ,
1521+ use_large_types : bool = True ,
1522+ ) -> None :
14671523 self ._file_schema = file_schema
14681524 self ._include_field_ids = include_field_ids
14691525 self ._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
1526+ self ._use_large_types = use_large_types
14701527
14711528 def _cast_if_needed (self , field : NestedField , values : pa .Array ) -> pa .Array :
14721529 file_field = self ._file_schema .find_field (field .field_id )
14731530
14741531 if field .field_type .is_primitive :
14751532 if field .field_type != file_field .field_type :
1476- return values . cast (
1477- schema_to_pyarrow ( promote (file_field .field_type , field .field_type ), include_field_ids = self ._include_field_ids )
1533+ target_schema = schema_to_pyarrow (
1534+ promote (file_field .field_type , field .field_type ), include_field_ids = self ._include_field_ids
14781535 )
1536+ if not self ._use_large_types :
1537+ target_schema = _pyarrow_schema_ensure_small_types (target_schema )
1538+ return values .cast (target_schema )
14791539 elif (target_type := schema_to_pyarrow (field .field_type , include_field_ids = self ._include_field_ids )) != values .type :
14801540 if field .field_type == TimestampType ():
14811541 # Downcasting of nanoseconds to microseconds
@@ -1547,12 +1607,13 @@ def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional
15471607
    def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: Optional[pa.Array]) -> Optional[pa.Array]:
        """Project a list array onto the requested element field, preserving the
        list size class (large_list stays large, list stays small)."""
        if isinstance(list_array, (pa.ListArray, pa.LargeListArray, pa.FixedSizeListArray)) and value_array is not None:
            # Rebuild with the same offset width as the source array: large_list
            # only when the input actually was a LargeListArray.
            list_initializer = pa.large_list if isinstance(list_array, pa.LargeListArray) else pa.list_
            if isinstance(value_array, pa.StructArray):
                # This can be removed once this has been fixed:
                # https://github.com/apache/arrow/issues/38809
                # NOTE(review): when the input is a small ListArray this wraps it in a
                # LargeListArray and then casts back via pa.list_ below — confirm
                # from_arrays accepts the small array's int32 offsets here.
                list_array = pa.LargeListArray.from_arrays(list_array.offsets, value_array)
            value_array = self._cast_if_needed(list_type.element_field, value_array)
            arrow_field = list_initializer(self._construct_field(list_type.element_field, value_array.type))
            return list_array.cast(arrow_field)
        else:
            return None
0 commit comments