@@ -28,6 +28,7 @@
 import pyarrow
 import pyarrow as pa
 import pyarrow.parquet as pq
+import pyarrow.orc as orc
 import pytest
 from packaging import version
 from pyarrow.fs import AwsDefaultS3RetryStrategy, FileType, LocalFileSystem, S3FileSystem
@@ -2638,3 +2639,102 @@ def test_retry_strategy_not_found() -> None:
     io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "pyiceberg.DoesNotExist"})
     with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"):
         io.new_input("s3://bucket/path/to/file")
+
+
+def test_write_and_read_orc(tmp_path):
+    # Create a simple Arrow table and write it to an ORC file
+    data = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
+    orc_path = tmp_path / "test.orc"
+    orc.write_table(data, str(orc_path))
+    # Read it back and check for round-trip equality
+    orc_file = orc.ORCFile(str(orc_path))
+    table_read = orc_file.read()
+    assert table_read.equals(data)
+
+
+def test_orc_file_format_integration(tmp_path):
+    # Minimal check that the PyArrow dataset API can read back an ORC file
+    # written via pyarrow.orc
+    import pyarrow.dataset as ds
+
+    data = pa.table({"a": [10, 20], "b": ["foo", "bar"]})
+    orc_path = tmp_path / "iceberg.orc"
+    orc.write_table(data, str(orc_path))
+    # Use the PyArrow dataset API to read the file as ORC
+    dataset = ds.dataset(str(orc_path), format=ds.OrcFileFormat())
+    table_read = dataset.to_table()
+    assert table_read.equals(data)
+
+
+def test_iceberg_write_and_read_orc(tmp_path):
+    """
+    Integration test: write and read ORC via the Iceberg API.
+
+    To run just this test:
+        pytest tests/io/test_pyarrow.py -k test_iceberg_write_and_read_orc
+    """
+    import uuid
+
+    from pyiceberg.expressions import AlwaysTrue
+    from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO, write_file
+    from pyiceberg.manifest import DataFileContent, FileFormat
+    from pyiceberg.partitioning import PartitionSpec
+    from pyiceberg.schema import NestedField, Schema
+    from pyiceberg.table import FileScanTask, WriteTask
+    from pyiceberg.table.metadata import TableMetadataV2
+    from pyiceberg.types import IntegerType, StringType
+
+    # Define the schema and the data to round-trip
+    schema = Schema(
+        NestedField(1, "id", IntegerType(), required=True),
+        NestedField(2, "name", StringType(), required=False),
+    )
+    data = pa.table({"id": pa.array([1, 2, 3], type=pa.int32()), "name": ["a", "b", "c"]})
+
+    # Create table metadata with ORC as the default write format
+    table_metadata = TableMetadataV2(
+        location=str(tmp_path),
+        last_column_id=2,
+        format_version=2,
+        schemas=[schema],
+        partition_specs=[PartitionSpec()],
+        properties={
+            "write.format.default": "orc",
+        },
+    )
+    io = PyArrowFileIO()
+
+    # Write the ORC file through the Iceberg write path
+    write_uuid = uuid.uuid4()
+    tasks = [
+        WriteTask(
+            write_uuid=write_uuid,
+            task_id=0,
+            record_batches=data.to_batches(),
+            schema=schema,
+        )
+    ]
+    data_files = list(write_file(io, table_metadata, iter(tasks)))
+    assert len(data_files) == 1
+    data_file = data_files[0]
+    assert data_file.file_format == FileFormat.ORC
+    assert data_file.content == DataFileContent.DATA
+
+    # Read the file back using ArrowScan
+    scan = ArrowScan(
+        table_metadata=table_metadata,
+        io=io,
+        projected_schema=schema,
+        row_filter=AlwaysTrue(),
+        case_sensitive=True,
+    )
+    scan_task = FileScanTask(data_file=data_file)
+    table_read = scan.to_table([scan_task])
+
+    # Compare shape and column names, ignoring schema metadata (e.g. not-null constraints)
+    assert table_read.num_rows == data.num_rows
+    assert table_read.num_columns == data.num_columns
+    assert table_read.column_names == data.column_names
+
+    # Compare the actual column values
+    for col_name in data.column_names:
+        original_values = data.column(col_name).to_pylist()
+        read_values = table_read.column(col_name).to_pylist()
+        assert original_values == read_values, f"Column {col_name} values don't match"