Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
    """Register a new table in this catalog using an existing metadata file.

    No new metadata is written: the table is staged directly from the file at
    `metadata_location` and a Hive table entry pointing at it is created.

    Args:
        identifier: Table identifier (namespace, table name) for the table.
        metadata_location: Location of an existing Iceberg metadata file.

    Returns:
        Table: The table instance registered in the Hive metastore.

    Raises:
        TableAlreadyExistsError: If the table already exists
    """
    database_name, table_name = self.identifier_to_database_and_table(identifier)
    io = self._load_file_io(location=metadata_location)
    metadata_file = io.new_input(metadata_location)
    # Stage the table from the already-written metadata; the staged table
    # carries everything _convert_iceberg_into_hive needs to build the
    # Thrift HiveTable object.
    staged_table = StagedTable(
        identifier=(database_name, table_name),
        metadata=FromInputFile.table_metadata(metadata_file),
        metadata_location=metadata_location,
        io=io,
        catalog=self,
    )
    tbl = self._convert_iceberg_into_hive(staged_table)
    with self._client as open_client:
        self._create_hive_table(open_client, tbl)
        # Re-read the table so metastore-populated fields are reflected
        # in the returned Table.
        hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)

    return self._convert_hive_into_iceberg(hive_table)

def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
    """List view identifiers under the given namespace.

    Not yet supported by the Hive catalog implementation.
    """
    raise NotImplementedError
Expand Down
81 changes: 81 additions & 0 deletions tests/catalog/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,87 @@ def test_check_number_of_namespaces(table_schema_simple: Schema) -> None:
catalog.create_table("table", schema=table_schema_simple)


@pytest.mark.parametrize("hive2_compatible", [True, False])
@patch("time.time", MagicMock(return_value=12345))
def test_register_table(
    table_schema_with_all_types: Schema,
    hive_database: HiveDatabase,
    hive_table: HiveTable,
    hive2_compatible: bool,
    metadata_with_owner_location: str,
) -> None:
    """register_table creates a Hive table entry pointing at existing metadata.

    The Thrift client is mocked; the test asserts the exact HiveTable payload
    sent to the metastore, including the owner taken from the metadata file's
    "owner" property.
    """
    catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
    if hive2_compatible:
        catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL, **{"hive.hive2-compatible": "true"})

    # Mock out the Thrift client: creation succeeds and get_table returns
    # the canned fixtures.
    catalog._client = MagicMock()
    catalog._client.__enter__().create_table.return_value = None
    catalog._client.__enter__().register_table.return_value = None
    catalog._client.__enter__().get_table.return_value = hive_table
    catalog._client.__enter__().get_database.return_value = hive_database

    catalog.register_table(("default", "table"), metadata_location=metadata_with_owner_location)

    catalog._client.__enter__().create_table.assert_called_with(
        HiveTable(
            tableName="table",
            dbName="default",
            owner="test",  # taken from the "owner" property in the metadata file
            createTime=12345,  # time.time is patched to 12345
            lastAccessTime=12345,
            retention=None,
            sd=StorageDescriptor(
                cols=[
                    FieldSchema(name="x", type="bigint", comment=None),
                    FieldSchema(name="y", type="bigint", comment="comment"),
                    FieldSchema(name="z", type="bigint", comment=None),
                ],
                location="s3://bucket/test/location",
                inputFormat="org.apache.hadoop.mapred.FileInputFormat",
                outputFormat="org.apache.hadoop.mapred.FileOutputFormat",
                compressed=None,
                numBuckets=None,
                serdeInfo=SerDeInfo(
                    name=None,
                    serializationLib="org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                    parameters=None,
                    description=None,
                    serializerClass=None,
                    deserializerClass=None,
                    serdeType=None,
                ),
                bucketCols=None,
                sortCols=None,
                parameters=None,
                skewedInfo=None,
                storedAsSubDirectories=None,
            ),
            partitionKeys=None,
            parameters={"EXTERNAL": "TRUE", "table_type": "ICEBERG", "metadata_location": metadata_with_owner_location},
            viewOriginalText=None,
            viewExpandedText=None,
            tableType="EXTERNAL_TABLE",
            privileges=None,
            temporary=False,
            rewriteEnabled=None,
            creationMetadata=None,
            catName=None,
            ownerType=1,
            writeId=-1,
            isStatsCompliant=None,
            colStats=None,
            accessType=None,
            requiredReadCapabilities=None,
            requiredWriteCapabilities=None,
            id=None,
            fileMetadata=None,
            dictionary=None,
            txnId=None,
        )
    )
    assert catalog.table_exists(identifier="default.table")


@pytest.mark.parametrize("hive2_compatible", [True, False])
@patch("time.time", MagicMock(return_value=12345))
def test_create_table(
Expand Down
11 changes: 11 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,17 @@ def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str:
return metadata_location


@pytest.fixture(scope="session")
def metadata_with_owner_location(tmp_path_factory: pytest.TempPathFactory) -> str:
    """Write example V2 table metadata, tagged with an "owner" property, to a
    temporary file and return its path as a string."""
    from pyiceberg.io.pyarrow import PyArrowFileIO

    # Build the metadata first, then set the owner so tests can assert
    # ownership propagation from metadata properties.
    table_metadata = TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2)
    table_metadata.properties["owner"] = "test"

    target_location = str(tmp_path_factory.mktemp("metadata") / f"{uuid.uuid4()}.metadata.json")
    output_file = PyArrowFileIO().new_output(location=target_location)
    ToOutputFile.table_metadata(table_metadata, output_file, overwrite=True)
    return target_location


@pytest.fixture(scope="session")
def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str:
from pyiceberg.io.pyarrow import PyArrowFileIO
Expand Down