@@ -18,6 +18,7 @@
 import math
 import os
 import random
+import re
 import time
 import uuid
 from datetime import date, datetime, timedelta
@@ -44,7 +45,7 @@
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import CommitFailedException, NoSuchTableError
 from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not
-from pyiceberg.io.pyarrow import _dataframe_to_data_files
+from pyiceberg.io.pyarrow import _dataframe_to_data_files, UnsupportedPyArrowTypeException
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import TableProperties
@@ -2249,18 +2250,27 @@ def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSession
 
 
 @pytest.mark.integration
-def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
+def test_nanosecond_support_on_catalog(
+    session_catalog: Catalog, arrow_table_schema_with_all_timestamp_precisions: pa.Schema
+) -> None:
     identifier = "default.test_nanosecond_support_on_catalog"
-    # Create a pyarrow table with a nanosecond timestamp column
-    table = pa.Table.from_arrays(
-        [
-            pa.array([datetime.now()], type=pa.timestamp("ns")),
-            pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York")),
-        ],
-        names=["timestamp_ns", "timestamptz_ns"],
-    )
 
-    _create_table(session_catalog, identifier, {"format-version": "3"}, schema=table.schema)
+    catalog = load_catalog("default", type="in-memory")
+    catalog.create_namespace("ns")
+
+    _create_table(session_catalog, identifier, {"format-version": "3"}, schema=arrow_table_schema_with_all_timestamp_precisions)
+
+    with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"):
+        catalog.create_table(
+            "ns.table1", schema=arrow_table_schema_with_all_timestamp_precisions, properties={"format-version": "3"}
+        )
+
+    with pytest.raises(
+        UnsupportedPyArrowTypeException, match=re.escape("Column 'timestamp_ns' has an unsupported type: timestamp[ns]")
+    ):
+        _create_table(
+            session_catalog, identifier, {"format-version": "2"}, schema=arrow_table_schema_with_all_timestamp_precisions
+        )
 
 
 @pytest.mark.parametrize("format_version", [1, 2])
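
Note: the arrow_table_schema_with_all_timestamp_precisions fixture referenced above lives in conftest.py and is not part of this diff. A minimal sketch of what such a schema fixture could look like — only the fixture name and the timestamp_ns column (pinned down by the error message above) come from the diff; every other field name is an assumption:

    import pyarrow as pa
    import pytest

    @pytest.fixture(scope="session")
    def arrow_table_schema_with_all_timestamp_precisions() -> pa.Schema:
        # One tz-naive and one tz-aware column per precision; nanoseconds are
        # the interesting case, since Iceberg only supports them from V3 on.
        return pa.schema(
            [
                ("timestamp_s", pa.timestamp("s")),
                ("timestamptz_s", pa.timestamp("s", tz="UTC")),
                ("timestamp_ms", pa.timestamp("ms")),
                ("timestamptz_ms", pa.timestamp("ms", tz="UTC")),
                ("timestamp_us", pa.timestamp("us")),
                ("timestamptz_us", pa.timestamp("us", tz="UTC")),
                ("timestamp_ns", pa.timestamp("ns")),
                ("timestamptz_ns", pa.timestamp("ns", tz="UTC")),
            ]
        )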
@@ -2281,14 +2291,7 @@ def test_stage_only_delete(
     original_count = len(tbl.scan().to_arrow())
     assert original_count == 3
 
-    files_to_delete = []
-    for file_task in tbl.scan().plan_files():
-        files_to_delete.append(file_task.file)
-    assert len(files_to_delete) > 0
-
-    with tbl.transaction() as txn:
-        with txn.update_snapshot(branch=None).delete() as delete:
-            delete.delete_by_predicate(EqualTo("int", 9))
+    tbl.delete("int = 9", branch=None)
 
     # a new delete snapshot is added
     snapshots = tbl.snapshots()
@@ -2298,16 +2301,11 @@ def test_stage_only_delete(
     assert len(tbl.scan().to_arrow()) == original_count
 
     # Write to main branch
-    with tbl.transaction() as txn:
-        with txn.update_snapshot().fast_append() as fast_append:
-            for data_file in _dataframe_to_data_files(
-                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
-            ):
-                fast_append.append_data_file(data_file=data_file)
+    tbl.append(arrow_table_with_null)
 
     # Main ref has changed
     assert current_snapshot != tbl.metadata.current_snapshot_id
-    assert len(tbl.scan().to_arrow()) == 3
+    assert len(tbl.scan().to_arrow()) == 6
     snapshots = tbl.snapshots()
     assert len(snapshots) == 3
 
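
The branch=None form used above stages a snapshot: it is committed to table metadata, but no branch reference moves, so reads of main are unaffected until a regular commit advances it. A minimal sketch of the pattern these tests now exercise, assuming a table tbl and an Arrow table df like the fixtures here:

    before = tbl.metadata.current_snapshot_id

    tbl.delete("int = 9", branch=None)   # staged: snapshot exists, main ref unchanged
    assert tbl.metadata.current_snapshot_id == before

    tbl.append(df)                       # normal commit: main ref advances
    assert tbl.metadata.current_snapshot_id != before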
@@ -2327,7 +2325,7 @@ def test_stage_only_delete(
 
 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
-def test_stage_only_fast_append(
+def test_stage_only_append(
     spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
 ) -> None:
     identifier = f"default.test_stage_only_fast_append_files_v{format_version}"
@@ -2340,12 +2338,7 @@ def test_stage_only_fast_append(
     assert original_count == 3
 
     # Write to staging branch
-    with tbl.transaction() as txn:
-        with txn.update_snapshot(branch=None).fast_append() as fast_append:
-            for data_file in _dataframe_to_data_files(
-                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
-            ):
-                fast_append.append_data_file(data_file=data_file)
+    tbl.append(arrow_table_with_null, branch=None)
 
     # Main ref has not changed and data is not yet appended
     assert current_snapshot == tbl.metadata.current_snapshot_id
@@ -2355,12 +2348,7 @@
     assert len(snapshots) == 2
 
     # Write to main branch
-    with tbl.transaction() as txn:
-        with txn.update_snapshot().fast_append() as fast_append:
-            for data_file in _dataframe_to_data_files(
-                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
-            ):
-                fast_append.append_data_file(data_file=data_file)
+    tbl.append(arrow_table_with_null)
 
     # Main ref has changed
     assert current_snapshot != tbl.metadata.current_snapshot_id
@@ -2382,119 +2370,49 @@ def test_stage_only_fast_append(
     assert parent_snapshot_id == [None, current_snapshot, current_snapshot]
 
 
-
-@pytest.mark.integration
-@pytest.mark.parametrize("format_version", [1, 2])
-def test_stage_only_merge_append(
-    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
-) -> None:
-    identifier = f"default.test_stage_only_merge_append_files_v{format_version}"
-    tbl = _create_table(session_catalog, identifier, {"format-version": str(format_version)}, [arrow_table_with_null])
-
-    current_snapshot = tbl.metadata.current_snapshot_id
-    assert current_snapshot is not None
-
-    original_count = len(tbl.scan().to_arrow())
-    assert original_count == 3
-
-    with tbl.transaction() as txn:
-        with txn.update_snapshot(branch=None).merge_append() as merge_append:
-            for data_file in _dataframe_to_data_files(
-                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
-            ):
-                merge_append.append_data_file(data_file=data_file)
-
-    # Main ref has not changed and data is not yet appended
-    assert current_snapshot == tbl.metadata.current_snapshot_id
-    assert len(tbl.scan().to_arrow()) == original_count
-
-    # There should be a new staged snapshot
-    snapshots = tbl.snapshots()
-    assert len(snapshots) == 2
-
-    # Write to main branch
-    with tbl.transaction() as txn:
-        with txn.update_snapshot().fast_append() as fast_append:
-            for data_file in _dataframe_to_data_files(
-                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
-            ):
-                fast_append.append_data_file(data_file=data_file)
-
-    # Main ref has changed
-    assert current_snapshot != tbl.metadata.current_snapshot_id
-    assert len(tbl.scan().to_arrow()) == 6
-    snapshots = tbl.snapshots()
-    assert len(snapshots) == 3
-
-    rows = spark.sql(
-        f"""
-        SELECT operation, parent_id
-        FROM {identifier}.snapshots
-        ORDER BY committed_at ASC
-        """
-    ).collect()
-    operations = [row.operation for row in rows]
-    parent_snapshot_id = [row.parent_id for row in rows]
-    assert operations == ["append", "append", "append"]
-    # both subsequent parent id should be the first snapshot id
-    assert parent_snapshot_id == [None, current_snapshot, current_snapshot]
-
-
 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_stage_only_overwrite_files(
     spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
 ) -> None:
     identifier = f"default.test_stage_only_overwrite_files_v{format_version}"
     tbl = _create_table(session_catalog, identifier, {"format-version": str(format_version)}, [arrow_table_with_null])
+    first_snapshot = tbl.metadata.current_snapshot_id
 
-    current_snapshot = tbl.metadata.current_snapshot_id
-    assert current_snapshot is not None
+    # duplicate data with a new insert
+    tbl.append(arrow_table_with_null)
 
+    second_snapshot = tbl.metadata.current_snapshot_id
+    assert second_snapshot is not None
     original_count = len(tbl.scan().to_arrow())
-    assert original_count == 3
+    assert original_count == 6
 
-    files_to_delete = []
-    for file_task in tbl.scan().plan_files():
-        files_to_delete.append(file_task.file)
-    assert len(files_to_delete) > 0
-
-    with tbl.transaction() as txn:
-        with txn.update_snapshot(branch=None).overwrite() as overwrite:
-            for data_file in _dataframe_to_data_files(
-                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
-            ):
-                overwrite.append_data_file(data_file=data_file)
-            overwrite.delete_data_file(files_to_delete[0])
-
-    assert current_snapshot == tbl.metadata.current_snapshot_id
+    # write to non-main branch
+    tbl.overwrite(arrow_table_with_null, branch=None)
+    assert second_snapshot == tbl.metadata.current_snapshot_id
     assert len(tbl.scan().to_arrow()) == original_count
     snapshots = tbl.snapshots()
-    assert len(snapshots) == 2
+    # overwrite will create 2 snapshots
+    assert len(snapshots) == 4
 
-    # Write to main branch
-    with tbl.transaction() as txn:
-        with txn.update_snapshot().fast_append() as fast_append:
-            for data_file in _dataframe_to_data_files(
-                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
-            ):
-                fast_append.append_data_file(data_file=data_file)
+    # Write to main branch again
+    tbl.append(arrow_table_with_null)
 
     # Main ref has changed
-    assert current_snapshot != tbl.metadata.current_snapshot_id
-    assert len(tbl.scan().to_arrow()) == 6
+    assert second_snapshot != tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == 9
     snapshots = tbl.snapshots()
-    assert len(snapshots) == 3
+    assert len(snapshots) == 5
 
     rows = spark.sql(
         f"""
-        SELECT operation, parent_id
+        SELECT operation, parent_id, snapshot_id
         FROM {identifier}.snapshots
         ORDER BY committed_at ASC
         """
     ).collect()
     operations = [row.operation for row in rows]
     parent_snapshot_id = [row.parent_id for row in rows]
-    assert operations == ["append", "overwrite", "append"]
-    # both subsequent parent id should be the first snapshot id
-    assert parent_snapshot_id == [None, current_snapshot, current_snapshot]
+    assert operations == ["append", "append", "delete", "append", "append"]
+
+    assert parent_snapshot_id == [None, first_snapshot, second_snapshot, second_snapshot, second_snapshot]
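
A staged overwrite materializes as two snapshots — a delete of the replaced rows followed by an append — which is why the branch=None overwrite above bumps the snapshot count from 2 to 4 and the operations column reads ["append", "append", "delete", "append", "append"]. As a rough cross-check without Spark (assuming pyiceberg's Snapshot.summary exposes the operation, which is not shown in this diff):

    # operations in commit order, straight from table metadata
    ops = [snap.summary.operation for snap in tbl.snapshots()]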