7474from pyiceberg .partitioning import UNPARTITIONED_PARTITION_SPEC , PartitionSpec
7575from pyiceberg .schema import Schema , SchemaVisitor , visit
7676from pyiceberg .serializers import FromInputFile
77- from pyiceberg .table import CommitTableRequest , CommitTableResponse , PropertyUtil , Table , TableProperties , update_table_metadata
78- from pyiceberg .table .metadata import new_table_metadata
77+ from pyiceberg .table import (
78+ CommitTableRequest ,
79+ CommitTableResponse ,
80+ PropertyUtil ,
81+ StagedTable ,
82+ Table ,
83+ TableProperties ,
84+ update_table_metadata ,
85+ )
7986from pyiceberg .table .sorting import UNSORTED_SORT_ORDER , SortOrder
8087from pyiceberg .typedef import EMPTY_DICT , Identifier , Properties
8188from pyiceberg .types import (
@@ -266,6 +273,26 @@ def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
266273 catalog = self ,
267274 )
268275
def _convert_iceberg_into_hive(self, table: Table) -> HiveTable:
    """Translate an Iceberg ``Table`` into its Hive Metastore representation.

    Builds an external Hive table entry pointing at the Iceberg metadata
    location, so the metastore can serve as the catalog of record.

    Args:
        table: The Iceberg table to convert.

    Returns:
        A Thrift ``HiveTable`` struct ready to be sent to the metastore.

    Raises:
        NoSuchTableError: If the table identifier cannot be resolved.
    """
    name_parts = self.identifier_to_tuple_without_catalog(table.identifier)
    database_name, table_name = self.identifier_to_database_and_table(name_parts, NoSuchTableError)
    # Hive stores create/access times in whole seconds.
    now_millis = int(time.time() * 1000)
    props = table.properties
    # Fall back to the OS user when no explicit owner property is set.
    owner = props[OWNER] if props and OWNER in props else getpass.getuser()

    storage_descriptor = _construct_hive_storage_descriptor(
        table.schema(),
        table.location(),
        PropertyUtil.property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
    )

    return HiveTable(
        dbName=database_name,
        tableName=table_name,
        owner=owner,
        createTime=now_millis // 1000,
        lastAccessTime=now_millis // 1000,
        sd=storage_descriptor,
        tableType=EXTERNAL_TABLE,
        parameters=_construct_parameters(table.metadata_location),
    )
269296 def create_table (
270297 self ,
271298 identifier : Union [str , Identifier ],
@@ -292,45 +319,27 @@ def create_table(
292319 AlreadyExistsError: If a table with the name already exists.
293320 ValueError: If the identifier is invalid.
294321 """
295- schema : Schema = self ._convert_schema_if_needed (schema ) # type: ignore
296-
297322 properties = {** DEFAULT_PROPERTIES , ** properties }
298- database_name , table_name = self .identifier_to_database_and_table (identifier )
299- current_time_millis = int (time .time () * 1000 )
300-
301- location = self ._resolve_table_location (location , database_name , table_name )
302-
303- metadata_location = self ._get_metadata_location (location = location )
304- metadata = new_table_metadata (
305- location = location ,
323+ staged_table = self ._create_staged_table (
324+ identifier = identifier ,
306325 schema = schema ,
326+ location = location ,
307327 partition_spec = partition_spec ,
308328 sort_order = sort_order ,
309329 properties = properties ,
310330 )
311- io = load_file_io ({** self .properties , ** properties }, location = location )
312- self ._write_metadata (metadata , io , metadata_location )
331+ database_name , table_name = self .identifier_to_database_and_table (identifier )
313332
314- tbl = HiveTable (
315- dbName = database_name ,
316- tableName = table_name ,
317- owner = properties [OWNER ] if properties and OWNER in properties else getpass .getuser (),
318- createTime = current_time_millis // 1000 ,
319- lastAccessTime = current_time_millis // 1000 ,
320- sd = _construct_hive_storage_descriptor (
321- schema , location , PropertyUtil .property_as_bool (self .properties , HIVE2_COMPATIBLE , HIVE2_COMPATIBLE_DEFAULT )
322- ),
323- tableType = EXTERNAL_TABLE ,
324- parameters = _construct_parameters (metadata_location ),
325- )
333+ self ._write_metadata (staged_table .metadata , staged_table .io , staged_table .metadata_location )
334+ tbl = self ._convert_iceberg_into_hive (staged_table )
326335 try :
327336 with self ._client as open_client :
328337 open_client .create_table (tbl )
329338 hive_table = open_client .get_table (dbname = database_name , tbl_name = table_name )
330339 except AlreadyExistsException as e :
331340 raise TableAlreadyExistsError (f"Table { database_name } .{ table_name } already exists" ) from e
332341
333- return self ._convert_hive_into_iceberg (hive_table , io )
342+ return self ._convert_hive_into_iceberg (hive_table , staged_table . io )
334343
335344 def register_table (self , identifier : Union [str , Identifier ], metadata_location : str ) -> Table :
336345 """Register a new table using existing metadata.
@@ -404,8 +413,32 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
404413 metadata_location = new_metadata_location , previous_metadata_location = current_table .metadata_location
405414 )
406415 open_client .alter_table (dbname = database_name , tbl_name = table_name , new_tbl = hive_table )
407- except NoSuchObjectException as e :
408- raise NoSuchTableError (f"Table does not exist: { table_name } " ) from e
416+ except NoSuchObjectException :
417+ updated_metadata = update_table_metadata (
418+ base_metadata = self ._empty_table_metadata (), updates = table_request .updates , enforce_validation = True
419+ )
420+ new_metadata_version = 0
421+ new_metadata_location = self ._get_metadata_location (updated_metadata .location , new_metadata_version )
422+ io = self ._load_file_io (updated_metadata .properties , new_metadata_location )
423+ self ._write_metadata (
424+ updated_metadata ,
425+ io ,
426+ new_metadata_location ,
427+ )
428+
429+ tbl = self ._convert_iceberg_into_hive (
430+ StagedTable (
431+ identifier = (self .name , database_name , table_name ),
432+ metadata = updated_metadata ,
433+ metadata_location = new_metadata_location ,
434+ io = io ,
435+ catalog = self ,
436+ )
437+ )
438+ try :
439+ open_client .create_table (tbl )
440+ except AlreadyExistsException as e :
441+ raise TableAlreadyExistsError (f"Table { database_name } .{ table_name } already exists" ) from e
409442 finally :
410443 open_client .unlock (UnlockRequest (lockid = lock .lockid ))
411444
0 commit comments