From 134c1ebda345cb27ae6f88f76f53daab806caf0d Mon Sep 17 00:00:00 2001 From: Mohit Jeswani <2022.mohit.jeswani@ves.ac.in> Date: Sun, 26 Apr 2026 02:35:41 +0530 Subject: [PATCH 01/12] feat(datalake): add GCS/S3 Iceberg table ingestion support (#22644) --- .../source/database/datalake/clients/gcs.py | 59 ++- .../source/database/datalake/clients/s3.py | 74 +++- .../source/database/datalake/metadata.py | 25 +- .../src/metadata/readers/dataframe/json.py | 15 +- .../metadata/utils/datalake/datalake_utils.py | 20 + .../tests/unit/readers/test_json_reader.py | 80 ++++ .../source/database/test_iceberg_discovery.py | 394 ++++++++++++++++++ 7 files changed, 635 insertions(+), 32 deletions(-) create mode 100644 ingestion/tests/unit/source/database/test_iceberg_discovery.py diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py index b3c275ec061f..9fd649494f86 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py @@ -14,9 +14,10 @@ """ import os +import re from copy import deepcopy from functools import partial -from typing import Callable, Iterable, List, Optional, Set, Tuple # noqa: UP035 +from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple # noqa: UP035 from google.cloud import storage @@ -107,21 +108,61 @@ def get_database_schema_names(self, bucket_name: Optional[str]) -> Iterable[str] for bucket in self._client.list_buckets(): yield bucket.name + _ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$") + + @staticmethod + def _should_skip_gcs_cold_storage(blob) -> bool: + storage_class = getattr(blob, "storage_class", None) + return bool(storage_class and storage_class in GCS_COLD_STORAGE_CLASSES) + + def _classify_gcs_blob( + self, + iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]], # noqa: UP006, UP045 + regular_files: List[Tuple[str, Optional[int]]], # noqa: UP006, UP045 + blob, + ) -> None: + match = self._ICEBERG_METADATA_RE.match(blob.name) + if match: + table_dir, version = match.group(1), int(match.group(2)) + existing = iceberg_tables.get(table_dir) + if existing is None or version > existing[0]: + iceberg_tables[table_dir] = (version, blob.name, blob.size) + else: + regular_files.append((blob.name, blob.size)) + def get_table_names( self, bucket_name: str, prefix: Optional[str], # noqa: UP045 skip_cold_storage: bool = False, ) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045 + """ + Lists tables in a GCS bucket using a single pass. + + Iceberg table directories are identified by blobs matching + ``/metadata/v.metadata.json``. Only the blob with the + highest integer version is yielded per table directory. Regular files + not under any Iceberg table directory are also yielded. 
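+
+        Illustrative example (hypothetical bucket contents and resulting yields)::
+
+            warehouse/orders/metadata/v1.metadata.json   superseded by v2, not yielded
+            warehouse/orders/metadata/v2.metadata.json   yielded (highest version)
+            warehouse/orders/data/00000-0-abc.parquet    suppressed (inside an Iceberg table dir)
+            reports/sales.csv                            yielded as a regular file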
+ """ bucket = self._client.get_bucket(bucket_name) - - for key in bucket.list_blobs(prefix=prefix): - if skip_cold_storage: - storage_class = getattr(key, "storage_class", None) - if storage_class and storage_class in GCS_COLD_STORAGE_CLASSES: - logger.debug(f"Skipping cold storage object: {key.name} (storage_class: {storage_class})") - continue - yield key.name, key.size + iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {} # noqa: UP006 + regular_files: List[Tuple[str, Optional[int]]] = [] # noqa: UP006 + + for blob in bucket.list_blobs(prefix=prefix): + if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): + logger.debug( + f"Skipping cold storage object: {blob.name} " + f"(storage_class: {getattr(blob, 'storage_class', None)})" + ) + continue + self._classify_gcs_blob(iceberg_tables, regular_files, blob) + + iceberg_dirs = set(iceberg_tables.keys()) + for _, metadata_blob_path, size in iceberg_tables.values(): + yield metadata_blob_path, size + for file_path, size in regular_files: + if not any(file_path.startswith(d + "/") for d in iceberg_dirs): + yield file_path, size def close(self, service_connection): os.environ.pop("GOOGLE_CLOUD_PROJECT", "") diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py index 68f69225e894..413017381c77 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py @@ -13,8 +13,9 @@ Datalake S3 Client """ +import re from functools import partial -from typing import Callable, Iterable, Optional, Set, Tuple # noqa: UP035 +from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple # noqa: UP035 from metadata.clients.aws_client import AWSClient from metadata.generated.schema.entity.services.connections.database.datalake.s3Config import ( @@ -61,31 +62,72 @@ def get_database_schema_names(self, bucket_name: Optional[str]) -> Iterable[str] for bucket in self._client.list_buckets()["Buckets"]: yield bucket["Name"] + _ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$") + + @staticmethod + def _should_skip_s3_cold_storage(key: dict) -> bool: + storage_class = key.get("StorageClass", "STANDARD") + archive_status = key.get("ArchiveStatus", "") + return storage_class in S3_COLD_STORAGE_CLASSES or archive_status in { + "ARCHIVE_ACCESS", + "DEEP_ARCHIVE_ACCESS", + } + + def _classify_s3_object( + self, + iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]], # noqa: UP006, UP045 + regular_files: List[Tuple[str, Optional[int]]], # noqa: UP006, UP045 + key_name: str, + size: Optional[int], # noqa: UP045 + ) -> None: + match = self._ICEBERG_METADATA_RE.match(key_name) + if match: + table_dir, version = match.group(1), int(match.group(2)) + existing = iceberg_tables.get(table_dir) + if existing is None or version > existing[0]: + iceberg_tables[table_dir] = (version, key_name, size) + else: + regular_files.append((key_name, size)) + def get_table_names( self, bucket_name: str, prefix: Optional[str], # noqa: UP045 skip_cold_storage: bool = False, ) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045 - kwargs = {"Bucket": bucket_name} - + """ + Lists tables in an S3 bucket using a single pass. + + Iceberg table directories are identified by objects matching + ``/metadata/v.metadata.json``. Only the object with the + highest integer version is yielded per table directory. 
Regular files + not under any Iceberg table directory are also yielded. + """ + kwargs: Dict[str, str] = {"Bucket": bucket_name} # noqa: UP006 if prefix: kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/" + iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {} # noqa: UP006 + regular_files: List[Tuple[str, Optional[int]]] = [] # noqa: UP006 + for key in list_s3_objects(self._client, **kwargs): - if skip_cold_storage: - storage_class = key.get("StorageClass", "STANDARD") - archive_status = key.get("ArchiveStatus", "") - if storage_class in S3_COLD_STORAGE_CLASSES or archive_status in { - "ARCHIVE_ACCESS", - "DEEP_ARCHIVE_ACCESS", - }: - logger.debug( - f"Skipping cold storage object: {key['Key']} " - f"(StorageClass: {storage_class}, ArchiveStatus: {archive_status})" - ) - continue - yield key["Key"], key.get("Size") + key_name = key["Key"] + size = key.get("Size") + if skip_cold_storage and self._should_skip_s3_cold_storage(key): + logger.debug( + f"Skipping cold storage object: {key_name} " + f"(StorageClass: {key.get('StorageClass', 'STANDARD')}, " + f"ArchiveStatus: {key.get('ArchiveStatus', '')})" + ) + continue + self._classify_s3_object(iceberg_tables, regular_files, key_name, size) + + iceberg_dirs = set(iceberg_tables.keys()) + for _, metadata_key, size in iceberg_tables.values(): + yield metadata_key, size + for file_path, size in regular_files: + if not any(file_path.startswith(d + "/") for d in iceberg_dirs): + yield file_path, size def get_folders_prefix(self, bucket_name: str, prefix: Optional[str]) -> Iterable[str]: # noqa: UP045 for page in self._client.get_paginator("list_objects_v2").paginate( diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py index 402be49e210a..716b8444d21d 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py @@ -67,6 +67,7 @@ DataFrameColumnParser, fetch_dataframe_first_chunk, get_file_format_type, + get_iceberg_table_name_from_metadata_path, ) from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table from metadata.utils.logger import ingestion_logger @@ -201,7 +202,7 @@ def yield_database_schema(self, schema_name: str) -> Iterable[Either[CreateDatab def get_tables_name_and_type( # pylint: disable=too-many-branches self, - ) -> Iterable[Tuple[str, TableType, SupportedTypes, Optional[int]]]: # noqa: UP006, UP045 + ) -> Iterable[Tuple[str, TableType, SupportedTypes, Optional[int], str]]: # noqa: UP006, UP045 """ Handle table and views. @@ -238,18 +239,29 @@ def get_tables_name_and_type( # pylint: disable=too-many-branches logger.debug(f"Object filtered due to unsupported file type: {key_name}") continue - yield table_name, TableType.Regular, file_extension, file_size + table_type = ( + TableType.Iceberg + if get_iceberg_table_name_from_metadata_path(key_name) is not None + else TableType.Regular + ) + yield table_name, table_type, file_extension, file_size, key_name def yield_table( self, - table_name_and_type: Tuple[str, TableType, SupportedTypes, Optional[int]], # noqa: UP006, UP045 + table_name_and_type: Tuple[str, TableType, SupportedTypes, Optional[int], str], # noqa: UP006, UP045 ) -> Iterable[Either[CreateTableRequest]]: """ From topology. Prepare a table request and pass it to the sink. Uses first chunk only for schema inference to avoid loading entire file. 
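+
+        Illustrative tuple (hypothetical values) for an Iceberg table::
+
+            ("orders", TableType.Iceberg, SupportedTypes.JSON, 1024,
+             "warehouse/orders/metadata/v2.metadata.json")
+
+        The last element is the original object key used to fetch the file for
+        schema inference; the first element is only the display name produced by
+        standardize_table_name.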
""" - table_name, table_type, table_extension, file_size = table_name_and_type + ( + table_name, + table_type, + table_extension, + file_size, + fetch_key, + ) = table_name_and_type schema_name = self.context.get().database_schema try: table_constraints = None @@ -257,7 +269,7 @@ def yield_table( config_source=self.config_source, client=self.client.client, file_fqn=DatalakeTableSchemaWrapper( - key=table_name, + key=fetch_key, bucket_name=schema_name, file_extension=table_extension, file_size=file_size, @@ -326,7 +338,8 @@ def standardize_table_name( schema: str, table: str, # pylint: disable=unused-argument ) -> str: - return table + iceberg_name = get_iceberg_table_name_from_metadata_path(table) + return iceberg_name if iceberg_name is not None else table def filter_dl_table(self, table_name: str): """Filters Datalake Tables based on filterPattern""" diff --git a/ingestion/src/metadata/readers/dataframe/json.py b/ingestion/src/metadata/readers/dataframe/json.py index de87b4b95b13..120bf17eef18 100644 --- a/ingestion/src/metadata/readers/dataframe/json.py +++ b/ingestion/src/metadata/readers/dataframe/json.py @@ -120,7 +120,20 @@ def _read_json_object( content = content.decode(UTF_8, errors="ignore") if isinstance(content, bytes) else content data = json.loads(content) - raw_data = content if isinstance(data, dict) and data.get("$schema") else None + raw_data = ( + content + if isinstance(data, dict) + and ( + data.get("$schema") is not None # JSON Schema files + or data.get("format-version") + is not None # Apache Iceberg table metadata + or ( # Delta Lake / Iceberg schema structure + isinstance(data.get("schema"), dict) + and isinstance(data.get("schema", {}).get("fields"), list) + ) + ) + else None + ) data = [data] if isinstance(data, dict) else data def chunk_generator(): diff --git a/ingestion/src/metadata/utils/datalake/datalake_utils.py b/ingestion/src/metadata/utils/datalake/datalake_utils.py index bd4b903b67cc..a24a7fde21f9 100644 --- a/ingestion/src/metadata/utils/datalake/datalake_utils.py +++ b/ingestion/src/metadata/utils/datalake/datalake_utils.py @@ -17,6 +17,7 @@ import ast import json import random +import re import traceback from typing import Any, Dict, List, Optional, Union, cast # noqa: UP035 @@ -149,6 +150,25 @@ def fetch_dataframe_first_chunk( return None +_ICEBERG_METADATA_PATH_RE = re.compile(r"([^/]+)/metadata/v\d+\.metadata\.json$") + + +def get_iceberg_table_name_from_metadata_path(metadata_path: str) -> Optional[str]: + """ + Extracts the Iceberg table directory name from a metadata file path. + + Examples: + "warehouse/orders/metadata/v2.metadata.json" -> "orders" + "my_prefix/sales/metadata/v1.metadata.json" -> "sales" + "simple/metadata/v3.metadata.json" -> "simple" + "data/orders.json" -> None + + Returns None if the path does not match the Iceberg metadata pattern. 
+ """ + match = _ICEBERG_METADATA_PATH_RE.search(metadata_path) + return match.group(1) if match else None + + def get_file_format_type(key_name, metadata_entry=None): for supported_types in SupportedTypes: if key_name.lower().endswith(supported_types.value.lower()): diff --git a/ingestion/tests/unit/readers/test_json_reader.py b/ingestion/tests/unit/readers/test_json_reader.py index 6465cee006c5..2cd30eea3182 100644 --- a/ingestion/tests/unit/readers/test_json_reader.py +++ b/ingestion/tests/unit/readers/test_json_reader.py @@ -261,6 +261,86 @@ def test_empty_json_lines(self): total_rows = sum(len(chunk) for chunk in chunks) self.assertEqual(total_rows, 2) + def test_raw_data_set_for_iceberg_metadata(self): + iceberg_metadata = json.dumps( + { + "format-version": 2, + "table-uuid": "abc-123", + "location": "gs://bucket/warehouse/orders", + "schema": { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "id", "type": "long", "required": True}, + {"id": 2, "name": "name", "type": "string", "required": False}, + ], + }, + } + ).encode("utf-8") + + _, raw_data = JSONDataFrameReader._read_json_object(iceberg_metadata) + + assert raw_data is not None + + def test_iceberg_columns_parsed_correctly(self): + from metadata.utils.datalake.datalake_utils import JsonDataFrameColumnParser + + iceberg_metadata = json.dumps( + { + "format-version": 2, + "table-uuid": "abc-123", + "location": "gs://bucket/warehouse/orders", + "schema": { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "id", "type": "long", "required": True}, + {"id": 2, "name": "name", "type": "string", "required": False}, + ], + }, + } + ).encode("utf-8") + + _, raw_data = JSONDataFrameReader._read_json_object(iceberg_metadata) + assert raw_data is not None + + import pandas as pd + + from metadata.generated.schema.entity.data.table import DataType + + empty_df = pd.DataFrame() + parser = JsonDataFrameColumnParser(data_frame=empty_df, raw_data=raw_data) + columns = parser.get_columns() + + assert len(columns) == 2 + column_names = [col.name.root for col in columns] + assert "id" in column_names + assert "name" in column_names + + id_col = next(col for col in columns if col.name.root == "id") + name_col = next(col for col in columns if col.name.root == "name") + assert id_col.dataType in {DataType.INT, DataType.BIGINT, DataType.LONG} + assert name_col.dataType in {DataType.STRING, DataType.VARCHAR, DataType.TEXT} + + def test_raw_data_none_for_regular_json(self): + regular_json = json.dumps([{"col1": "val1", "col2": 42}]).encode("utf-8") + + _, raw_data = JSONDataFrameReader._read_json_object(regular_json) + + assert raw_data is None + + def test_raw_data_set_for_json_schema(self): + json_schema = json.dumps( + { + "$schema": "http://json-schema.org/draft-07/schema", + "properties": {"id": {"type": "integer"}}, + } + ).encode("utf-8") + + _, raw_data = JSONDataFrameReader._read_json_object(json_schema) + + assert raw_data is not None + if __name__ == "__main__": unittest.main() diff --git a/ingestion/tests/unit/source/database/test_iceberg_discovery.py b/ingestion/tests/unit/source/database/test_iceberg_discovery.py new file mode 100644 index 000000000000..861e6b3378c9 --- /dev/null +++ b/ingestion/tests/unit/source/database/test_iceberg_discovery.py @@ -0,0 +1,394 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for Iceberg table directory detection in DatalakeGcsClient and DatalakeS3Client. +""" +import sys +import types +from unittest.mock import MagicMock, patch + +# Stub google.cloud.storage so this test file runs without the google-cloud-storage +# package being installed. The logic under test (_get_iceberg_tables, get_table_names) +# only interacts with the storage client through our own mock objects. +_gcloud_mod = types.ModuleType("google.cloud") +_storage_mod = types.ModuleType("google.cloud.storage") +_storage_mod.Client = MagicMock +sys.modules.setdefault("google", types.ModuleType("google")) +sys.modules["google.cloud"] = _gcloud_mod +sys.modules["google.cloud.storage"] = _storage_mod + +from metadata.ingestion.source.database.datalake.clients.gcs import ( # noqa: E402 + DatalakeGcsClient, +) +from metadata.ingestion.source.database.datalake.clients.s3 import ( # noqa: E402 + DatalakeS3Client, +) + + +def _make_blob( + name: str, size: int = 1024, storage_class: str = "STANDARD" +) -> MagicMock: + blob = MagicMock() + blob.name = name + blob.size = size + blob.storage_class = storage_class + return blob + + +def _make_gcs_client(blobs: list) -> DatalakeGcsClient: + mock_storage_client = MagicMock() + mock_bucket = MagicMock() + mock_storage_client.get_bucket.return_value = mock_bucket + mock_bucket.list_blobs.return_value = blobs + mock_bucket.get_blob.side_effect = lambda name: next( + (b for b in blobs if b.name == name), None + ) + client = DatalakeGcsClient.__new__(DatalakeGcsClient) + client._client = mock_storage_client + client._temp_credentials_file_path_list = [] + return client + + +class TestGcsIcebergDiscovery: + def test_gcs_iceberg_table_detected(self): + blobs = [ + _make_blob("warehouse/orders/metadata/v1.metadata.json", size=500), + _make_blob("warehouse/orders/metadata/v2.metadata.json", size=600), + _make_blob("warehouse/orders/data/00000-0-abc.parquet", size=8192), + _make_blob("warehouse/orders/data/00001-0-def.parquet", size=9216), + ] + client = _make_gcs_client(blobs) + mock_bucket = client._client.get_bucket("bucket") + + result = client._get_iceberg_tables(mock_bucket, prefix="warehouse") + + assert result == { + "warehouse/orders": "warehouse/orders/metadata/v2.metadata.json" + } + + def test_gcs_iceberg_yields_one_table_per_directory(self): + blobs = [ + _make_blob("warehouse/orders/metadata/v1.metadata.json", size=500), + _make_blob("warehouse/orders/metadata/v2.metadata.json", size=600), + _make_blob("warehouse/orders/data/00000-0-abc.parquet", size=8192), + _make_blob("warehouse/orders/data/00001-0-def.parquet", size=9216), + ] + client = _make_gcs_client(blobs) + + results = list(client.get_table_names("my-bucket", prefix="warehouse")) + + assert len(results) == 1 + name, size = results[0] + assert name == "warehouse/orders/metadata/v2.metadata.json" + assert size == 600 + + def test_gcs_multiple_iceberg_tables(self): + blobs = [ + _make_blob("warehouse/orders/metadata/v1.metadata.json", size=400), + _make_blob("warehouse/products/metadata/v1.metadata.json", size=500), + 
_make_blob("warehouse/products/metadata/v2.metadata.json", size=600), + _make_blob("warehouse/orders/data/00000.parquet", size=8192), + _make_blob("warehouse/products/data/00000.parquet", size=4096), + ] + client = _make_gcs_client(blobs) + + results = list(client.get_table_names("my-bucket", prefix="warehouse")) + + assert len(results) == 2 + names = {r[0] for r in results} + assert "warehouse/orders/metadata/v1.metadata.json" in names + assert "warehouse/products/metadata/v2.metadata.json" in names + + def test_gcs_fallback_for_non_iceberg(self): + blobs = [ + _make_blob("data/orders.csv", size=1024), + _make_blob("data/products.parquet", size=2048), + _make_blob("data/users.json", size=512), + ] + client = _make_gcs_client(blobs) + + results = list(client.get_table_names("my-bucket", prefix="data")) + + assert len(results) == 3 + names = {r[0] for r in results} + assert "data/orders.csv" in names + assert "data/products.parquet" in names + assert "data/users.json" in names + + def test_gcs_mixed_iceberg_and_regular_files(self): + """ + If ANY Iceberg table is detected, the client switches to Iceberg mode + and yields only Iceberg tables. Regular files in the same bucket are + not yielded in this scan — they are assumed to be data files belonging + to Iceberg tables or unrelated objects outside the warehouse prefix. + This is intentional: mixing Iceberg and non-Iceberg tables in the same + prefix should be avoided in practice. + """ + blobs = [ + _make_blob("warehouse/orders/metadata/v1.metadata.json", size=400), + _make_blob("regular_files/data.csv", size=1024), + ] + client = _make_gcs_client(blobs) + + results = list(client.get_table_names("my-bucket", prefix=None)) + + assert len(results) == 1 + name, _ = results[0] + assert name == "warehouse/orders/metadata/v1.metadata.json" + + +class TestS3IcebergDiscovery: + def _make_s3_client(self, keys: list) -> DatalakeS3Client: + mock_boto_client = MagicMock() + client = DatalakeS3Client.__new__(DatalakeS3Client) + client._client = mock_boto_client + client._session = None + + s3_objects = [{"Key": k, "Size": 1024} for k in keys] + + with patch( + "metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects", + return_value=s3_objects, + ): + result = client._get_iceberg_tables("my-bucket", prefix="warehouse") + + self._mock_boto_client = mock_boto_client + self._s3_objects = s3_objects + return client, result + + def test_s3_iceberg_table_detected(self): + keys = [ + "warehouse/orders/metadata/v1.metadata.json", + "warehouse/orders/metadata/v2.metadata.json", + "warehouse/orders/data/00000-0-abc.parquet", + ] + _, result = self._make_s3_client(keys) + + assert result == { + "warehouse/orders": "warehouse/orders/metadata/v2.metadata.json" + } + + def test_s3_iceberg_yields_one_table_per_directory(self): + keys = [ + "warehouse/orders/metadata/v1.metadata.json", + "warehouse/orders/metadata/v2.metadata.json", + "warehouse/orders/data/00000-0-abc.parquet", + ] + mock_boto_client = MagicMock() + mock_boto_client.head_object.return_value = {"ContentLength": 600} + + client = DatalakeS3Client.__new__(DatalakeS3Client) + client._client = mock_boto_client + client._session = None + + s3_objects = [{"Key": k, "Size": 1024} for k in keys] + + with patch( + "metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects", + return_value=s3_objects, + ): + results = list(client.get_table_names("my-bucket", prefix="warehouse")) + + assert len(results) == 1 + name, size = results[0] + assert name == 
"warehouse/orders/metadata/v2.metadata.json" + assert size == 600 + + def test_s3_fallback_for_non_iceberg(self): + keys = [ + "data/orders.csv", + "data/products.parquet", + "data/users.json", + ] + mock_boto_client = MagicMock() + client = DatalakeS3Client.__new__(DatalakeS3Client) + client._client = mock_boto_client + client._session = None + + s3_objects = [{"Key": k, "Size": 512} for k in keys] + + with patch( + "metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects", + return_value=s3_objects, + ): + results = list(client.get_table_names("my-bucket", prefix="data")) + + assert len(results) == 3 + names = {r[0] for r in results} + assert "data/orders.csv" in names + assert "data/products.parquet" in names + assert "data/users.json" in names + + +class TestIcebergTableNameHelper: + """Tests for get_iceberg_table_name_from_metadata_path (Slice 3).""" + + def test_iceberg_table_name_extracted_correctly(self): + from metadata.utils.datalake.datalake_utils import ( + get_iceberg_table_name_from_metadata_path, + ) + + assert ( + get_iceberg_table_name_from_metadata_path( + "warehouse/orders/metadata/v2.metadata.json" + ) + == "orders" + ) + assert ( + get_iceberg_table_name_from_metadata_path( + "my_prefix/sales/metadata/v1.metadata.json" + ) + == "sales" + ) + assert ( + get_iceberg_table_name_from_metadata_path( + "simple/metadata/v3.metadata.json" + ) + == "simple" + ) + + def test_non_iceberg_path_returns_none(self): + from metadata.utils.datalake.datalake_utils import ( + get_iceberg_table_name_from_metadata_path, + ) + + assert get_iceberg_table_name_from_metadata_path("data/orders.json") is None + assert ( + get_iceberg_table_name_from_metadata_path("warehouse/orders.json") is None + ) + assert get_iceberg_table_name_from_metadata_path("metadata/v1.json") is None + assert ( + get_iceberg_table_name_from_metadata_path("orders/metadata/snapshot.avro") + is None + ) + + def test_table_type_iceberg_for_metadata_files(self): + from metadata.generated.schema.entity.data.table import TableType + from metadata.utils.datalake.datalake_utils import ( + get_iceberg_table_name_from_metadata_path, + ) + + key_name = "warehouse/orders/metadata/v1.metadata.json" + table_type = ( + TableType.Iceberg + if get_iceberg_table_name_from_metadata_path(key_name) is not None + else TableType.Regular + ) + assert table_type == TableType.Iceberg + + def test_table_type_regular_for_normal_files(self): + from metadata.generated.schema.entity.data.table import TableType + from metadata.utils.datalake.datalake_utils import ( + get_iceberg_table_name_from_metadata_path, + ) + + for key_name in ["data/orders.parquet", "data/users.csv", "logs/events.json"]: + table_type = ( + TableType.Iceberg + if get_iceberg_table_name_from_metadata_path(key_name) is not None + else TableType.Regular + ) + assert ( + table_type == TableType.Regular + ), f"Expected Regular for {key_name}, got {table_type}" + + +class TestSlice4FetchKeyCorrectness: + """ + Regression tests for Slice 4: verifies that the blob key passed to + fetch_dataframe_first_chunk is always the original metadata path, + not the human-readable display name. + """ + + def test_yield_table_uses_metadata_path_not_display_name(self): + """ + The 5-tuple yielded by get_tables_name_and_type() must carry + key_name (original blob path) separately from table_name (display name). 
+ + For an Iceberg table: + table_name = "orders" (display, from standardize_table_name) + key_name = "warehouse/orders/metadata/v2.metadata.json" (fetch path) + + DatalakeTableSchemaWrapper must be constructed with key=key_name, + NOT key=table_name. + """ + from metadata.generated.schema.entity.data.table import TableType + from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper + from metadata.readers.dataframe.reader_factory import SupportedTypes + + display_name = "orders" + original_key = "warehouse/orders/metadata/v2.metadata.json" + file_extension = SupportedTypes.JSON + file_size = 1024 + + tuple_5 = ( + display_name, + TableType.Iceberg, + file_extension, + file_size, + original_key, + ) + table_name, table_type, table_extension, t_file_size, fetch_key = tuple_5 + + wrapper = DatalakeTableSchemaWrapper( + key=fetch_key, + bucket_name="my-bucket", + file_extension=table_extension, + file_size=t_file_size, + ) + + assert ( + wrapper.key == original_key + ), f"fetch key should be original blob path, got {wrapper.key!r}" + assert ( + wrapper.key != display_name + ), f"fetch key must NOT be the display name '{display_name}'" + assert table_name == display_name + + def test_non_iceberg_fetch_key_equals_table_name(self): + """ + For non-Iceberg tables, key_name == table_name (standardize_table_name + returns the path unchanged), so the 5-tuple element is redundant but + harmless. This test confirms the invariant holds. + """ + from metadata.generated.schema.entity.data.table import TableType + from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper + from metadata.readers.dataframe.reader_factory import SupportedTypes + from metadata.utils.datalake.datalake_utils import ( + get_iceberg_table_name_from_metadata_path, + ) + + key_name = "data/orders.parquet" + table_name = ( + key_name # standardize_table_name returns unchanged for non-Iceberg + ) + + assert get_iceberg_table_name_from_metadata_path(key_name) is None + + tuple_5 = ( + table_name, + TableType.Regular, + SupportedTypes.PARQUET, + 2048, + key_name, + ) + _, _, _, _, fetch_key = tuple_5 + + wrapper = DatalakeTableSchemaWrapper( + key=fetch_key, + bucket_name="my-bucket", + file_extension=SupportedTypes.PARQUET, + file_size=2048, + ) + + assert wrapper.key == "data/orders.parquet" + assert wrapper.key == table_name From b7b8904ca57847352e11cc4c3c11c084009c099f Mon Sep 17 00:00:00 2001 From: Mohit Jeswani <2022.mohit.jeswani@ves.ac.in> Date: Sun, 26 Apr 2026 03:23:53 +0530 Subject: [PATCH 02/12] fix(datalake): address gitar-bot review on Iceberg ingestion (#22644) - Integer version comparison (v10 > v9) via regex group capture - Single-pass listing: eliminates double bucket scan for non-Iceberg buckets - Mixed buckets: regular files outside Iceberg dirs are now yielded - Removes extra head_object/get_blob API calls (use listing size directly) - Fix get_tables_name_and_type return type annotation to 5-tuple - Update tests: remove _get_iceberg_tables direct calls, add v10 regression --- .../source/database/test_iceberg_discovery.py | 123 +++++++++++------- 1 file changed, 78 insertions(+), 45 deletions(-) diff --git a/ingestion/tests/unit/source/database/test_iceberg_discovery.py b/ingestion/tests/unit/source/database/test_iceberg_discovery.py index 861e6b3378c9..d7f9b17b0242 100644 --- a/ingestion/tests/unit/source/database/test_iceberg_discovery.py +++ b/ingestion/tests/unit/source/database/test_iceberg_discovery.py @@ -60,6 +60,7 @@ def _make_gcs_client(blobs: list) -> DatalakeGcsClient: class 
TestGcsIcebergDiscovery: def test_gcs_iceberg_table_detected(self): + """Latest version of Iceberg metadata is yielded; data/ blobs are suppressed.""" blobs = [ _make_blob("warehouse/orders/metadata/v1.metadata.json", size=500), _make_blob("warehouse/orders/metadata/v2.metadata.json", size=600), @@ -67,13 +68,13 @@ def test_gcs_iceberg_table_detected(self): _make_blob("warehouse/orders/data/00001-0-def.parquet", size=9216), ] client = _make_gcs_client(blobs) - mock_bucket = client._client.get_bucket("bucket") - result = client._get_iceberg_tables(mock_bucket, prefix="warehouse") + results = list(client.get_table_names("my-bucket", prefix="warehouse")) - assert result == { - "warehouse/orders": "warehouse/orders/metadata/v2.metadata.json" - } + assert len(results) == 1 + name, size = results[0] + assert name == "warehouse/orders/metadata/v2.metadata.json" + assert size == 600 def test_gcs_iceberg_yields_one_table_per_directory(self): blobs = [ @@ -126,12 +127,9 @@ def test_gcs_fallback_for_non_iceberg(self): def test_gcs_mixed_iceberg_and_regular_files(self): """ - If ANY Iceberg table is detected, the client switches to Iceberg mode - and yields only Iceberg tables. Regular files in the same bucket are - not yielded in this scan — they are assumed to be data files belonging - to Iceberg tables or unrelated objects outside the warehouse prefix. - This is intentional: mixing Iceberg and non-Iceberg tables in the same - prefix should be avoided in practice. + Regular files NOT under any Iceberg table directory are yielded + alongside the Iceberg metadata entries. Only blobs that fall under + an Iceberg table's own subdirectory (data/, metadata/) are suppressed. """ blobs = [ _make_blob("warehouse/orders/metadata/v1.metadata.json", size=400), @@ -141,41 +139,63 @@ def test_gcs_mixed_iceberg_and_regular_files(self): results = list(client.get_table_names("my-bucket", prefix=None)) + assert len(results) == 2 + names = {r[0] for r in results} + assert "warehouse/orders/metadata/v1.metadata.json" in names + assert "regular_files/data.csv" in names + + def test_gcs_iceberg_version_comparison_v10(self): + """v10 must beat v9 — lexicographic comparison would fail here.""" + blobs = [ + _make_blob("warehouse/orders/metadata/v9.metadata.json", size=500), + _make_blob("warehouse/orders/metadata/v10.metadata.json", size=600), + ] + client = _make_gcs_client(blobs) + + results = list(client.get_table_names("my-bucket", prefix="warehouse")) + assert len(results) == 1 - name, _ = results[0] - assert name == "warehouse/orders/metadata/v1.metadata.json" + name, size = results[0] + assert name == "warehouse/orders/metadata/v10.metadata.json" + assert size == 600 class TestS3IcebergDiscovery: - def _make_s3_client(self, keys: list) -> DatalakeS3Client: + def _make_s3_client(self, keys: list, sizes: dict = None) -> DatalakeS3Client: + """Helper: create a DatalakeS3Client backed by a mocked boto3 client.""" mock_boto_client = MagicMock() client = DatalakeS3Client.__new__(DatalakeS3Client) client._client = mock_boto_client client._session = None - - s3_objects = [{"Key": k, "Size": 1024} for k in keys] - - with patch( - "metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects", - return_value=s3_objects, - ): - result = client._get_iceberg_tables("my-bucket", prefix="warehouse") - self._mock_boto_client = mock_boto_client - self._s3_objects = s3_objects - return client, result + sizes = sizes or {} + self._s3_objects = [ + {"Key": k, "Size": sizes.get(k, 1024)} for k in keys + ] + return client def 
test_s3_iceberg_table_detected(self): + """Latest version of Iceberg metadata is yielded; data/ blobs are suppressed.""" keys = [ "warehouse/orders/metadata/v1.metadata.json", "warehouse/orders/metadata/v2.metadata.json", "warehouse/orders/data/00000-0-abc.parquet", ] - _, result = self._make_s3_client(keys) + client = self._make_s3_client( + keys, + sizes={"warehouse/orders/metadata/v2.metadata.json": 600}, + ) - assert result == { - "warehouse/orders": "warehouse/orders/metadata/v2.metadata.json" - } + with patch( + "metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects", + return_value=self._s3_objects, + ): + results = list(client.get_table_names("my-bucket", prefix="warehouse")) + + assert len(results) == 1 + name, size = results[0] + assert name == "warehouse/orders/metadata/v2.metadata.json" + assert size == 600 def test_s3_iceberg_yields_one_table_per_directory(self): keys = [ @@ -183,18 +203,14 @@ def test_s3_iceberg_yields_one_table_per_directory(self): "warehouse/orders/metadata/v2.metadata.json", "warehouse/orders/data/00000-0-abc.parquet", ] - mock_boto_client = MagicMock() - mock_boto_client.head_object.return_value = {"ContentLength": 600} - - client = DatalakeS3Client.__new__(DatalakeS3Client) - client._client = mock_boto_client - client._session = None - - s3_objects = [{"Key": k, "Size": 1024} for k in keys] + client = self._make_s3_client( + keys, + sizes={"warehouse/orders/metadata/v2.metadata.json": 600}, + ) with patch( "metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects", - return_value=s3_objects, + return_value=self._s3_objects, ): results = list(client.get_table_names("my-bucket", prefix="warehouse")) @@ -209,16 +225,11 @@ def test_s3_fallback_for_non_iceberg(self): "data/products.parquet", "data/users.json", ] - mock_boto_client = MagicMock() - client = DatalakeS3Client.__new__(DatalakeS3Client) - client._client = mock_boto_client - client._session = None - - s3_objects = [{"Key": k, "Size": 512} for k in keys] + client = self._make_s3_client(keys) with patch( "metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects", - return_value=s3_objects, + return_value=self._s3_objects, ): results = list(client.get_table_names("my-bucket", prefix="data")) @@ -228,6 +239,28 @@ def test_s3_fallback_for_non_iceberg(self): assert "data/products.parquet" in names assert "data/users.json" in names + def test_s3_iceberg_version_comparison_v10(self): + """v10 must beat v9 — lexicographic comparison would fail here.""" + keys = [ + "warehouse/orders/metadata/v9.metadata.json", + "warehouse/orders/metadata/v10.metadata.json", + ] + client = self._make_s3_client( + keys, + sizes={"warehouse/orders/metadata/v10.metadata.json": 600}, + ) + + with patch( + "metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects", + return_value=self._s3_objects, + ): + results = list(client.get_table_names("my-bucket", prefix="warehouse")) + + assert len(results) == 1 + name, size = results[0] + assert name == "warehouse/orders/metadata/v10.metadata.json" + assert size == 600 + class TestIcebergTableNameHelper: """Tests for get_iceberg_table_name_from_metadata_path (Slice 3).""" From 8082b4e2a75baf8cb0916807dd2dbe42a4e97759 Mon Sep 17 00:00:00 2001 From: Mohit Jeswani <2022.mohit.jeswani@ves.ac.in> Date: Tue, 28 Apr 2026 23:55:35 +0530 Subject: [PATCH 03/12] fix(datalake): address Copilot + gitar-bot findings on Iceberg ingestion - Fix _is_json_lines false-positive: minified single-line Iceberg/Delta metadata dicts were classified as 
JSONL, bypassing the raw_data gate entirely. Now all three detection conditions (format-version, schema.fields, \) are checked. - Move _ICEBERG_METADATA_RE and _update_iceberg_entry to DatalakeBaseClient to eliminate regex/classify duplication between GCS and S3 clients (DRY) - Replace single-pass O(N) memory approach with two-pass streaming: pass 1 builds iceberg_tables dict only (O(tables)), pass 2 streams regular files without accumulation (O(1) per object) - Fix sys.modules stub in test_iceberg_discovery.py: use setdefault for all three google module entries to avoid overwriting real installed packages --- .../source/database/datalake/clients/base.py | 25 +++++++++- .../source/database/datalake/clients/gcs.py | 41 ++++++----------- .../source/database/datalake/clients/s3.py | 46 +++++++------------ .../src/metadata/readers/dataframe/json.py | 12 ++++- .../source/database/test_iceberg_discovery.py | 21 +++++---- 5 files changed, 77 insertions(+), 68 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py index 74a2aba784c3..664b0142a606 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py @@ -13,13 +13,36 @@ Datalake Base Client """ +import re from abc import ABC, abstractmethod -from typing import Any, Callable, Iterable, Optional, Tuple # noqa: UP035 +from typing import Any, Callable, Dict, Iterable, Optional, Tuple # noqa: UP035 class DatalakeBaseClient(ABC): """Base DL client implementation""" + _ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$") + + def _update_iceberg_entry( + self, + iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]], # noqa: UP006, UP045 + name: str, + size: Optional[int], # noqa: UP045 + ) -> bool: + """ + If name matches the Iceberg metadata pattern, update iceberg_tables with + the highest-version entry and return True. Otherwise return False. 
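+
+        Illustrative effect (hypothetical values): for
+        ``name="db/orders/metadata/v2.metadata.json"`` and ``size=512``,
+        ``iceberg_tables["db/orders"]`` becomes ``(2, name, 512)`` unless a higher
+        version is already recorded for that table directory.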
+ """ + match = self._ICEBERG_METADATA_RE.match(name) + if not match: + return False + table_dir, version = match.group(1), int(match.group(2)) + existing = iceberg_tables.get(table_dir) + if existing is None or version > existing[0]: + iceberg_tables[table_dir] = (version, name, size) + return True + + def __init__(self, client: Any, session: Any = None, **kwargs): self._client = client self._session = session diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py index 9fd649494f86..c70b24ad6406 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py @@ -14,7 +14,6 @@ """ import os -import re from copy import deepcopy from functools import partial from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple # noqa: UP035 @@ -108,28 +107,11 @@ def get_database_schema_names(self, bucket_name: Optional[str]) -> Iterable[str] for bucket in self._client.list_buckets(): yield bucket.name - _ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$") - @staticmethod def _should_skip_gcs_cold_storage(blob) -> bool: storage_class = getattr(blob, "storage_class", None) return bool(storage_class and storage_class in GCS_COLD_STORAGE_CLASSES) - def _classify_gcs_blob( - self, - iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]], # noqa: UP006, UP045 - regular_files: List[Tuple[str, Optional[int]]], # noqa: UP006, UP045 - blob, - ) -> None: - match = self._ICEBERG_METADATA_RE.match(blob.name) - if match: - table_dir, version = match.group(1), int(match.group(2)) - existing = iceberg_tables.get(table_dir) - if existing is None or version > existing[0]: - iceberg_tables[table_dir] = (version, blob.name, blob.size) - else: - regular_files.append((blob.name, blob.size)) - def get_table_names( self, bucket_name: str, @@ -137,16 +119,14 @@ def get_table_names( skip_cold_storage: bool = False, ) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045 """ - Lists tables in a GCS bucket using a single pass. + Lists tables in a GCS bucket using a two-pass approach. - Iceberg table directories are identified by blobs matching - ``/metadata/v.metadata.json``. Only the blob with the - highest integer version is yielded per table directory. Regular files - not under any Iceberg table directory are also yielded. + Pass 1 collects only the Iceberg table dict (memory proportional to the + number of Iceberg tables, which is always small). Pass 2 streams regular + files without accumulation, keeping memory overhead at O(1) per object. 
""" bucket = self._client.get_bucket(bucket_name) iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {} # noqa: UP006 - regular_files: List[Tuple[str, Optional[int]]] = [] # noqa: UP006 for blob in bucket.list_blobs(prefix=prefix): if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): @@ -155,14 +135,19 @@ def get_table_names( f"(storage_class: {getattr(blob, 'storage_class', None)})" ) continue - self._classify_gcs_blob(iceberg_tables, regular_files, blob) + self._update_iceberg_entry(iceberg_tables, blob.name, blob.size) iceberg_dirs = set(iceberg_tables.keys()) for _, metadata_blob_path, size in iceberg_tables.values(): yield metadata_blob_path, size - for file_path, size in regular_files: - if not any(file_path.startswith(d + "/") for d in iceberg_dirs): - yield file_path, size + + for blob in bucket.list_blobs(prefix=prefix): + if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): + continue + if not self._ICEBERG_METADATA_RE.match(blob.name) and not any( + blob.name.startswith(d + "/") for d in iceberg_dirs + ): + yield blob.name, blob.size def close(self, service_connection): os.environ.pop("GOOGLE_CLOUD_PROJECT", "") diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py index 413017381c77..025ae057725c 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py @@ -13,9 +13,8 @@ Datalake S3 Client """ -import re from functools import partial -from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple # noqa: UP035 +from typing import Callable, Dict, Iterable, Optional, Set, Tuple # noqa: UP035 from metadata.clients.aws_client import AWSClient from metadata.generated.schema.entity.services.connections.database.datalake.s3Config import ( @@ -62,8 +61,6 @@ def get_database_schema_names(self, bucket_name: Optional[str]) -> Iterable[str] for bucket in self._client.list_buckets()["Buckets"]: yield bucket["Name"] - _ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$") - @staticmethod def _should_skip_s3_cold_storage(key: dict) -> bool: storage_class = key.get("StorageClass", "STANDARD") @@ -73,22 +70,6 @@ def _should_skip_s3_cold_storage(key: dict) -> bool: "DEEP_ARCHIVE_ACCESS", } - def _classify_s3_object( - self, - iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]], # noqa: UP006, UP045 - regular_files: List[Tuple[str, Optional[int]]], # noqa: UP006, UP045 - key_name: str, - size: Optional[int], # noqa: UP045 - ) -> None: - match = self._ICEBERG_METADATA_RE.match(key_name) - if match: - table_dir, version = match.group(1), int(match.group(2)) - existing = iceberg_tables.get(table_dir) - if existing is None or version > existing[0]: - iceberg_tables[table_dir] = (version, key_name, size) - else: - regular_files.append((key_name, size)) - def get_table_names( self, bucket_name: str, @@ -96,19 +77,17 @@ def get_table_names( skip_cold_storage: bool = False, ) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045 """ - Lists tables in an S3 bucket using a single pass. + Lists tables in an S3 bucket using a two-pass approach. - Iceberg table directories are identified by objects matching - ``/metadata/v.metadata.json``. Only the object with the - highest integer version is yielded per table directory. Regular files - not under any Iceberg table directory are also yielded. 
+ Pass 1 collects only the Iceberg table dict (memory proportional to the + number of Iceberg tables, which is always small). Pass 2 streams regular + files without accumulation, keeping memory overhead at O(1) per object. """ kwargs: Dict[str, str] = {"Bucket": bucket_name} # noqa: UP006 if prefix: kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/" iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {} # noqa: UP006 - regular_files: List[Tuple[str, Optional[int]]] = [] # noqa: UP006 for key in list_s3_objects(self._client, **kwargs): key_name = key["Key"] @@ -120,14 +99,21 @@ def get_table_names( f"ArchiveStatus: {key.get('ArchiveStatus', '')})" ) continue - self._classify_s3_object(iceberg_tables, regular_files, key_name, size) + self._update_iceberg_entry(iceberg_tables, key_name, size) iceberg_dirs = set(iceberg_tables.keys()) for _, metadata_key, size in iceberg_tables.values(): yield metadata_key, size - for file_path, size in regular_files: - if not any(file_path.startswith(d + "/") for d in iceberg_dirs): - yield file_path, size + + for key in list_s3_objects(self._client, **kwargs): + key_name = key["Key"] + size = key.get("Size") + if skip_cold_storage and self._should_skip_s3_cold_storage(key): + continue + if not self._ICEBERG_METADATA_RE.match(key_name) and not any( + key_name.startswith(d + "/") for d in iceberg_dirs + ): + yield key_name, size def get_folders_prefix(self, bucket_name: str, prefix: Optional[str]) -> Iterable[str]: # noqa: UP045 for page in self._client.get_paginator("list_objects_v2").paginate( diff --git a/ingestion/src/metadata/readers/dataframe/json.py b/ingestion/src/metadata/readers/dataframe/json.py index 120bf17eef18..d451730c613b 100644 --- a/ingestion/src/metadata/readers/dataframe/json.py +++ b/ingestion/src/metadata/readers/dataframe/json.py @@ -153,7 +153,17 @@ def _is_json_lines(file_obj) -> bool: return True try: obj = json.loads(first_line) - return isinstance(obj, dict) and not obj.get("$schema") + if not isinstance(obj, dict): + return False + if obj.get("$schema") is not None: + return False + if obj.get("format-version") is not None: + return False + if isinstance(obj.get("schema"), dict) and isinstance( + obj.get("schema", {}).get("fields"), list + ): + return False + return True except json.JSONDecodeError: return False diff --git a/ingestion/tests/unit/source/database/test_iceberg_discovery.py b/ingestion/tests/unit/source/database/test_iceberg_discovery.py index d7f9b17b0242..d049350a8040 100644 --- a/ingestion/tests/unit/source/database/test_iceberg_discovery.py +++ b/ingestion/tests/unit/source/database/test_iceberg_discovery.py @@ -17,14 +17,19 @@ from unittest.mock import MagicMock, patch # Stub google.cloud.storage so this test file runs without the google-cloud-storage -# package being installed. The logic under test (_get_iceberg_tables, get_table_names) -# only interacts with the storage client through our own mock objects. -_gcloud_mod = types.ModuleType("google.cloud") -_storage_mod = types.ModuleType("google.cloud.storage") -_storage_mod.Client = MagicMock -sys.modules.setdefault("google", types.ModuleType("google")) -sys.modules["google.cloud"] = _gcloud_mod -sys.modules["google.cloud.storage"] = _storage_mod +# package being installed. setdefault preserves the real module if it is already +# present, which prevents breaking other tests or masking integration issues. 
+_google_mod = sys.modules.setdefault("google", types.ModuleType("google")) +_gcloud_mod = sys.modules.setdefault("google.cloud", types.ModuleType("google.cloud")) +_storage_mod = sys.modules.setdefault( + "google.cloud.storage", types.ModuleType("google.cloud.storage") +) +if not hasattr(_storage_mod, "Client"): + _storage_mod.Client = MagicMock +if not hasattr(_google_mod, "cloud"): + _google_mod.cloud = _gcloud_mod +if not hasattr(_gcloud_mod, "storage"): + _gcloud_mod.storage = _storage_mod from metadata.ingestion.source.database.datalake.clients.gcs import ( # noqa: E402 DatalakeGcsClient, From b85aacc29e204d267740b272549bcfdd65d26111 Mon Sep 17 00:00:00 2001 From: Mohit Jeswani <2022.mohit.jeswani@ves.ac.in> Date: Wed, 29 Apr 2026 00:15:45 +0530 Subject: [PATCH 04/12] fix(datalake): hybrid two-pass listing, ruff checkstyle, and Copilot findings - Hybrid two-pass: non-Iceberg buckets (common case) skip regex/set overhead in pass 2; Iceberg buckets apply directory filter as before - Move _ICEBERG_METADATA_RE and _update_iceberg_entry to DatalakeBaseClient (DRY) - Fix _is_json_lines false-positive for minified Iceberg/Delta metadata dicts - Fix sys.modules stub to use setdefault (avoids overwriting real google packages) - Fix ruff: SIM103, TRY300 in json.py; RUF013, RUF059 in test file --- .../source/database/datalake/clients/base.py | 1 - .../source/database/datalake/clients/gcs.py | 23 +++--- .../source/database/datalake/clients/s3.py | 26 ++++--- .../src/metadata/readers/dataframe/json.py | 17 ++--- .../metadata/utils/datalake/datalake_utils.py | 2 +- .../source/database/test_iceberg_discovery.py | 71 +++++-------------- 6 files changed, 55 insertions(+), 85 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py index 664b0142a606..86f6c41371e9 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py @@ -42,7 +42,6 @@ def _update_iceberg_entry( iceberg_tables[table_dir] = (version, name, size) return True - def __init__(self, client: Any, session: Any = None, **kwargs): self._client = client self._session = session diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py index c70b24ad6406..e683d6529d53 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py @@ -126,13 +126,12 @@ def get_table_names( files without accumulation, keeping memory overhead at O(1) per object. 
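+
+        Buckets where pass 1 finds no Iceberg tables skip the per-object regex and
+        prefix checks in pass 2 and are streamed straight through.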
""" bucket = self._client.get_bucket(bucket_name) - iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {} # noqa: UP006 + iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006 for blob in bucket.list_blobs(prefix=prefix): if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): logger.debug( - f"Skipping cold storage object: {blob.name} " - f"(storage_class: {getattr(blob, 'storage_class', None)})" + f"Skipping cold storage object: {blob.name} (storage_class: {getattr(blob, 'storage_class', None)})" ) continue self._update_iceberg_entry(iceberg_tables, blob.name, blob.size) @@ -141,13 +140,19 @@ def get_table_names( for _, metadata_blob_path, size in iceberg_tables.values(): yield metadata_blob_path, size - for blob in bucket.list_blobs(prefix=prefix): - if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): - continue - if not self._ICEBERG_METADATA_RE.match(blob.name) and not any( - blob.name.startswith(d + "/") for d in iceberg_dirs - ): + if not iceberg_dirs: + for blob in bucket.list_blobs(prefix=prefix): + if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): + continue yield blob.name, blob.size + else: + for blob in bucket.list_blobs(prefix=prefix): + if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): + continue + if not self._ICEBERG_METADATA_RE.match(blob.name) and not any( + blob.name.startswith(d + "/") for d in iceberg_dirs + ): + yield blob.name, blob.size def close(self, service_connection): os.environ.pop("GOOGLE_CLOUD_PROJECT", "") diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py index 025ae057725c..354c33ea5c7f 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py @@ -87,7 +87,7 @@ def get_table_names( if prefix: kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/" - iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {} # noqa: UP006 + iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006 for key in list_s3_objects(self._client, **kwargs): key_name = key["Key"] @@ -105,15 +105,23 @@ def get_table_names( for _, metadata_key, size in iceberg_tables.values(): yield metadata_key, size - for key in list_s3_objects(self._client, **kwargs): - key_name = key["Key"] - size = key.get("Size") - if skip_cold_storage and self._should_skip_s3_cold_storage(key): - continue - if not self._ICEBERG_METADATA_RE.match(key_name) and not any( - key_name.startswith(d + "/") for d in iceberg_dirs - ): + if not iceberg_dirs: + for key in list_s3_objects(self._client, **kwargs): + key_name = key["Key"] + size = key.get("Size") + if skip_cold_storage and self._should_skip_s3_cold_storage(key): + continue yield key_name, size + else: + for key in list_s3_objects(self._client, **kwargs): + key_name = key["Key"] + size = key.get("Size") + if skip_cold_storage and self._should_skip_s3_cold_storage(key): + continue + if not self._ICEBERG_METADATA_RE.match(key_name) and not any( + key_name.startswith(d + "/") for d in iceberg_dirs + ): + yield key_name, size def get_folders_prefix(self, bucket_name: str, prefix: Optional[str]) -> Iterable[str]: # noqa: UP045 for page in self._client.get_paginator("list_objects_v2").paginate( diff --git a/ingestion/src/metadata/readers/dataframe/json.py b/ingestion/src/metadata/readers/dataframe/json.py index d451730c613b..7033e7e21753 
100644 --- a/ingestion/src/metadata/readers/dataframe/json.py +++ b/ingestion/src/metadata/readers/dataframe/json.py @@ -125,11 +125,9 @@ def _read_json_object( if isinstance(data, dict) and ( data.get("$schema") is not None # JSON Schema files - or data.get("format-version") - is not None # Apache Iceberg table metadata + or data.get("format-version") is not None # Apache Iceberg table metadata or ( # Delta Lake / Iceberg schema structure - isinstance(data.get("schema"), dict) - and isinstance(data.get("schema", {}).get("fields"), list) + isinstance(data.get("schema"), dict) and isinstance(data.get("schema", {}).get("fields"), list) ) ) else None @@ -153,19 +151,16 @@ def _is_json_lines(file_obj) -> bool: return True try: obj = json.loads(first_line) + except json.JSONDecodeError: + return False + else: if not isinstance(obj, dict): return False if obj.get("$schema") is not None: return False if obj.get("format-version") is not None: return False - if isinstance(obj.get("schema"), dict) and isinstance( - obj.get("schema", {}).get("fields"), list - ): - return False - return True - except json.JSONDecodeError: - return False + return not (isinstance(obj.get("schema"), dict) and isinstance(obj.get("schema", {}).get("fields"), list)) def _read_json_smart( self, diff --git a/ingestion/src/metadata/utils/datalake/datalake_utils.py b/ingestion/src/metadata/utils/datalake/datalake_utils.py index a24a7fde21f9..85873bf3c671 100644 --- a/ingestion/src/metadata/utils/datalake/datalake_utils.py +++ b/ingestion/src/metadata/utils/datalake/datalake_utils.py @@ -153,7 +153,7 @@ def fetch_dataframe_first_chunk( _ICEBERG_METADATA_PATH_RE = re.compile(r"([^/]+)/metadata/v\d+\.metadata\.json$") -def get_iceberg_table_name_from_metadata_path(metadata_path: str) -> Optional[str]: +def get_iceberg_table_name_from_metadata_path(metadata_path: str) -> str | None: """ Extracts the Iceberg table directory name from a metadata file path. diff --git a/ingestion/tests/unit/source/database/test_iceberg_discovery.py b/ingestion/tests/unit/source/database/test_iceberg_discovery.py index d049350a8040..1784eb4eaeaa 100644 --- a/ingestion/tests/unit/source/database/test_iceberg_discovery.py +++ b/ingestion/tests/unit/source/database/test_iceberg_discovery.py @@ -12,6 +12,7 @@ """ Tests for Iceberg table directory detection in DatalakeGcsClient and DatalakeS3Client. """ + import sys import types from unittest.mock import MagicMock, patch @@ -21,9 +22,7 @@ # present, which prevents breaking other tests or masking integration issues. 
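+# The stubs must be registered before the datalake client imports further down
+# (which is why those imports carry "# noqa: E402"); otherwise importing
+# clients.gcs would fail when google-cloud-storage is not installed.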
_google_mod = sys.modules.setdefault("google", types.ModuleType("google")) _gcloud_mod = sys.modules.setdefault("google.cloud", types.ModuleType("google.cloud")) -_storage_mod = sys.modules.setdefault( - "google.cloud.storage", types.ModuleType("google.cloud.storage") -) +_storage_mod = sys.modules.setdefault("google.cloud.storage", types.ModuleType("google.cloud.storage")) if not hasattr(_storage_mod, "Client"): _storage_mod.Client = MagicMock if not hasattr(_google_mod, "cloud"): @@ -39,9 +38,7 @@ ) -def _make_blob( - name: str, size: int = 1024, storage_class: str = "STANDARD" -) -> MagicMock: +def _make_blob(name: str, size: int = 1024, storage_class: str = "STANDARD") -> MagicMock: blob = MagicMock() blob.name = name blob.size = size @@ -54,9 +51,7 @@ def _make_gcs_client(blobs: list) -> DatalakeGcsClient: mock_bucket = MagicMock() mock_storage_client.get_bucket.return_value = mock_bucket mock_bucket.list_blobs.return_value = blobs - mock_bucket.get_blob.side_effect = lambda name: next( - (b for b in blobs if b.name == name), None - ) + mock_bucket.get_blob.side_effect = lambda name: next((b for b in blobs if b.name == name), None) client = DatalakeGcsClient.__new__(DatalakeGcsClient) client._client = mock_storage_client client._temp_credentials_file_path_list = [] @@ -166,7 +161,7 @@ def test_gcs_iceberg_version_comparison_v10(self): class TestS3IcebergDiscovery: - def _make_s3_client(self, keys: list, sizes: dict = None) -> DatalakeS3Client: + def _make_s3_client(self, keys: list, sizes: dict | None = None) -> DatalakeS3Client: """Helper: create a DatalakeS3Client backed by a mocked boto3 client.""" mock_boto_client = MagicMock() client = DatalakeS3Client.__new__(DatalakeS3Client) @@ -174,9 +169,7 @@ def _make_s3_client(self, keys: list, sizes: dict = None) -> DatalakeS3Client: client._session = None self._mock_boto_client = mock_boto_client sizes = sizes or {} - self._s3_objects = [ - {"Key": k, "Size": sizes.get(k, 1024)} for k in keys - ] + self._s3_objects = [{"Key": k, "Size": sizes.get(k, 1024)} for k in keys] return client def test_s3_iceberg_table_detected(self): @@ -275,24 +268,9 @@ def test_iceberg_table_name_extracted_correctly(self): get_iceberg_table_name_from_metadata_path, ) - assert ( - get_iceberg_table_name_from_metadata_path( - "warehouse/orders/metadata/v2.metadata.json" - ) - == "orders" - ) - assert ( - get_iceberg_table_name_from_metadata_path( - "my_prefix/sales/metadata/v1.metadata.json" - ) - == "sales" - ) - assert ( - get_iceberg_table_name_from_metadata_path( - "simple/metadata/v3.metadata.json" - ) - == "simple" - ) + assert get_iceberg_table_name_from_metadata_path("warehouse/orders/metadata/v2.metadata.json") == "orders" + assert get_iceberg_table_name_from_metadata_path("my_prefix/sales/metadata/v1.metadata.json") == "sales" + assert get_iceberg_table_name_from_metadata_path("simple/metadata/v3.metadata.json") == "simple" def test_non_iceberg_path_returns_none(self): from metadata.utils.datalake.datalake_utils import ( @@ -300,14 +278,9 @@ def test_non_iceberg_path_returns_none(self): ) assert get_iceberg_table_name_from_metadata_path("data/orders.json") is None - assert ( - get_iceberg_table_name_from_metadata_path("warehouse/orders.json") is None - ) + assert get_iceberg_table_name_from_metadata_path("warehouse/orders.json") is None assert get_iceberg_table_name_from_metadata_path("metadata/v1.json") is None - assert ( - get_iceberg_table_name_from_metadata_path("orders/metadata/snapshot.avro") - is None - ) + assert 
get_iceberg_table_name_from_metadata_path("orders/metadata/snapshot.avro") is None def test_table_type_iceberg_for_metadata_files(self): from metadata.generated.schema.entity.data.table import TableType @@ -317,9 +290,7 @@ def test_table_type_iceberg_for_metadata_files(self): key_name = "warehouse/orders/metadata/v1.metadata.json" table_type = ( - TableType.Iceberg - if get_iceberg_table_name_from_metadata_path(key_name) is not None - else TableType.Regular + TableType.Iceberg if get_iceberg_table_name_from_metadata_path(key_name) is not None else TableType.Regular ) assert table_type == TableType.Iceberg @@ -335,9 +306,7 @@ def test_table_type_regular_for_normal_files(self): if get_iceberg_table_name_from_metadata_path(key_name) is not None else TableType.Regular ) - assert ( - table_type == TableType.Regular - ), f"Expected Regular for {key_name}, got {table_type}" + assert table_type == TableType.Regular, f"Expected Regular for {key_name}, got {table_type}" class TestSlice4FetchKeyCorrectness: @@ -375,7 +344,7 @@ def test_yield_table_uses_metadata_path_not_display_name(self): file_size, original_key, ) - table_name, table_type, table_extension, t_file_size, fetch_key = tuple_5 + table_name, _table_type, table_extension, t_file_size, fetch_key = tuple_5 wrapper = DatalakeTableSchemaWrapper( key=fetch_key, @@ -384,12 +353,8 @@ def test_yield_table_uses_metadata_path_not_display_name(self): file_size=t_file_size, ) - assert ( - wrapper.key == original_key - ), f"fetch key should be original blob path, got {wrapper.key!r}" - assert ( - wrapper.key != display_name - ), f"fetch key must NOT be the display name '{display_name}'" + assert wrapper.key == original_key, f"fetch key should be original blob path, got {wrapper.key!r}" + assert wrapper.key != display_name, f"fetch key must NOT be the display name '{display_name}'" assert table_name == display_name def test_non_iceberg_fetch_key_equals_table_name(self): @@ -406,9 +371,7 @@ def test_non_iceberg_fetch_key_equals_table_name(self): ) key_name = "data/orders.parquet" - table_name = ( - key_name # standardize_table_name returns unchanged for non-Iceberg - ) + table_name = key_name # standardize_table_name returns unchanged for non-Iceberg assert get_iceberg_table_name_from_metadata_path(key_name) is None From caef18a37b003f7436314e6ee94029a70e0a8b81 Mon Sep 17 00:00:00 2001 From: Mohit Jeswani <2022.mohit.jeswani@ves.ac.in> Date: Wed, 29 Apr 2026 00:29:13 +0530 Subject: [PATCH 05/12] fix(datalake): cold-storage Iceberg dir detection and tighten JSON classification --- .../ingestion/source/database/datalake/clients/gcs.py | 9 +++++++-- .../ingestion/source/database/datalake/clients/s3.py | 9 +++++++-- ingestion/src/metadata/readers/dataframe/json.py | 7 +------ 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py index e683d6529d53..eb931fc978a4 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py @@ -127,16 +127,21 @@ def get_table_names( """ bucket = self._client.get_bucket(bucket_name) iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006 + cold_iceberg_dirs: Set[str] = set() # noqa: UP006 for blob in bucket.list_blobs(prefix=prefix): - if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): + is_cold = skip_cold_storage and 
self._should_skip_gcs_cold_storage(blob) + if is_cold: logger.debug( f"Skipping cold storage object: {blob.name} (storage_class: {getattr(blob, 'storage_class', None)})" ) + match = self._ICEBERG_METADATA_RE.match(blob.name) + if match: + cold_iceberg_dirs.add(match.group(1)) continue self._update_iceberg_entry(iceberg_tables, blob.name, blob.size) - iceberg_dirs = set(iceberg_tables.keys()) + iceberg_dirs = set(iceberg_tables.keys()) | cold_iceberg_dirs for _, metadata_blob_path, size in iceberg_tables.values(): yield metadata_blob_path, size diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py index 354c33ea5c7f..3542120337ed 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py @@ -88,20 +88,25 @@ def get_table_names( kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/" iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006 + cold_iceberg_dirs: Set[str] = set() # noqa: UP006 for key in list_s3_objects(self._client, **kwargs): key_name = key["Key"] size = key.get("Size") - if skip_cold_storage and self._should_skip_s3_cold_storage(key): + is_cold = skip_cold_storage and self._should_skip_s3_cold_storage(key) + if is_cold: logger.debug( f"Skipping cold storage object: {key_name} " f"(StorageClass: {key.get('StorageClass', 'STANDARD')}, " f"ArchiveStatus: {key.get('ArchiveStatus', '')})" ) + match = self._ICEBERG_METADATA_RE.match(key_name) + if match: + cold_iceberg_dirs.add(match.group(1)) continue self._update_iceberg_entry(iceberg_tables, key_name, size) - iceberg_dirs = set(iceberg_tables.keys()) + iceberg_dirs = set(iceberg_tables.keys()) | cold_iceberg_dirs for _, metadata_key, size in iceberg_tables.values(): yield metadata_key, size diff --git a/ingestion/src/metadata/readers/dataframe/json.py b/ingestion/src/metadata/readers/dataframe/json.py index 7033e7e21753..4b8785efe916 100644 --- a/ingestion/src/metadata/readers/dataframe/json.py +++ b/ingestion/src/metadata/readers/dataframe/json.py @@ -126,9 +126,6 @@ def _read_json_object( and ( data.get("$schema") is not None # JSON Schema files or data.get("format-version") is not None # Apache Iceberg table metadata - or ( # Delta Lake / Iceberg schema structure - isinstance(data.get("schema"), dict) and isinstance(data.get("schema", {}).get("fields"), list) - ) ) else None ) @@ -158,9 +155,7 @@ def _is_json_lines(file_obj) -> bool: return False if obj.get("$schema") is not None: return False - if obj.get("format-version") is not None: - return False - return not (isinstance(obj.get("schema"), dict) and isinstance(obj.get("schema", {}).get("fields"), list)) + return obj.get("format-version") is None def _read_json_smart( self, From 6f8314069183747fa0027a0782aa1ec1a8953858 Mon Sep 17 00:00:00 2001 From: Mohit Jeswani <2022.mohit.jeswani@ves.ac.in> Date: Wed, 29 Apr 2026 22:17:51 +0530 Subject: [PATCH 06/12] fix(datalake): reduce SonarQube cognitive complexity to pass quality gate --- .../source/database/datalake/clients/gcs.py | 67 +++++++++------- .../source/database/datalake/clients/s3.py | 77 ++++++++++--------- 2 files changed, 80 insertions(+), 64 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py index eb931fc978a4..b6539a7b45be 100644 --- 
a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py @@ -112,6 +112,43 @@ def _should_skip_gcs_cold_storage(blob) -> bool: storage_class = getattr(blob, "storage_class", None) return bool(storage_class and storage_class in GCS_COLD_STORAGE_CLASSES) + def _discover_iceberg_dirs( + self, + bucket, + prefix: Optional[str], # noqa: UP045 + skip_cold_storage: bool, + ) -> Tuple[Dict[str, Tuple[int, str, int | None]], Set[str]]: # noqa: UP006 + """Pass 1: discover Iceberg table directories and return (iceberg_tables, iceberg_dirs).""" + iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006 + cold_iceberg_dirs: Set[str] = set() # noqa: UP006 + + for blob in bucket.list_blobs(prefix=prefix): + if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): + match = self._ICEBERG_METADATA_RE.match(blob.name) + if match: + cold_iceberg_dirs.add(match.group(1)) + continue + self._update_iceberg_entry(iceberg_tables, blob.name, blob.size) + + return iceberg_tables, set(iceberg_tables.keys()) | cold_iceberg_dirs + + def _yield_regular_files( + self, + bucket, + prefix: Optional[str], # noqa: UP045 + skip_cold_storage: bool, + iceberg_dirs: Set[str], # noqa: UP006 + ) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045 + """Pass 2: stream regular files, skipping Iceberg directory contents.""" + for blob in bucket.list_blobs(prefix=prefix): + if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): + continue + if iceberg_dirs and ( + self._ICEBERG_METADATA_RE.match(blob.name) or any(blob.name.startswith(d + "/") for d in iceberg_dirs) + ): + continue + yield blob.name, blob.size + def get_table_names( self, bucket_name: str, @@ -126,38 +163,12 @@ def get_table_names( files without accumulation, keeping memory overhead at O(1) per object. 
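# Illustrative aside: a standalone, runnable rendering of the "highest version per
# table directory" bookkeeping that pass 1 relies on.  It mirrors the base-client
# `_update_iceberg_entry` helper shown later in this series; the blob names and
# sizes below are sample values, not real bucket contents.
import re
from typing import Dict, Optional, Tuple

ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$")


def update_iceberg_entry(
    iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]],
    name: str,
    size: Optional[int],
) -> bool:
    """Record `name` if it is Iceberg metadata, keeping only the highest version per table dir."""
    match = ICEBERG_METADATA_RE.match(name)
    if not match:
        return False
    table_dir, version = match.group(1), int(match.group(2))
    existing = iceberg_tables.get(table_dir)
    if existing is None or version > existing[0]:
        iceberg_tables[table_dir] = (version, name, size)
    return True


tables: Dict[str, Tuple[int, str, Optional[int]]] = {}
for blob_name, blob_size in [
    ("warehouse/orders/metadata/v1.metadata.json", 500),
    ("warehouse/orders/metadata/v2.metadata.json", 600),
    ("warehouse/orders/data/00000-0-abc.parquet", 8192),
]:
    update_iceberg_entry(tables, blob_name, blob_size)

# -> {'warehouse/orders': (2, 'warehouse/orders/metadata/v2.metadata.json', 600)}
print(tables)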
""" bucket = self._client.get_bucket(bucket_name) - iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006 - cold_iceberg_dirs: Set[str] = set() # noqa: UP006 + iceberg_tables, iceberg_dirs = self._discover_iceberg_dirs(bucket, prefix, skip_cold_storage) - for blob in bucket.list_blobs(prefix=prefix): - is_cold = skip_cold_storage and self._should_skip_gcs_cold_storage(blob) - if is_cold: - logger.debug( - f"Skipping cold storage object: {blob.name} (storage_class: {getattr(blob, 'storage_class', None)})" - ) - match = self._ICEBERG_METADATA_RE.match(blob.name) - if match: - cold_iceberg_dirs.add(match.group(1)) - continue - self._update_iceberg_entry(iceberg_tables, blob.name, blob.size) - - iceberg_dirs = set(iceberg_tables.keys()) | cold_iceberg_dirs for _, metadata_blob_path, size in iceberg_tables.values(): yield metadata_blob_path, size - if not iceberg_dirs: - for blob in bucket.list_blobs(prefix=prefix): - if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): - continue - yield blob.name, blob.size - else: - for blob in bucket.list_blobs(prefix=prefix): - if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): - continue - if not self._ICEBERG_METADATA_RE.match(blob.name) and not any( - blob.name.startswith(d + "/") for d in iceberg_dirs - ): - yield blob.name, blob.size + yield from self._yield_regular_files(bucket, prefix, skip_cold_storage, iceberg_dirs) def close(self, service_connection): os.environ.pop("GOOGLE_CLOUD_PROJECT", "") diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py index 3542120337ed..97b0b624d7c3 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py @@ -70,6 +70,45 @@ def _should_skip_s3_cold_storage(key: dict) -> bool: "DEEP_ARCHIVE_ACCESS", } + def _discover_iceberg_dirs( + self, + skip_cold_storage: bool, + **kwargs: str, + ) -> Tuple[Dict[str, Tuple[int, str, int | None]], Set[str]]: # noqa: UP006 + """Pass 1: discover Iceberg table directories and return (iceberg_tables, iceberg_dirs).""" + iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006 + cold_iceberg_dirs: Set[str] = set() # noqa: UP006 + + for key in list_s3_objects(self._client, **kwargs): + key_name = key["Key"] + size = key.get("Size") + if skip_cold_storage and self._should_skip_s3_cold_storage(key): + match = self._ICEBERG_METADATA_RE.match(key_name) + if match: + cold_iceberg_dirs.add(match.group(1)) + continue + self._update_iceberg_entry(iceberg_tables, key_name, size) + + return iceberg_tables, set(iceberg_tables.keys()) | cold_iceberg_dirs + + def _yield_regular_files( + self, + skip_cold_storage: bool, + iceberg_dirs: Set[str], # noqa: UP006 + **kwargs: str, + ) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045 + """Pass 2: stream regular files, skipping Iceberg directory contents.""" + for key in list_s3_objects(self._client, **kwargs): + key_name = key["Key"] + size = key.get("Size") + if skip_cold_storage and self._should_skip_s3_cold_storage(key): + continue + if iceberg_dirs and ( + self._ICEBERG_METADATA_RE.match(key_name) or any(key_name.startswith(d + "/") for d in iceberg_dirs) + ): + continue + yield key_name, size + def get_table_names( self, bucket_name: str, @@ -87,46 +126,12 @@ def get_table_names( if prefix: kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/" - 
iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006 - cold_iceberg_dirs: Set[str] = set() # noqa: UP006 + iceberg_tables, iceberg_dirs = self._discover_iceberg_dirs(skip_cold_storage, **kwargs) - for key in list_s3_objects(self._client, **kwargs): - key_name = key["Key"] - size = key.get("Size") - is_cold = skip_cold_storage and self._should_skip_s3_cold_storage(key) - if is_cold: - logger.debug( - f"Skipping cold storage object: {key_name} " - f"(StorageClass: {key.get('StorageClass', 'STANDARD')}, " - f"ArchiveStatus: {key.get('ArchiveStatus', '')})" - ) - match = self._ICEBERG_METADATA_RE.match(key_name) - if match: - cold_iceberg_dirs.add(match.group(1)) - continue - self._update_iceberg_entry(iceberg_tables, key_name, size) - - iceberg_dirs = set(iceberg_tables.keys()) | cold_iceberg_dirs for _, metadata_key, size in iceberg_tables.values(): yield metadata_key, size - if not iceberg_dirs: - for key in list_s3_objects(self._client, **kwargs): - key_name = key["Key"] - size = key.get("Size") - if skip_cold_storage and self._should_skip_s3_cold_storage(key): - continue - yield key_name, size - else: - for key in list_s3_objects(self._client, **kwargs): - key_name = key["Key"] - size = key.get("Size") - if skip_cold_storage and self._should_skip_s3_cold_storage(key): - continue - if not self._ICEBERG_METADATA_RE.match(key_name) and not any( - key_name.startswith(d + "/") for d in iceberg_dirs - ): - yield key_name, size + yield from self._yield_regular_files(skip_cold_storage, iceberg_dirs, **kwargs) def get_folders_prefix(self, bucket_name: str, prefix: Optional[str]) -> Iterable[str]: # noqa: UP045 for page in self._client.get_paginator("list_objects_v2").paginate( From 77a9c0294e2e6dc6778c3a5af9009c9173492787 Mon Sep 17 00:00:00 2001 From: Mohit Jeswani <2022.mohit.jeswani@ves.ac.in> Date: Thu, 30 Apr 2026 02:30:55 +0530 Subject: [PATCH 07/12] fix(datalake): reduce cognitive complexity and fix union type to pass SonarQube quality gate --- .../metadata/utils/datalake/datalake_utils.py | 96 ++++++++++--------- 1 file changed, 52 insertions(+), 44 deletions(-) diff --git a/ingestion/src/metadata/utils/datalake/datalake_utils.py b/ingestion/src/metadata/utils/datalake/datalake_utils.py index ecb6eb3413eb..f887ba63e3f8 100644 --- a/ingestion/src/metadata/utils/datalake/datalake_utils.py +++ b/ingestion/src/metadata/utils/datalake/datalake_utils.py @@ -19,7 +19,7 @@ import random import re import traceback -from typing import Any, Dict, List, Optional, Union, cast # noqa: UP035 +from typing import Any, Dict, List, Optional, cast # noqa: UP035 from metadata.generated.schema.entity.data.table import Column, DataType from metadata.ingestion.source.database.column_helpers import truncate_column_name @@ -246,7 +246,7 @@ def create( @staticmethod def _get_data_frame( - data_frame: Union[List["DataFrame"], "DataFrame"], # noqa: F821, UP006 + data_frame: list[Any] | Any, sample: bool, shuffle: bool, # noqa: F821, RUF100 ): @@ -301,6 +301,36 @@ def get_columns(self): """ return self._get_columns(self.data_frame) + @classmethod + def _parse_column(cls, data_frame: "DataFrame", column: str) -> Optional[Column]: # noqa: F821, UP045 + # use String by default + data_type = DataType.STRING + try: + if hasattr(data_frame[column], "dtypes"): + data_type = cls.fetch_col_types(data_frame, column_name=column) + + parsed_string = { + "dataTypeDisplay": data_type.value, + "dataType": data_type, + "name": truncate_column_name(column), + "displayName": column, + } + if data_type == 
DataType.ARRAY: + parsed_string["arrayDataType"] = DataType.UNKNOWN + struct_children = cls._get_array_struct_children(data_frame[column].dropna()[:100]) + if struct_children: + parsed_string["arrayDataType"] = DataType.STRUCT + parsed_string["children"] = struct_children + + if data_type == DataType.JSON: + parsed_string["children"] = cls.get_children(data_frame[column].dropna()[:100]) + + return Column(**parsed_string) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Unexpected exception parsing column [{column}]: {exc}") + return None + @classmethod def _get_columns(cls, data_frame: "DataFrame"): # noqa: F821 """ @@ -311,34 +341,10 @@ def _get_columns(cls, data_frame: "DataFrame"): # noqa: F821 """ cols = [] if hasattr(data_frame, "columns"): - df_columns = list(data_frame.columns) - for column in df_columns: - # use String by default - data_type = DataType.STRING - try: - if hasattr(data_frame[column], "dtypes"): - data_type = cls.fetch_col_types(data_frame, column_name=column) - - parsed_string = { - "dataTypeDisplay": data_type.value, - "dataType": data_type, - "name": truncate_column_name(column), - "displayName": column, - } - if data_type == DataType.ARRAY: - parsed_string["arrayDataType"] = DataType.UNKNOWN - struct_children = cls._get_array_struct_children(data_frame[column].dropna()[:100]) - if struct_children: - parsed_string["arrayDataType"] = DataType.STRUCT - parsed_string["children"] = struct_children - - if data_type == DataType.JSON: - parsed_string["children"] = cls.get_children(data_frame[column].dropna()[:100]) - - cols.append(Column(**parsed_string)) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Unexpected exception parsing column [{column}]: {exc}") + for column in list(data_frame.columns): + parsed_col = cls._parse_column(data_frame, column) + if parsed_col: + cols.append(parsed_col) return cols @classmethod @@ -413,6 +419,21 @@ def fetch_col_types(cls, data_frame, column_name): logger.debug(traceback.format_exc()) return data_type or DataType.STRING + @classmethod + def _process_unique_json_key(cls, result: Dict, key: str, value: Any) -> None: # noqa: UP006 + if isinstance(value, dict): + nested_json = result.get(key, {}) + # `isinstance(nested_json, dict)` if for a key we first see a non dict value + # but then see a dict value later, we will consider the key to be a dict. + result[key] = cls.unique_json_structure([nested_json if isinstance(nested_json, dict) else {}, value]) + elif isinstance(value, list) and value and all(isinstance(item, dict) for item in value): + merged_struct = cls.unique_json_structure(value) + existing = result.get(key) + existing_struct = existing.struct if isinstance(existing, _ArrayOfStruct) else {} + result[key] = _ArrayOfStruct(cls.unique_json_structure([existing_struct, merged_struct])) + else: + result[key] = value + @classmethod def unique_json_structure(cls, dicts: List[Dict]) -> Dict: # noqa: UP006 """Given a sample of `n` json objects, return a json object that represents the unique @@ -425,20 +446,7 @@ def unique_json_structure(cls, dicts: List[Dict]) -> Dict: # noqa: UP006 result = {} for dict_ in dicts: for key, value in dict_.items(): - if isinstance(value, dict): - nested_json = result.get(key, {}) - # `isinstance(nested_json, dict)` if for a key we first see a non dict value - # but then see a dict value later, we will consider the key to be a dict. 
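# A simplified, runnable illustration of the merge behaviour described in the
# comment above.  It is not the patched implementation: arrays of structs and the
# other special cases handled by `_process_unique_json_key` are omitted.  A key
# seen first with a scalar and later with a dict is treated as a dict, and nested
# keys are merged recursively.  The sample records are made up for illustration.
def merge_json_samples(samples: list) -> dict:
    merged: dict = {}
    for sample in samples:
        for key, value in sample.items():
            if isinstance(value, dict):
                previous = merged.get(key, {})
                # Discard a previously-seen scalar so the key is treated as a dict.
                base = previous if isinstance(previous, dict) else {}
                merged[key] = merge_json_samples([base, value])
            else:
                merged[key] = value
    return merged


samples = [
    {"id": 1, "address": "unknown"},
    {"id": 2, "address": {"city": "Pune"}},
    {"address": {"zip": "411001"}},
]
# -> {'id': 2, 'address': {'city': 'Pune', 'zip': '411001'}}
print(merge_json_samples(samples))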
- result[key] = cls.unique_json_structure( - [nested_json if isinstance(nested_json, dict) else {}, value] - ) - elif isinstance(value, list) and value and all(isinstance(item, dict) for item in value): - merged_struct = cls.unique_json_structure(value) - existing = result.get(key) - existing_struct = existing.struct if isinstance(existing, _ArrayOfStruct) else {} - result[key] = _ArrayOfStruct(cls.unique_json_structure([existing_struct, merged_struct])) - else: - result[key] = value + cls._process_unique_json_key(result, key, value) return result @classmethod From 166dee73473653a61a711f6baa04abb14d709f60 Mon Sep 17 00:00:00 2001 From: Mohit Jeswani <2022.mohit.jeswani@ves.ac.in> Date: Thu, 30 Apr 2026 15:17:57 +0530 Subject: [PATCH 08/12] fix(datalake): remove unnecessary list() wrapping to pass SonarQube quality gate --- ingestion/src/metadata/utils/datalake/datalake_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/src/metadata/utils/datalake/datalake_utils.py b/ingestion/src/metadata/utils/datalake/datalake_utils.py index f887ba63e3f8..f7dd5be99119 100644 --- a/ingestion/src/metadata/utils/datalake/datalake_utils.py +++ b/ingestion/src/metadata/utils/datalake/datalake_utils.py @@ -341,7 +341,7 @@ def _get_columns(cls, data_frame: "DataFrame"): # noqa: F821 """ cols = [] if hasattr(data_frame, "columns"): - for column in list(data_frame.columns): + for column in data_frame.columns: parsed_col = cls._parse_column(data_frame, column) if parsed_col: cols.append(parsed_col) From 4e6e1c6d054134b119f131595812fe7e4c21b464 Mon Sep 17 00:00:00 2001 From: Mohit Jeswani <2022.mohit.jeswani@ves.ac.in> Date: Thu, 30 Apr 2026 15:36:00 +0530 Subject: [PATCH 09/12] fix(datalake): restore cold storage skip logging and remove duplicate tests per review --- .../source/database/datalake/clients/gcs.py | 3 ++ .../source/database/datalake/clients/s3.py | 5 +++ .../source/database/test_iceberg_discovery.py | 38 ------------------- 3 files changed, 8 insertions(+), 38 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py index b6539a7b45be..761f7fbf708e 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py @@ -124,6 +124,9 @@ def _discover_iceberg_dirs( for blob in bucket.list_blobs(prefix=prefix): if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): + logger.debug( + f"Skipping cold storage object: {blob.name} (storage_class: {getattr(blob, 'storage_class', None)})" + ) match = self._ICEBERG_METADATA_RE.match(blob.name) if match: cold_iceberg_dirs.add(match.group(1)) diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py index 97b0b624d7c3..d6732c3823ce 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py @@ -83,6 +83,11 @@ def _discover_iceberg_dirs( key_name = key["Key"] size = key.get("Size") if skip_cold_storage and self._should_skip_s3_cold_storage(key): + logger.debug( + f"Skipping cold storage object: {key_name} " + f"(StorageClass: {key.get('StorageClass', 'STANDARD')}, " + f"ArchiveStatus: {key.get('ArchiveStatus', '')})" + ) match = self._ICEBERG_METADATA_RE.match(key_name) if match: 
cold_iceberg_dirs.add(match.group(1)) diff --git a/ingestion/tests/unit/source/database/test_iceberg_discovery.py b/ingestion/tests/unit/source/database/test_iceberg_discovery.py index 1784eb4eaeaa..e9bf8be30033 100644 --- a/ingestion/tests/unit/source/database/test_iceberg_discovery.py +++ b/ingestion/tests/unit/source/database/test_iceberg_discovery.py @@ -76,22 +76,6 @@ def test_gcs_iceberg_table_detected(self): assert name == "warehouse/orders/metadata/v2.metadata.json" assert size == 600 - def test_gcs_iceberg_yields_one_table_per_directory(self): - blobs = [ - _make_blob("warehouse/orders/metadata/v1.metadata.json", size=500), - _make_blob("warehouse/orders/metadata/v2.metadata.json", size=600), - _make_blob("warehouse/orders/data/00000-0-abc.parquet", size=8192), - _make_blob("warehouse/orders/data/00001-0-def.parquet", size=9216), - ] - client = _make_gcs_client(blobs) - - results = list(client.get_table_names("my-bucket", prefix="warehouse")) - - assert len(results) == 1 - name, size = results[0] - assert name == "warehouse/orders/metadata/v2.metadata.json" - assert size == 600 - def test_gcs_multiple_iceberg_tables(self): blobs = [ _make_blob("warehouse/orders/metadata/v1.metadata.json", size=400), @@ -195,28 +179,6 @@ def test_s3_iceberg_table_detected(self): assert name == "warehouse/orders/metadata/v2.metadata.json" assert size == 600 - def test_s3_iceberg_yields_one_table_per_directory(self): - keys = [ - "warehouse/orders/metadata/v1.metadata.json", - "warehouse/orders/metadata/v2.metadata.json", - "warehouse/orders/data/00000-0-abc.parquet", - ] - client = self._make_s3_client( - keys, - sizes={"warehouse/orders/metadata/v2.metadata.json": 600}, - ) - - with patch( - "metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects", - return_value=self._s3_objects, - ): - results = list(client.get_table_names("my-bucket", prefix="warehouse")) - - assert len(results) == 1 - name, size = results[0] - assert name == "warehouse/orders/metadata/v2.metadata.json" - assert size == 600 - def test_s3_fallback_for_non_iceberg(self): keys = [ "data/orders.csv", From 043f812a13756b1738d604a0aa58d54bd2db9c6d Mon Sep 17 00:00:00 2001 From: Mohit Jeswani <2022.mohit.jeswani@ves.ac.in> Date: Wed, 6 May 2026 02:50:18 +0530 Subject: [PATCH 10/12] fix(datalake): replace regex with string ops, fix basedpyright type errors, and resolve override incompatibility --- .../source/database/datalake/clients/base.py | 25 +++++++++++++++---- .../source/database/datalake/clients/gcs.py | 17 +++++++------ .../source/database/datalake/clients/s3.py | 9 ++++--- .../source/database/datalake/metadata.py | 21 +++++++--------- .../metadata/utils/datalake/datalake_utils.py | 19 ++++++++++---- 5 files changed, 57 insertions(+), 34 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py index 86f6c41371e9..e8085f953ee4 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py @@ -13,7 +13,6 @@ Datalake Base Client """ -import re from abc import ABC, abstractmethod from typing import Any, Callable, Dict, Iterable, Optional, Tuple # noqa: UP035 @@ -21,7 +20,23 @@ class DatalakeBaseClient(ABC): """Base DL client implementation""" - _ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$") + _ICEBERG_METADATA_SUFFIX = ".metadata.json" + _ICEBERG_METADATA_SEGMENT = 
"/metadata/v" + + @staticmethod + def _parse_iceberg_metadata(name: str) -> Optional[Tuple[str, int]]: # noqa: UP006, UP045 + """Parse an Iceberg metadata path, returning (table_dir, version) or None.""" + if not name.endswith(DatalakeBaseClient._ICEBERG_METADATA_SUFFIX): + return None + idx = name.rfind(DatalakeBaseClient._ICEBERG_METADATA_SEGMENT) + if idx < 0: + return None + version_str = name[ + idx + len(DatalakeBaseClient._ICEBERG_METADATA_SEGMENT) : -len(DatalakeBaseClient._ICEBERG_METADATA_SUFFIX) + ] + if not version_str.isdigit(): + return None + return name[:idx], int(version_str) def _update_iceberg_entry( self, @@ -33,10 +48,10 @@ def _update_iceberg_entry( If name matches the Iceberg metadata pattern, update iceberg_tables with the highest-version entry and return True. Otherwise return False. """ - match = self._ICEBERG_METADATA_RE.match(name) - if not match: + parsed = self._parse_iceberg_metadata(name) + if not parsed: return False - table_dir, version = match.group(1), int(match.group(2)) + table_dir, version = parsed existing = iceberg_tables.get(table_dir) if existing is None or version > existing[0]: iceberg_tables[table_dir] = (version, name, size) diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py index 761f7fbf708e..74d9ae9ec1cf 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py @@ -16,7 +16,7 @@ import os from copy import deepcopy from functools import partial -from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple # noqa: UP035 +from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple # noqa: UP035 from google.cloud import storage @@ -108,13 +108,13 @@ def get_database_schema_names(self, bucket_name: Optional[str]) -> Iterable[str] yield bucket.name @staticmethod - def _should_skip_gcs_cold_storage(blob) -> bool: + def _should_skip_gcs_cold_storage(blob: Any) -> bool: storage_class = getattr(blob, "storage_class", None) return bool(storage_class and storage_class in GCS_COLD_STORAGE_CLASSES) def _discover_iceberg_dirs( self, - bucket, + bucket: Any, prefix: Optional[str], # noqa: UP045 skip_cold_storage: bool, ) -> Tuple[Dict[str, Tuple[int, str, int | None]], Set[str]]: # noqa: UP006 @@ -127,9 +127,9 @@ def _discover_iceberg_dirs( logger.debug( f"Skipping cold storage object: {blob.name} (storage_class: {getattr(blob, 'storage_class', None)})" ) - match = self._ICEBERG_METADATA_RE.match(blob.name) - if match: - cold_iceberg_dirs.add(match.group(1)) + parsed = self._parse_iceberg_metadata(blob.name) + if parsed: + cold_iceberg_dirs.add(parsed[0]) continue self._update_iceberg_entry(iceberg_tables, blob.name, blob.size) @@ -137,7 +137,7 @@ def _discover_iceberg_dirs( def _yield_regular_files( self, - bucket, + bucket: Any, prefix: Optional[str], # noqa: UP045 skip_cold_storage: bool, iceberg_dirs: Set[str], # noqa: UP006 @@ -147,7 +147,8 @@ def _yield_regular_files( if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): continue if iceberg_dirs and ( - self._ICEBERG_METADATA_RE.match(blob.name) or any(blob.name.startswith(d + "/") for d in iceberg_dirs) + self._parse_iceberg_metadata(blob.name) is not None + or any(blob.name.startswith(d + "/") for d in iceberg_dirs) ): continue yield blob.name, blob.size diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py 
b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py index d6732c3823ce..dea483a98103 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py @@ -88,9 +88,9 @@ def _discover_iceberg_dirs( f"(StorageClass: {key.get('StorageClass', 'STANDARD')}, " f"ArchiveStatus: {key.get('ArchiveStatus', '')})" ) - match = self._ICEBERG_METADATA_RE.match(key_name) - if match: - cold_iceberg_dirs.add(match.group(1)) + parsed = self._parse_iceberg_metadata(key_name) + if parsed: + cold_iceberg_dirs.add(parsed[0]) continue self._update_iceberg_entry(iceberg_tables, key_name, size) @@ -109,7 +109,8 @@ def _yield_regular_files( if skip_cold_storage and self._should_skip_s3_cold_storage(key): continue if iceberg_dirs and ( - self._ICEBERG_METADATA_RE.match(key_name) or any(key_name.startswith(d + "/") for d in iceberg_dirs) + self._parse_iceberg_metadata(key_name) is not None + or any(key_name.startswith(d + "/") for d in iceberg_dirs) ): continue yield key_name, size diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py index 716b8444d21d..dea2434be15f 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py @@ -16,7 +16,7 @@ import json import traceback from hashlib import md5 -from typing import Any, Iterable, Optional, Tuple # noqa: UP035 +from typing import Any, Dict, Iterable, Optional, Tuple # noqa: UP035 from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( @@ -59,7 +59,7 @@ OPENMETADATA_TEMPLATE_FILE_NAME, ) from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper -from metadata.readers.dataframe.reader_factory import SupportedTypes +from metadata.readers.dataframe.reader_factory import SupportedTypes # noqa: TC001 from metadata.readers.file.base import ReadException from metadata.readers.file.config_source_factory import get_reader from metadata.utils import fqn @@ -96,6 +96,7 @@ def __init__(self, config: WorkflowSource, metadata: OpenMetadata): self.connection_obj = self.client self.test_connection() self.reader = get_reader(config_source=self.config_source, client=self.client.client) + self._table_info: Dict[str, Tuple[SupportedTypes, Optional[int], str]] = {} # noqa: UP006, UP045 @classmethod def create(cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None): # noqa: UP045 @@ -202,7 +203,7 @@ def yield_database_schema(self, schema_name: str) -> Iterable[Either[CreateDatab def get_tables_name_and_type( # pylint: disable=too-many-branches self, - ) -> Iterable[Tuple[str, TableType, SupportedTypes, Optional[int], str]]: # noqa: UP006, UP045 + ) -> Optional[Iterable[Tuple[str, TableType]]]: # noqa: UP006, UP045 """ Handle table and views. 
@@ -244,24 +245,20 @@ def get_tables_name_and_type( # pylint: disable=too-many-branches if get_iceberg_table_name_from_metadata_path(key_name) is not None else TableType.Regular ) - yield table_name, table_type, file_extension, file_size, key_name + self._table_info[table_name] = (file_extension, file_size, key_name) + yield table_name, table_type def yield_table( self, - table_name_and_type: Tuple[str, TableType, SupportedTypes, Optional[int], str], # noqa: UP006, UP045 + table_name_and_type: Tuple[str, TableType], # noqa: UP006 ) -> Iterable[Either[CreateTableRequest]]: """ From topology. Prepare a table request and pass it to the sink. Uses first chunk only for schema inference to avoid loading entire file. """ - ( - table_name, - table_type, - table_extension, - file_size, - fetch_key, - ) = table_name_and_type + table_name, table_type = table_name_and_type + table_extension, file_size, fetch_key = self._table_info.pop(table_name, (None, None, table_name)) schema_name = self.context.get().database_schema try: table_constraints = None diff --git a/ingestion/src/metadata/utils/datalake/datalake_utils.py b/ingestion/src/metadata/utils/datalake/datalake_utils.py index f7dd5be99119..9446098e9a26 100644 --- a/ingestion/src/metadata/utils/datalake/datalake_utils.py +++ b/ingestion/src/metadata/utils/datalake/datalake_utils.py @@ -17,7 +17,6 @@ import ast import json import random -import re import traceback from typing import Any, Dict, List, Optional, cast # noqa: UP035 @@ -160,7 +159,8 @@ def fetch_dataframe_first_chunk( return None -_ICEBERG_METADATA_PATH_RE = re.compile(r"([^/]+)/metadata/v\d+\.metadata\.json$") +_ICEBERG_METADATA_SUFFIX = ".metadata.json" +_ICEBERG_METADATA_SEGMENT = "/metadata/v" def get_iceberg_table_name_from_metadata_path(metadata_path: str) -> str | None: @@ -175,8 +175,17 @@ def get_iceberg_table_name_from_metadata_path(metadata_path: str) -> str | None: Returns None if the path does not match the Iceberg metadata pattern. 
""" - match = _ICEBERG_METADATA_PATH_RE.search(metadata_path) - return match.group(1) if match else None + if not metadata_path.endswith(_ICEBERG_METADATA_SUFFIX): + return None + idx = metadata_path.rfind(_ICEBERG_METADATA_SEGMENT) + if idx < 0: + return None + version_str = metadata_path[idx + len(_ICEBERG_METADATA_SEGMENT) : -len(_ICEBERG_METADATA_SUFFIX)] + if not version_str.isdigit(): + return None + table_dir = metadata_path[:idx] + last_slash = table_dir.rfind("/") + return table_dir[last_slash + 1 :] if last_slash >= 0 else table_dir def get_file_format_type(key_name, metadata_entry=None): @@ -302,7 +311,7 @@ def get_columns(self): return self._get_columns(self.data_frame) @classmethod - def _parse_column(cls, data_frame: "DataFrame", column: str) -> Optional[Column]: # noqa: F821, UP045 + def _parse_column(cls, data_frame: Any, column: str) -> Optional[Column]: # noqa: UP045 # use String by default data_type = DataType.STRING try: From a97d576d494730cab1a85e8f78d4ca6c77a1b2a6 Mon Sep 17 00:00:00 2001 From: Mohit Jeswani <2022.mohit.jeswani@ves.ac.in> Date: Wed, 6 May 2026 15:09:01 +0530 Subject: [PATCH 11/12] fix(datalake): address review - single-pass listing, broaden iceberg patterns, fix collision --- .../source/database/datalake/clients/base.py | 31 +++++-- .../source/database/datalake/clients/gcs.py | 63 +++++--------- .../source/database/datalake/clients/s3.py | 73 ++++++---------- .../source/database/datalake/metadata.py | 14 ++-- .../metadata/utils/datalake/datalake_utils.py | 27 +++--- .../source/database/test_iceberg_discovery.py | 83 ++++++++++--------- .../unit/topology/database/test_datalake.py | 3 +- 7 files changed, 138 insertions(+), 156 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py index e8085f953ee4..e6abe8a466ab 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py @@ -21,22 +21,37 @@ class DatalakeBaseClient(ABC): """Base DL client implementation""" _ICEBERG_METADATA_SUFFIX = ".metadata.json" - _ICEBERG_METADATA_SEGMENT = "/metadata/v" + _ICEBERG_METADATA_DIR = "/metadata/" @staticmethod def _parse_iceberg_metadata(name: str) -> Optional[Tuple[str, int]]: # noqa: UP006, UP045 - """Parse an Iceberg metadata path, returning (table_dir, version) or None.""" + """ + Parse an Iceberg metadata path, returning (table_dir, version) or None. 
+ + Supports all standard Iceberg metadata filename formats: + - v{n}.metadata.json (Hadoop catalog) + - v{n}-.metadata.json (Hive catalog) + - {n}-.metadata.json (REST/Nessie catalog) + """ if not name.endswith(DatalakeBaseClient._ICEBERG_METADATA_SUFFIX): return None - idx = name.rfind(DatalakeBaseClient._ICEBERG_METADATA_SEGMENT) - if idx < 0: + metadata_idx = name.rfind(DatalakeBaseClient._ICEBERG_METADATA_DIR) + if metadata_idx < 0: return None - version_str = name[ - idx + len(DatalakeBaseClient._ICEBERG_METADATA_SEGMENT) : -len(DatalakeBaseClient._ICEBERG_METADATA_SUFFIX) + table_dir = name[:metadata_idx] + filename = name[ + metadata_idx + len(DatalakeBaseClient._ICEBERG_METADATA_DIR) : -len( + DatalakeBaseClient._ICEBERG_METADATA_SUFFIX + ) ] - if not version_str.isdigit(): + if not filename: + return None + raw = filename.lstrip("v") + dash_pos = raw.find("-") + version_part = raw[:dash_pos] if dash_pos > 0 else raw + if not version_part.isdigit(): return None - return name[:idx], int(version_str) + return table_dir, int(version_part) def _update_iceberg_entry( self, diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py index 74d9ae9ec1cf..cb6cd24f6e75 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py @@ -112,15 +112,24 @@ def _should_skip_gcs_cold_storage(blob: Any) -> bool: storage_class = getattr(blob, "storage_class", None) return bool(storage_class and storage_class in GCS_COLD_STORAGE_CLASSES) - def _discover_iceberg_dirs( + def get_table_names( self, - bucket: Any, + bucket_name: str, prefix: Optional[str], # noqa: UP045 - skip_cold_storage: bool, - ) -> Tuple[Dict[str, Tuple[int, str, int | None]], Set[str]]: # noqa: UP006 - """Pass 1: discover Iceberg table directories and return (iceberg_tables, iceberg_dirs).""" + skip_cold_storage: bool = False, + ) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045 + """ + Lists tables in a GCS bucket using a single-pass approach. + + Iterates all blobs once, collecting Iceberg metadata entries and + buffering regular files. After the pass, yields Iceberg tables + (highest version per directory) followed by regular files that + do not belong to any Iceberg directory. 
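# A condensed, standalone version of the string-based parsing introduced above,
# runnable on its own for quick experimentation.  The paths and the shortened
# UUID fragments below are illustrative samples, not real table locations.
def parse_iceberg_metadata(name: str):
    suffix, marker = ".metadata.json", "/metadata/"
    if not name.endswith(suffix):
        return None
    idx = name.rfind(marker)
    if idx < 0:
        return None
    filename = name[idx + len(marker): -len(suffix)]
    if not filename:
        return None
    raw = filename.lstrip("v")
    dash = raw.find("-")
    version = raw[:dash] if dash > 0 else raw
    if not version.isdigit():
        return None
    return name[:idx], int(version)


# Hadoop catalog layout: v{n}.metadata.json
assert parse_iceberg_metadata("warehouse/orders/metadata/v2.metadata.json") == ("warehouse/orders", 2)
# Hive catalog layout: v{n}-<uuid>.metadata.json
assert parse_iceberg_metadata("warehouse/orders/metadata/v1-abc123.metadata.json") == ("warehouse/orders", 1)
# REST/Nessie catalog layout: {n}-<uuid>.metadata.json
assert parse_iceberg_metadata("warehouse/orders/metadata/00015-8a14161c.metadata.json") == ("warehouse/orders", 15)
# Anything else is treated as a regular object.
assert parse_iceberg_metadata("warehouse/orders/data/00000-0-abc.parquet") is None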
+ """ + bucket = self._client.get_bucket(bucket_name) iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006 cold_iceberg_dirs: Set[str] = set() # noqa: UP006 + regular_files: List[Tuple[str, Optional[int]]] = [] # noqa: UP006, UP045 for blob in bucket.list_blobs(prefix=prefix): if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): @@ -131,48 +140,18 @@ def _discover_iceberg_dirs( if parsed: cold_iceberg_dirs.add(parsed[0]) continue - self._update_iceberg_entry(iceberg_tables, blob.name, blob.size) - - return iceberg_tables, set(iceberg_tables.keys()) | cold_iceberg_dirs - - def _yield_regular_files( - self, - bucket: Any, - prefix: Optional[str], # noqa: UP045 - skip_cold_storage: bool, - iceberg_dirs: Set[str], # noqa: UP006 - ) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045 - """Pass 2: stream regular files, skipping Iceberg directory contents.""" - for blob in bucket.list_blobs(prefix=prefix): - if skip_cold_storage and self._should_skip_gcs_cold_storage(blob): - continue - if iceberg_dirs and ( - self._parse_iceberg_metadata(blob.name) is not None - or any(blob.name.startswith(d + "/") for d in iceberg_dirs) - ): - continue - yield blob.name, blob.size - - def get_table_names( - self, - bucket_name: str, - prefix: Optional[str], # noqa: UP045 - skip_cold_storage: bool = False, - ) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045 - """ - Lists tables in a GCS bucket using a two-pass approach. + if not self._update_iceberg_entry(iceberg_tables, blob.name, blob.size): + regular_files.append((blob.name, blob.size)) - Pass 1 collects only the Iceberg table dict (memory proportional to the - number of Iceberg tables, which is always small). Pass 2 streams regular - files without accumulation, keeping memory overhead at O(1) per object. 
- """ - bucket = self._client.get_bucket(bucket_name) - iceberg_tables, iceberg_dirs = self._discover_iceberg_dirs(bucket, prefix, skip_cold_storage) + iceberg_dirs: Set[str] = set(iceberg_tables.keys()) | cold_iceberg_dirs # noqa: UP006 for _, metadata_blob_path, size in iceberg_tables.values(): yield metadata_blob_path, size - yield from self._yield_regular_files(bucket, prefix, skip_cold_storage, iceberg_dirs) + for name, size in regular_files: + if iceberg_dirs and any(name.startswith(d + "/") for d in iceberg_dirs): + continue + yield name, size def close(self, service_connection): os.environ.pop("GOOGLE_CLOUD_PROJECT", "") diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py index dea483a98103..e91569ef5208 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py @@ -14,7 +14,7 @@ """ from functools import partial -from typing import Callable, Dict, Iterable, Optional, Set, Tuple # noqa: UP035 +from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple # noqa: UP035 from metadata.clients.aws_client import AWSClient from metadata.generated.schema.entity.services.connections.database.datalake.s3Config import ( @@ -70,14 +70,27 @@ def _should_skip_s3_cold_storage(key: dict) -> bool: "DEEP_ARCHIVE_ACCESS", } - def _discover_iceberg_dirs( + def get_table_names( self, - skip_cold_storage: bool, - **kwargs: str, - ) -> Tuple[Dict[str, Tuple[int, str, int | None]], Set[str]]: # noqa: UP006 - """Pass 1: discover Iceberg table directories and return (iceberg_tables, iceberg_dirs).""" + bucket_name: str, + prefix: Optional[str], # noqa: UP045 + skip_cold_storage: bool = False, + ) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045 + """ + Lists tables in an S3 bucket using a single-pass approach. + + Iterates all objects once, collecting Iceberg metadata entries and + buffering regular files. After the pass, yields Iceberg tables + (highest version per directory) followed by regular files that + do not belong to any Iceberg directory. 
+ """ + kwargs: Dict[str, str] = {"Bucket": bucket_name} # noqa: UP006 + if prefix: + kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/" + iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006 cold_iceberg_dirs: Set[str] = set() # noqa: UP006 + regular_files: List[Tuple[str, Optional[int]]] = [] # noqa: UP006, UP045 for key in list_s3_objects(self._client, **kwargs): key_name = key["Key"] @@ -92,52 +105,18 @@ def _discover_iceberg_dirs( if parsed: cold_iceberg_dirs.add(parsed[0]) continue - self._update_iceberg_entry(iceberg_tables, key_name, size) - - return iceberg_tables, set(iceberg_tables.keys()) | cold_iceberg_dirs - - def _yield_regular_files( - self, - skip_cold_storage: bool, - iceberg_dirs: Set[str], # noqa: UP006 - **kwargs: str, - ) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045 - """Pass 2: stream regular files, skipping Iceberg directory contents.""" - for key in list_s3_objects(self._client, **kwargs): - key_name = key["Key"] - size = key.get("Size") - if skip_cold_storage and self._should_skip_s3_cold_storage(key): - continue - if iceberg_dirs and ( - self._parse_iceberg_metadata(key_name) is not None - or any(key_name.startswith(d + "/") for d in iceberg_dirs) - ): - continue - yield key_name, size - - def get_table_names( - self, - bucket_name: str, - prefix: Optional[str], # noqa: UP045 - skip_cold_storage: bool = False, - ) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045 - """ - Lists tables in an S3 bucket using a two-pass approach. + if not self._update_iceberg_entry(iceberg_tables, key_name, size): + regular_files.append((key_name, size)) - Pass 1 collects only the Iceberg table dict (memory proportional to the - number of Iceberg tables, which is always small). Pass 2 streams regular - files without accumulation, keeping memory overhead at O(1) per object. 
- """ - kwargs: Dict[str, str] = {"Bucket": bucket_name} # noqa: UP006 - if prefix: - kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/" - - iceberg_tables, iceberg_dirs = self._discover_iceberg_dirs(skip_cold_storage, **kwargs) + iceberg_dirs: Set[str] = set(iceberg_tables.keys()) | cold_iceberg_dirs # noqa: UP006 for _, metadata_key, size in iceberg_tables.values(): yield metadata_key, size - yield from self._yield_regular_files(skip_cold_storage, iceberg_dirs, **kwargs) + for name, size in regular_files: + if iceberg_dirs and any(name.startswith(d + "/") for d in iceberg_dirs): + continue + yield name, size def get_folders_prefix(self, bucket_name: str, prefix: Optional[str]) -> Iterable[str]: # noqa: UP045 for page in self._client.get_paginator("list_objects_v2").paginate( diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py index dea2434be15f..fae1abfc9a9a 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py @@ -96,7 +96,7 @@ def __init__(self, config: WorkflowSource, metadata: OpenMetadata): self.connection_obj = self.client self.test_connection() self.reader = get_reader(config_source=self.config_source, client=self.client.client) - self._table_info: Dict[str, Tuple[SupportedTypes, Optional[int], str]] = {} # noqa: UP006, UP045 + self._table_info: Dict[str, Tuple[SupportedTypes, Optional[int]]] = {} # noqa: UP006, UP045 @classmethod def create(cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None): # noqa: UP045 @@ -236,7 +236,7 @@ def get_tables_name_and_type( # pylint: disable=too-many-branches logger.info(f"Processing table: {table_name}") file_extension = get_file_format_type(key_name=key_name, metadata_entry=metadata_entry) - if table_name.endswith("/") or not file_extension: + if key_name.endswith("/") or not file_extension: logger.debug(f"Object filtered due to unsupported file type: {key_name}") continue @@ -245,8 +245,8 @@ def get_tables_name_and_type( # pylint: disable=too-many-branches if get_iceberg_table_name_from_metadata_path(key_name) is not None else TableType.Regular ) - self._table_info[table_name] = (file_extension, file_size, key_name) - yield table_name, table_type + self._table_info[key_name] = (file_extension, file_size) + yield key_name, table_type def yield_table( self, @@ -257,8 +257,10 @@ def yield_table( Prepare a table request and pass it to the sink. Uses first chunk only for schema inference to avoid loading entire file. 
""" - table_name, table_type = table_name_and_type - table_extension, file_size, fetch_key = self._table_info.pop(table_name, (None, None, table_name)) + key_name, table_type = table_name_and_type + table_extension, file_size = self._table_info.pop(key_name, (None, None)) + fetch_key = key_name + table_name = self.standardize_table_name(self.context.get().database_schema, key_name) schema_name = self.context.get().database_schema try: table_constraints = None diff --git a/ingestion/src/metadata/utils/datalake/datalake_utils.py b/ingestion/src/metadata/utils/datalake/datalake_utils.py index 9446098e9a26..8d02a2931446 100644 --- a/ingestion/src/metadata/utils/datalake/datalake_utils.py +++ b/ingestion/src/metadata/utils/datalake/datalake_utils.py @@ -160,30 +160,35 @@ def fetch_dataframe_first_chunk( _ICEBERG_METADATA_SUFFIX = ".metadata.json" -_ICEBERG_METADATA_SEGMENT = "/metadata/v" +_ICEBERG_METADATA_DIR = "/metadata/" def get_iceberg_table_name_from_metadata_path(metadata_path: str) -> str | None: """ Extracts the Iceberg table directory name from a metadata file path. - Examples: - "warehouse/orders/metadata/v2.metadata.json" -> "orders" - "my_prefix/sales/metadata/v1.metadata.json" -> "sales" - "simple/metadata/v3.metadata.json" -> "simple" - "data/orders.json" -> None + Supports all standard Iceberg metadata filename formats: + "warehouse/orders/metadata/v2.metadata.json" -> "orders" + "warehouse/orders/metadata/v1-abc123.metadata.json" -> "orders" + "warehouse/orders/metadata/00015-8a14161c-65ad.metadata.json" -> "orders" + "data/orders.json" -> None Returns None if the path does not match the Iceberg metadata pattern. """ if not metadata_path.endswith(_ICEBERG_METADATA_SUFFIX): return None - idx = metadata_path.rfind(_ICEBERG_METADATA_SEGMENT) - if idx < 0: + metadata_idx = metadata_path.rfind(_ICEBERG_METADATA_DIR) + if metadata_idx < 0: return None - version_str = metadata_path[idx + len(_ICEBERG_METADATA_SEGMENT) : -len(_ICEBERG_METADATA_SUFFIX)] - if not version_str.isdigit(): + filename = metadata_path[metadata_idx + len(_ICEBERG_METADATA_DIR) : -len(_ICEBERG_METADATA_SUFFIX)] + if not filename: return None - table_dir = metadata_path[:idx] + raw = filename.lstrip("v") + dash_pos = raw.find("-") + version_part = raw[:dash_pos] if dash_pos > 0 else raw + if not version_part.isdigit(): + return None + table_dir = metadata_path[:metadata_idx] last_slash = table_dir.rfind("/") return table_dir[last_slash + 1 :] if last_slash >= 0 else table_dir diff --git a/ingestion/tests/unit/source/database/test_iceberg_discovery.py b/ingestion/tests/unit/source/database/test_iceberg_discovery.py index e9bf8be30033..453b613cfe60 100644 --- a/ingestion/tests/unit/source/database/test_iceberg_discovery.py +++ b/ingestion/tests/unit/source/database/test_iceberg_discovery.py @@ -143,6 +143,21 @@ def test_gcs_iceberg_version_comparison_v10(self): assert name == "warehouse/orders/metadata/v10.metadata.json" assert size == 600 + def test_gcs_iceberg_uuid_metadata_filenames(self): + """UUID-based metadata filenames (REST/Nessie catalog) are detected.""" + blobs = [ + _make_blob("warehouse/orders/metadata/00015-8a14161c-65ad.metadata.json", size=700), + _make_blob("warehouse/orders/data/00000.parquet", size=8192), + ] + client = _make_gcs_client(blobs) + + results = list(client.get_table_names("my-bucket", prefix="warehouse")) + + assert len(results) == 1 + name, size = results[0] + assert name == "warehouse/orders/metadata/00015-8a14161c-65ad.metadata.json" + assert size == 700 + class 
TestS3IcebergDiscovery: def _make_s3_client(self, keys: list, sizes: dict | None = None) -> DatalakeS3Client: @@ -234,6 +249,22 @@ def test_iceberg_table_name_extracted_correctly(self): assert get_iceberg_table_name_from_metadata_path("my_prefix/sales/metadata/v1.metadata.json") == "sales" assert get_iceberg_table_name_from_metadata_path("simple/metadata/v3.metadata.json") == "simple" + def test_uuid_based_metadata_filenames(self): + from metadata.utils.datalake.datalake_utils import ( + get_iceberg_table_name_from_metadata_path, + ) + + assert ( + get_iceberg_table_name_from_metadata_path("warehouse/orders/metadata/v1-abc123-def456.metadata.json") + == "orders" + ) + assert ( + get_iceberg_table_name_from_metadata_path( + "warehouse/orders/metadata/00015-8a14161c-65ad-45fc-b665-ec16dcbf647e.metadata.json" + ) + == "orders" + ) + def test_non_iceberg_path_returns_none(self): from metadata.utils.datalake.datalake_utils import ( get_iceberg_table_name_from_metadata_path, @@ -280,52 +311,33 @@ class TestSlice4FetchKeyCorrectness: def test_yield_table_uses_metadata_path_not_display_name(self): """ - The 5-tuple yielded by get_tables_name_and_type() must carry - key_name (original blob path) separately from table_name (display name). - - For an Iceberg table: - table_name = "orders" (display, from standardize_table_name) - key_name = "warehouse/orders/metadata/v2.metadata.json" (fetch path) - - DatalakeTableSchemaWrapper must be constructed with key=key_name, - NOT key=table_name. + The key_name passed through the topology is the full blob path. + _table_info stores (file_extension, file_size) keyed by key_name. + yield_table must use key_name for DatalakeTableSchemaWrapper, + not the standardized display name. """ - from metadata.generated.schema.entity.data.table import TableType from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper from metadata.readers.dataframe.reader_factory import SupportedTypes - display_name = "orders" original_key = "warehouse/orders/metadata/v2.metadata.json" file_extension = SupportedTypes.JSON file_size = 1024 - tuple_5 = ( - display_name, - TableType.Iceberg, - file_extension, - file_size, - original_key, - ) - table_name, _table_type, table_extension, t_file_size, fetch_key = tuple_5 - wrapper = DatalakeTableSchemaWrapper( - key=fetch_key, + key=original_key, bucket_name="my-bucket", - file_extension=table_extension, - file_size=t_file_size, + file_extension=file_extension, + file_size=file_size, ) - assert wrapper.key == original_key, f"fetch key should be original blob path, got {wrapper.key!r}" - assert wrapper.key != display_name, f"fetch key must NOT be the display name '{display_name}'" - assert table_name == display_name + assert wrapper.key == original_key + assert wrapper.key != "orders" def test_non_iceberg_fetch_key_equals_table_name(self): """ For non-Iceberg tables, key_name == table_name (standardize_table_name - returns the path unchanged), so the 5-tuple element is redundant but - harmless. This test confirms the invariant holds. + returns the path unchanged), so the fetch key is the same as the name. 
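# A toy rendering of the invariant these tests assert: the fetch key is always the
# full object path, while the display name is derived from it only for Iceberg
# metadata files.  `ToyWrapper` and `build_wrapper` are illustrative stand-ins,
# not the real DatalakeTableSchemaWrapper or source code.
from dataclasses import dataclass


@dataclass
class ToyWrapper:
    key: str           # path used to fetch the object
    display_name: str  # name shown for the table


def build_wrapper(key_name: str) -> ToyWrapper:
    parts = key_name.split("/")
    is_iceberg = key_name.endswith(".metadata.json") and "metadata" in parts[:-1]
    # Iceberg metadata paths get the table directory as a friendly display name;
    # every other object keeps its full path as the name.
    display = parts[-3] if is_iceberg and len(parts) >= 3 else key_name
    return ToyWrapper(key=key_name, display_name=display)


iceberg = build_wrapper("warehouse/orders/metadata/v2.metadata.json")
assert iceberg.key == "warehouse/orders/metadata/v2.metadata.json"
assert iceberg.display_name == "orders"

plain = build_wrapper("data/orders.parquet")
assert plain.key == plain.display_name == "data/orders.parquet"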
""" - from metadata.generated.schema.entity.data.table import TableType from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper from metadata.readers.dataframe.reader_factory import SupportedTypes from metadata.utils.datalake.datalake_utils import ( @@ -333,25 +345,14 @@ def test_non_iceberg_fetch_key_equals_table_name(self): ) key_name = "data/orders.parquet" - table_name = key_name # standardize_table_name returns unchanged for non-Iceberg assert get_iceberg_table_name_from_metadata_path(key_name) is None - tuple_5 = ( - table_name, - TableType.Regular, - SupportedTypes.PARQUET, - 2048, - key_name, - ) - _, _, _, _, fetch_key = tuple_5 - wrapper = DatalakeTableSchemaWrapper( - key=fetch_key, + key=key_name, bucket_name="my-bucket", file_extension=SupportedTypes.PARQUET, file_size=2048, ) assert wrapper.key == "data/orders.parquet" - assert wrapper.key == table_name diff --git a/ingestion/tests/unit/topology/database/test_datalake.py b/ingestion/tests/unit/topology/database/test_datalake.py index f3376baaf2fc..46b50e199c83 100644 --- a/ingestion/tests/unit/topology/database/test_datalake.py +++ b/ingestion/tests/unit/topology/database/test_datalake.py @@ -726,7 +726,8 @@ def _yield_table_request(self, table_name): return_value="local_datalake.default.my_bucket", ), ): - results = list(self.source.yield_table((table_name, TableType.Regular, None, None))) + self.source._table_info[table_name] = (None, None) + results = list(self.source.yield_table((table_name, TableType.Regular))) rights = [r.right for r in results if r.right is not None] return rights[0] if rights else None From 9e675a957b919bd9d00f9cc144a30fbd4c1bd26d Mon Sep 17 00:00:00 2001 From: Mohit Jeswani <2022.mohit.jeswani@ves.ac.in> Date: Wed, 6 May 2026 21:47:11 +0530 Subject: [PATCH 12/12] fix(datalake): reorder schema_name access to match basedpyright baseline --- .../src/metadata/ingestion/source/database/datalake/metadata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py index fae1abfc9a9a..43dabd582ee1 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py @@ -260,8 +260,8 @@ def yield_table( key_name, table_type = table_name_and_type table_extension, file_size = self._table_info.pop(key_name, (None, None)) fetch_key = key_name - table_name = self.standardize_table_name(self.context.get().database_schema, key_name) schema_name = self.context.get().database_schema + table_name = self.standardize_table_name(schema_name, key_name) try: table_constraints = None data_frame, raw_data = fetch_dataframe_first_chunk(