Skip to content

Commit d89c9cb

Browse files
authored
Fixed api compatibility with DBR16 (#4712)
fixes #4331
1 parent d7a1c55 commit d89c9cb

2 files changed

Lines changed: 232 additions & 71 deletions

File tree

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 116 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -322,54 +322,77 @@ def _catalog_storage(self):
322322
self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat # pylint: disable=protected-access
323323
)
324324

325-
@staticmethod
326-
def _get_entity_storage_locations(table_metadata):
327-
"""Obtain the entityStorageLocations property for table metadata, if the property is present."""
328-
# This is needed because:
329-
# - DBR 16.0 introduced entityStorageLocations as a property on table metadata, and this is required
330-
# as a constructor parameter for CatalogTable.
331-
# - We need to be compatible with earlier versions of DBR.
332-
# - The normal hasattr() check does not work with Py4J-based objects: it always returns True and non-existent
333-
# methods will be automatically created on the proxy but fail when invoked.
334-
# Instead the only approach is to use dir() to check if the method exists _prior_ to trying to access it.
335-
# (After trying to access it, dir() will also include it even though it doesn't exist.)
336-
return table_metadata.entityStorageLocations() if 'entityStorageLocations' in dir(table_metadata) else None
337-
338325
def _convert_hms_table_to_external(self, src_table: Table) -> bool:
339326
"""Converts a Hive metastore table to external using Spark JVM methods."""
340327
logger.info(f"Changing HMS managed table {src_table.name} to External Table type.")
341328
inventory_table = self._tables_crawler.full_name
329+
database = self._spark._jvm.scala.Some(src_table.database) # pylint: disable=protected-access
342330
try:
343-
database = self._spark._jvm.scala.Some(src_table.database) # pylint: disable=protected-access
344331
table_identifier = self._table_identifier(src_table.name, database)
345332
old_table = self._catalog.getTableMetadata(table_identifier)
346-
entity_storage_locations = self._get_entity_storage_locations(old_table)
347-
new_table = self._catalog_table(
348-
old_table.identifier(),
349-
self._catalog_type('EXTERNAL'),
350-
old_table.storage(),
351-
old_table.schema(),
352-
old_table.provider(),
353-
old_table.partitionColumnNames(),
354-
old_table.bucketSpec(),
355-
old_table.owner(),
356-
old_table.createTime(),
357-
old_table.lastAccessTime(),
358-
old_table.createVersion(),
359-
old_table.properties(),
360-
old_table.stats(),
361-
old_table.viewText(),
362-
old_table.comment(),
363-
old_table.unsupportedFeatures(),
364-
old_table.tracksPartitionsInCatalog(),
365-
old_table.schemaPreservesCase(),
366-
old_table.ignoredProperties(),
367-
old_table.viewOriginalText(),
368-
# From DBR 16, there's a new constructor argument: entityStorageLocations (Seq[EntityStorageLocation])
369-
# (We can't detect whether the argument is needed by the constructor, but assume that if the accessor
370-
# is present on the source table then the argument is needed.)
371-
*([entity_storage_locations] if entity_storage_locations is not None else []),
372-
)
333+
# Two alternative ways to create the new_table object: one for DBR15 or older, one for DBR16+.
334+
# Since we can't detect the DBR version from code, we try to detect if the collation
335+
# accessor is present on the source table metadata object.
336+
# This is needed because:
337+
# - DBR 16.0 introduced entityStorageLocations as a property on table metadata, and this is required
338+
# as a constructor parameter for CatalogTable.
339+
# - We need to be compatible with earlier versions of DBR.
340+
# - The normal hasattr() check does not work with Py4J-based objects: it always returns True and non-existent
341+
# methods will be automatically created on the proxy but fail when invoked.
342+
# Instead the only approach is to use dir() to check if the method exists _prior_ to trying to access it.
343+
# (After trying to access it, dir() will also include it even though it doesn't exist.)
344+
if 'collation' in dir(old_table):
345+
logger.debug("Detected Collation property on table metadata, assuming DBR16+")
346+
new_table = self._catalog_table(
347+
old_table.identifier(),
348+
self._catalog_type('EXTERNAL'),
349+
old_table.storage(),
350+
old_table.schema(),
351+
old_table.provider(),
352+
old_table.partitionColumnNames(),
353+
old_table.bucketSpec(),
354+
old_table.owner(),
355+
old_table.createTime(),
356+
old_table.lastAccessTime(),
357+
old_table.createVersion(),
358+
old_table.properties(),
359+
old_table.stats(),
360+
old_table.viewText(),
361+
old_table.comment(),
362+
old_table.collation(),
363+
old_table.unsupportedFeatures(),
364+
old_table.tracksPartitionsInCatalog(),
365+
old_table.schemaPreservesCase(),
366+
old_table.ignoredProperties(),
367+
old_table.viewOriginalText(),
368+
old_table.entityStorageLocations(),
369+
old_table.resourceName(),
370+
)
371+
else:
372+
logger.debug("No Collation property on table metadata, assuming DBR15 or older")
373+
new_table = self._catalog_table(
374+
old_table.identifier(),
375+
self._catalog_type('EXTERNAL'),
376+
old_table.storage(),
377+
old_table.schema(),
378+
old_table.provider(),
379+
old_table.partitionColumnNames(),
380+
old_table.bucketSpec(),
381+
old_table.owner(),
382+
old_table.createTime(),
383+
old_table.lastAccessTime(),
384+
old_table.createVersion(),
385+
old_table.properties(),
386+
old_table.stats(),
387+
old_table.viewText(),
388+
old_table.comment(),
389+
old_table.unsupportedFeatures(),
390+
old_table.tracksPartitionsInCatalog(),
391+
old_table.schemaPreservesCase(),
392+
old_table.ignoredProperties(),
393+
old_table.viewOriginalText(),
394+
old_table.entityStorageLocations(),
395+
)
373396
self._catalog.alterTable(new_table)
374397
self._update_table_status(src_table, inventory_table)
375398
except Exception as e: # pylint: disable=broad-exception-caught
@@ -392,7 +415,6 @@ def _convert_wasbs_table_to_abfss(self, src_table: Table) -> bool:
392415
return False
393416
try:
394417
old_table = self._catalog.getTableMetadata(table_identifier)
395-
entity_storage_locations = self._get_entity_storage_locations(old_table)
396418
table_location = old_table.storage()
397419
new_location = self._catalog_storage(
398420
self._spark._jvm.scala.Some( # pylint: disable=protected-access
@@ -404,32 +426,58 @@ def _convert_wasbs_table_to_abfss(self, src_table: Table) -> bool:
404426
table_location.compressed(),
405427
table_location.properties(),
406428
)
407-
new_table = self._catalog_table(
408-
old_table.identifier(),
409-
old_table.tableType(),
410-
new_location,
411-
old_table.schema(),
412-
old_table.provider(),
413-
old_table.partitionColumnNames(),
414-
old_table.bucketSpec(),
415-
old_table.owner(),
416-
old_table.createTime(),
417-
old_table.lastAccessTime(),
418-
old_table.createVersion(),
419-
old_table.properties(),
420-
old_table.stats(),
421-
old_table.viewText(),
422-
old_table.comment(),
423-
old_table.unsupportedFeatures(),
424-
old_table.tracksPartitionsInCatalog(),
425-
old_table.schemaPreservesCase(),
426-
old_table.ignoredProperties(),
427-
old_table.viewOriginalText(),
428-
# From DBR 16, there's a new constructor argument: entityStorageLocations (Seq[EntityStorageLocation])
429-
# (We can't detect whether the argument is needed by the constructor, but assume that if the accessor
430-
# is present on the source table then the argument is needed.)
431-
*([entity_storage_locations] if entity_storage_locations is not None else []),
432-
)
429+
if 'collation' in dir(old_table):
430+
logger.debug("Detected Collation property on table metadata, assuming DBR16+")
431+
new_table = self._catalog_table(
432+
old_table.identifier(),
433+
old_table.tableType(),
434+
new_location,
435+
old_table.schema(),
436+
old_table.provider(),
437+
old_table.partitionColumnNames(),
438+
old_table.bucketSpec(),
439+
old_table.owner(),
440+
old_table.createTime(),
441+
old_table.lastAccessTime(),
442+
old_table.createVersion(),
443+
old_table.properties(),
444+
old_table.stats(),
445+
old_table.viewText(),
446+
old_table.comment(),
447+
old_table.collation(),
448+
old_table.unsupportedFeatures(),
449+
old_table.tracksPartitionsInCatalog(),
450+
old_table.schemaPreservesCase(),
451+
old_table.ignoredProperties(),
452+
old_table.viewOriginalText(),
453+
old_table.entityStorageLocations(),
454+
old_table.resourceName(),
455+
)
456+
else:
457+
logger.debug("No Collation property on table metadata, assuming DBR15 or older")
458+
new_table = self._catalog_table(
459+
old_table.identifier(),
460+
old_table.tableType(),
461+
new_location,
462+
old_table.schema(),
463+
old_table.provider(),
464+
old_table.partitionColumnNames(),
465+
old_table.bucketSpec(),
466+
old_table.owner(),
467+
old_table.createTime(),
468+
old_table.lastAccessTime(),
469+
old_table.createVersion(),
470+
old_table.properties(),
471+
old_table.stats(),
472+
old_table.viewText(),
473+
old_table.comment(),
474+
old_table.unsupportedFeatures(),
475+
old_table.tracksPartitionsInCatalog(),
476+
old_table.schemaPreservesCase(),
477+
old_table.ignoredProperties(),
478+
old_table.viewOriginalText(),
479+
old_table.entityStorageLocations(),
480+
)
433481
self._catalog.alterTable(new_table)
434482
except Exception as e: # pylint: disable=broad-exception-caught
435483
logger.warning(f"Error converting HMS table {src_table.name} to abfss: {e}", exc_info=True)

tests/unit/hive_metastore/test_table_migrate.py

Lines changed: 116 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,81 @@
5050
def mock_pyspark(mocker):
5151
pyspark_sql_session = mocker.Mock()
5252
sys.modules["pyspark.sql.session"] = pyspark_sql_session
53+
yield pyspark_sql_session
54+
55+
56+
@pytest.fixture
57+
def mock_table_location(mocker):
58+
table_location = mocker.Mock()
59+
table_location.inputFormat.return_value = "org.apache.hadoop.mapred.TextInputFormat"
60+
table_location.outputFormat.return_value = "org.apache.hadoop.mapred.TextOutputFormat"
61+
table_location.serde.return_value = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
62+
table_location.compressed.return_value = False
63+
table_location.properties.return_value = {
64+
"serialization.format": "1",
65+
"field.delim": ",",
66+
}
67+
return table_location
68+
69+
70+
@pytest.fixture
71+
def mock_hms_table_dbr15(mocker, mock_table_location):
72+
table_metadata = mocker.Mock()
73+
table_metadata.identifier.return_value = "hive_metastore.db1_src.managed_dbfs"
74+
table_metadata.tableType.return_value = "MANAGED"
75+
table_metadata.storage.return_value = mock_table_location
76+
table_metadata.schema.return_value = "db1_src"
77+
table_metadata.provider.return_value = "DELTA"
78+
table_metadata.partitionColumnNames.return_value = []
79+
table_metadata.bucketSpec.return_value = None
80+
table_metadata.owner.return_value = "owner"
81+
table_metadata.createTime.return_value = 1600000000
82+
table_metadata.lastAccessTime.return_value = None
83+
table_metadata.createVersion.return_value = None
84+
table_metadata.properties.return_value = {}
85+
table_metadata.stats.return_value = None
86+
table_metadata.viewText.return_value = None
87+
table_metadata.comment.return_value = None
88+
table_metadata.unsupportedFeatures.return_value = []
89+
table_metadata.tracksPartitionsInCatalog.return_value = False
90+
table_metadata.schemaPreservesCase.return_value = False
91+
table_metadata.ignoredProperties.return_value = []
92+
table_metadata.viewOriginalText.return_value = None
93+
table_metadata.entityStorageLocations.return_value = {}
94+
mock_catalog = mocker.Mock()
95+
mock_catalog.getTableMetadata.return_value = table_metadata
96+
return mock_catalog
97+
98+
99+
@pytest.fixture
100+
def mock_hms_table_dbr16(mocker, mock_table_location):
101+
table_metadata = mocker.Mock()
102+
table_metadata.identifier.return_value = "hive_metastore.db1_src.managed_mnt"
103+
table_metadata.tableType.return_value = "MANAGED"
104+
table_metadata.storage.return_value = mock_table_location
105+
table_metadata.schema.return_value = "db1_src"
106+
table_metadata.provider.return_value = "DELTA"
107+
table_metadata.partitionColumnNames.return_value = ["dt"]
108+
table_metadata.bucketSpec.return_value = None
109+
table_metadata.owner.return_value = "owner2"
110+
table_metadata.createTime.return_value = 1610000000
111+
table_metadata.lastAccessTime.return_value = None
112+
table_metadata.createVersion.return_value = None
113+
table_metadata.properties.return_value = {"some": "prop"}
114+
table_metadata.stats.return_value = None
115+
table_metadata.viewText.return_value = None
116+
table_metadata.comment.return_value = "test comment"
117+
table_metadata.collation.return_value = None
118+
table_metadata.unsupportedFeatures.return_value = []
119+
table_metadata.tracksPartitionsInCatalog.return_value = True
120+
table_metadata.schemaPreservesCase.return_value = False
121+
table_metadata.ignoredProperties.return_value = []
122+
table_metadata.viewOriginalText.return_value = None
123+
table_metadata.entityStorageLocations.return_value = {"default": "s3://some_location/table"}
124+
table_metadata.resourceName.return_value = "hive_metastore.db1_src.managed_mnt"
125+
mock_catalog = mocker.Mock()
126+
mock_catalog.getTableMetadata.return_value = table_metadata
127+
return mock_catalog
53128

54129

55130
def test_migrate_dbfs_root_tables_should_produce_proper_queries(ws, mock_pyspark):
@@ -204,7 +279,10 @@ def test_migrate_external_tables_should_produce_proper_queries(ws, mock_pyspark)
204279
]
205280

206281

207-
def test_migrate_managed_table_as_external_tables_with_conversion(ws, mock_pyspark):
282+
@pytest.mark.parametrize("dbr_16", [True, False])
283+
def test_migrate_managed_table_as_external_tables_with_conversion(
284+
ws, mock_pyspark, mock_hms_table_dbr15, mock_hms_table_dbr16, dbr_16, caplog, mocker
285+
):
208286
errors = {}
209287
rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("SUCCESS", "test")]}
210288
crawler_backend = MockBackend(fails_on_first=errors, rows=rows)
@@ -217,16 +295,47 @@ def test_migrate_managed_table_as_external_tables_with_conversion(ws, mock_pyspa
217295
table_migrate = TablesMigrator(
218296
table_crawler, ws, backend, table_mapping, migration_status_refresher, migrate_grants, external_locations
219297
)
220-
table_migrate.convert_managed_hms_to_external(managed_table_external_storage="CONVERT_TO_EXTERNAL")
221298

299+
caplog.set_level(logging.DEBUG)
300+
301+
# Mock Spark session to return different table metadata based on DBR version
302+
mock_session = mocker.Mock()
303+
mock_session_state = mocker.Mock()
304+
if dbr_16:
305+
mock_session_state.catalog.return_value = mock_hms_table_dbr16
306+
else:
307+
mock_session_state.catalog.return_value = mock_hms_table_dbr15
308+
mock_session._jsparkSession.sessionState.return_value = mock_session_state # pylint: disable=protected-access
309+
mock_pyspark.SparkSession.builder.getOrCreate.return_value = mock_session
310+
table_migrate.convert_managed_hms_to_external(managed_table_external_storage="CONVERT_TO_EXTERNAL")
222311
external_locations.resolve_mount.assert_not_called()
223312
migrate_grants.apply.assert_not_called()
224313
assert backend.queries == [
225314
"UPDATE `hive_metastore`.`inventory_database`.`tables` SET object_type = 'EXTERNAL' WHERE catalog='hive_metastore' AND database='db1_src' AND name='managed_other';"
226315
]
316+
if dbr_16:
317+
assert "DBR16" in caplog.text
318+
else:
319+
assert "DBR15" in caplog.text
320+
321+
322+
@pytest.mark.parametrize("dbr_16", [True, False])
323+
def test_convert_wasbs_to_adls_gen2(
324+
ws, mock_pyspark, caplog, mock_hms_table_dbr15, mock_hms_table_dbr16, dbr_16, mocker
325+
):
326+
caplog.set_level(logging.DEBUG)
327+
328+
# Mock Spark session to return different table metadata based on DBR version
329+
mock_session = mocker.Mock()
330+
mock_session_state = mocker.Mock()
331+
if dbr_16:
332+
mock_session_state.catalog.return_value = mock_hms_table_dbr16
333+
else:
334+
mock_session_state.catalog.return_value = mock_hms_table_dbr15
227335

336+
mock_session._jsparkSession.sessionState.return_value = mock_session_state # pylint: disable=protected-access
337+
mock_pyspark.SparkSession.builder.getOrCreate.return_value = mock_session
228338

229-
def test_convert_wasbs_to_adls_gen2(ws, mock_pyspark):
230339
errors = {}
231340
rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("SUCCESS", "test")]}
232341
crawler_backend = MockBackend(fails_on_first=errors, rows=rows)
@@ -245,6 +354,10 @@ def test_convert_wasbs_to_adls_gen2(ws, mock_pyspark):
245354
assert backend.queries == [
246355
"UPDATE `hive_metastore`.`inventory_database`.`tables` SET location = 'abfss://bucket/test/table1' WHERE catalog='hive_metastore' AND database='db1_src' AND name='wasbs_src';"
247356
]
357+
if dbr_16:
358+
assert "DBR16" in caplog.text
359+
else:
360+
assert "DBR15" in caplog.text
248361

249362

250363
def test_migrate_managed_table_as_external_tables_without_conversion(ws, mock_pyspark):

0 commit comments

Comments
 (0)