16 commits
134c1eb
feat(datalake): add GCS/S3 Iceberg table ingestion support (#22644)
mohitjeswani01 Apr 25, 2026
b7b8904
fix(datalake): address gitar-bot review on Iceberg ingestion (#22644)
mohitjeswani01 Apr 25, 2026
3604a30
Merge branch 'main' into feature/22644-iceberg-gcp-support
mohitjeswani01 Apr 28, 2026
8082b4e
fix(datalake): address Copilot + gitar-bot findings on Iceberg ingestion
mohitjeswani01 Apr 28, 2026
b85aacc
fix(datalake): hybrid two-pass listing, ruff checkstyle, and Copilot …
mohitjeswani01 Apr 28, 2026
caef18a
fix(datalake): cold-storage Iceberg dir detection and tighten JSON cl…
mohitjeswani01 Apr 28, 2026
6f83140
fix(datalake): reduce SonarQube cognitive complexity to pass quality …
mohitjeswani01 Apr 29, 2026
be1ea8f
Merge branch 'main' into feature/22644-iceberg-gcp-support
mohitjeswani01 Apr 29, 2026
77a9c02
fix(datalake): reduce cognitive complexity and fix union type to pass…
mohitjeswani01 Apr 29, 2026
166dee7
fix(datalake): remove unnecessary list() wrapping to pass SonarQube q…
mohitjeswani01 Apr 30, 2026
949389a
Merge branch 'main' into feature/22644-iceberg-gcp-support
mohitjeswani01 Apr 30, 2026
4e6e1c6
fix(datalake): restore cold storage skip logging and remove duplicate…
mohitjeswani01 Apr 30, 2026
f9497bd
Merge branch 'main' into feature/22644-iceberg-gcp-support
mohitjeswani01 May 1, 2026
043f812
fix(datalake): replace regex with string ops, fix basedpyright type e…
mohitjeswani01 May 5, 2026
a97d576
fix(datalake): address review - single-pass listing, broaden iceberg …
mohitjeswani01 May 6, 2026
9e675a9
fix(datalake): reorder schema_name access to match basedpyright baseline
mohitjeswani01 May 6, 2026
@@ -13,13 +13,36 @@
Datalake Base Client
"""

import re
from abc import ABC, abstractmethod
from typing import Any, Callable, Iterable, Optional, Tuple # noqa: UP035
from typing import Any, Callable, Dict, Iterable, Optional, Tuple # noqa: UP035


class DatalakeBaseClient(ABC):
"""Base DL client implementation"""

_ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$")

Review comment on lines +27 to +55 (Member):

This assumption that the Iceberg files will always be named v{n}.metadata.json, where n is a number, is not right.

It could be v{n}-<UUID>.metadata.json, or just metadata/00015-8a14161c-65ad-45fc-b665-ec16dcbf647e.metadata.json.

In such cases the code would reject them as Iceberg tables.

def _update_iceberg_entry(
self,
iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]], # noqa: UP006, UP045
name: str,
size: Optional[int], # noqa: UP045
) -> bool:
"""
If name matches the Iceberg metadata pattern, update iceberg_tables with
the highest-version entry and return True. Otherwise return False.
"""
match = self._ICEBERG_METADATA_RE.match(name)
if not match:
return False
table_dir, version = match.group(1), int(match.group(2))
existing = iceberg_tables.get(table_dir)
if existing is None or version > existing[0]:
iceberg_tables[table_dir] = (version, name, size)
return True


def __init__(self, client: Any, session: Any = None, **kwargs):
self._client = client
self._session = session
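For illustration only, and not part of this diff: a more permissive matcher along the lines the reviewer describes above could accept the additional metadata filename shapes. The helper name, the exact regex, and the sample paths in the sketch below are assumptions, not the PR's code.

import re
from typing import Optional, Tuple

# Sketch of a broader Iceberg metadata filename matcher covering the shapes
# mentioned in the review comment:
#   .../metadata/v2.metadata.json
#   .../metadata/v2-<uuid>.metadata.json
#   .../metadata/00015-8a14161c-65ad-45fc-b665-ec16dcbf647e.metadata.json
_ICEBERG_METADATA_ANY_RE = re.compile(
    r"^(?P<table_dir>.+)/metadata/"
    r"(?:v(?P<version>\d+)(?:-[0-9a-fA-F-]+)?|(?P<seq>\d+)-[0-9a-fA-F-]+)"
    r"\.metadata\.json$"
)


def parse_iceberg_metadata_path(name: str) -> Optional[Tuple[str, int]]:
    """Return (table_dir, version) when the path looks like Iceberg metadata, else None."""
    match = _ICEBERG_METADATA_ANY_RE.match(name)
    if not match:
        return None
    version = int(match.group("version") or match.group("seq"))
    return match.group("table_dir"), version


print(parse_iceberg_metadata_path("warehouse/orders/metadata/v2.metadata.json"))
print(parse_iceberg_metadata_path(
    "warehouse/orders/metadata/00015-8a14161c-65ad-45fc-b665-ec16dcbf647e.metadata.json"
))
# ('warehouse/orders', 2)
# ('warehouse/orders', 15)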
@@ -16,7 +16,7 @@
import os
from copy import deepcopy
from functools import partial
from typing import Callable, Iterable, List, Optional, Set, Tuple # noqa: UP035
from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple # noqa: UP035

from google.cloud import storage

@@ -107,21 +107,47 @@
for bucket in self._client.list_buckets():
yield bucket.name

@staticmethod
def _should_skip_gcs_cold_storage(blob) -> bool:

[Check warning on line 111 in ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py, from GitHub Actions / Unit Tests & Static Checks (3.10): Type annotation is missing for parameter "blob" (reportMissingParameterType)]
storage_class = getattr(blob, "storage_class", None)
return bool(storage_class and storage_class in GCS_COLD_STORAGE_CLASSES)

def get_table_names(
self,
bucket_name: str,
prefix: Optional[str], # noqa: UP045
skip_cold_storage: bool = False,
) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045
bucket = self._client.get_bucket(bucket_name)

for key in bucket.list_blobs(prefix=prefix):
if skip_cold_storage:
storage_class = getattr(key, "storage_class", None)
if storage_class and storage_class in GCS_COLD_STORAGE_CLASSES:
logger.debug(f"Skipping cold storage object: {key.name} (storage_class: {storage_class})")
continue
yield key.name, key.size
"""
Lists tables in a GCS bucket using a two-pass approach.

Pass 1 collects only the Iceberg table dict (memory proportional to the
number of Iceberg tables, which is always small). Pass 2 streams regular
files without accumulation, keeping memory overhead at O(1) per object.
"""
bucket = self._client.get_bucket(bucket_name)
iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {} # noqa: UP006

for blob in bucket.list_blobs(prefix=prefix):
if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
logger.debug(
f"Skipping cold storage object: {blob.name} "
f"(storage_class: {getattr(blob, 'storage_class', None)})"
)
continue
self._update_iceberg_entry(iceberg_tables, blob.name, blob.size)

iceberg_dirs = set(iceberg_tables.keys())
for _, metadata_blob_path, size in iceberg_tables.values():
yield metadata_blob_path, size

for blob in bucket.list_blobs(prefix=prefix):
if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
continue
if not self._ICEBERG_METADATA_RE.match(blob.name) and not any(
blob.name.startswith(d + "/") for d in iceberg_dirs
):
yield blob.name, blob.size

def close(self, service_connection):
os.environ.pop("GOOGLE_CLOUD_PROJECT", "")
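As an illustration of the two-pass selection above, the following standalone sketch reruns the same filtering logic over an in-memory listing instead of a real GCS bucket. The object names and sizes are invented, and the pattern simply mirrors the _ICEBERG_METADATA_RE added in the base client; only the final yielded pairs reflect what get_table_names would emit for such a layout.

import re
from typing import Dict, Iterable, List, Optional, Tuple

# Mirrors the base-client pattern; names below are made up for illustration.
_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$")


def select_tables(objects: List[Tuple[str, Optional[int]]]) -> Iterable[Tuple[str, Optional[int]]]:
    iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {}
    # Pass 1: keep only the newest metadata file per Iceberg table directory.
    for name, size in objects:
        match = _METADATA_RE.match(name)
        if match:
            table_dir, version = match.group(1), int(match.group(2))
            current = iceberg_tables.get(table_dir)
            if current is None or version > current[0]:
                iceberg_tables[table_dir] = (version, name, size)
    iceberg_dirs = set(iceberg_tables)
    for _, name, size in iceberg_tables.values():
        yield name, size
    # Pass 2: stream regular files that live outside any Iceberg table directory.
    for name, size in objects:
        if not _METADATA_RE.match(name) and not any(name.startswith(d + "/") for d in iceberg_dirs):
            yield name, size


listing = [
    ("warehouse/orders/metadata/v1.metadata.json", 2048),
    ("warehouse/orders/metadata/v2.metadata.json", 2304),
    ("warehouse/orders/data/part-0001.parquet", 9999),
    ("raw/customers.csv", 1234),
]
print(list(select_tables(listing)))
# [('warehouse/orders/metadata/v2.metadata.json', 2304), ('raw/customers.csv', 1234)]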
@@ -14,7 +14,7 @@
"""

from functools import partial
from typing import Callable, Iterable, Optional, Set, Tuple # noqa: UP035
from typing import Callable, Dict, Iterable, Optional, Set, Tuple # noqa: UP035

from metadata.clients.aws_client import AWSClient
from metadata.generated.schema.entity.services.connections.database.datalake.s3Config import (
@@ -61,31 +61,59 @@ def get_database_schema_names(self, bucket_name: Optional[str]) -> Iterable[str]
for bucket in self._client.list_buckets()["Buckets"]:
yield bucket["Name"]

@staticmethod
def _should_skip_s3_cold_storage(key: dict) -> bool:
storage_class = key.get("StorageClass", "STANDARD")
archive_status = key.get("ArchiveStatus", "")
return storage_class in S3_COLD_STORAGE_CLASSES or archive_status in {
"ARCHIVE_ACCESS",
"DEEP_ARCHIVE_ACCESS",
}

def get_table_names(
self,
bucket_name: str,
prefix: Optional[str], # noqa: UP045
skip_cold_storage: bool = False,
) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045
kwargs = {"Bucket": bucket_name}

"""
Lists tables in an S3 bucket using a two-pass approach.

Pass 1 collects only the Iceberg table dict (memory proportional to the
number of Iceberg tables, which is always small). Pass 2 streams regular
files without accumulation, keeping memory overhead at O(1) per object.
"""
kwargs: Dict[str, str] = {"Bucket": bucket_name} # noqa: UP006
if prefix:
kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/"

iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {} # noqa: UP006

for key in list_s3_objects(self._client, **kwargs):
key_name = key["Key"]
size = key.get("Size")
if skip_cold_storage and self._should_skip_s3_cold_storage(key):
logger.debug(
f"Skipping cold storage object: {key_name} "
f"(StorageClass: {key.get('StorageClass', 'STANDARD')}, "
f"ArchiveStatus: {key.get('ArchiveStatus', '')})"
)
continue
self._update_iceberg_entry(iceberg_tables, key_name, size)

iceberg_dirs = set(iceberg_tables.keys())
for _, metadata_key, size in iceberg_tables.values():
yield metadata_key, size

for key in list_s3_objects(self._client, **kwargs):
if skip_cold_storage:
storage_class = key.get("StorageClass", "STANDARD")
archive_status = key.get("ArchiveStatus", "")
if storage_class in S3_COLD_STORAGE_CLASSES or archive_status in {
"ARCHIVE_ACCESS",
"DEEP_ARCHIVE_ACCESS",
}:
logger.debug(
f"Skipping cold storage object: {key['Key']} "
f"(StorageClass: {storage_class}, ArchiveStatus: {archive_status})"
)
continue
yield key["Key"], key.get("Size")
key_name = key["Key"]
size = key.get("Size")
if skip_cold_storage and self._should_skip_s3_cold_storage(key):
continue
if not self._ICEBERG_METADATA_RE.match(key_name) and not any(
key_name.startswith(d + "/") for d in iceberg_dirs
):
yield key_name, size

def get_folders_prefix(self, bucket_name: str, prefix: Optional[str]) -> Iterable[str]: # noqa: UP045
for page in self._client.get_paginator("list_objects_v2").paginate(
@@ -67,6 +67,7 @@
DataFrameColumnParser,
fetch_dataframe_first_chunk,
get_file_format_type,
get_iceberg_table_name_from_metadata_path,
)
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.logger import ingestion_logger
@@ -201,7 +202,7 @@ def yield_database_schema(self, schema_name: str) -> Iterable[Either[CreateDatab

def get_tables_name_and_type( # pylint: disable=too-many-branches
self,
) -> Iterable[Tuple[str, TableType, SupportedTypes, Optional[int]]]: # noqa: UP006, UP045
) -> Iterable[Tuple[str, TableType, SupportedTypes, Optional[int], str]]: # noqa: UP006, UP045
"""
Handle table and views.

@@ -238,26 +239,37 @@ def get_tables_name_and_type( # pylint: disable=too-many-branches
logger.debug(f"Object filtered due to unsupported file type: {key_name}")
continue

yield table_name, TableType.Regular, file_extension, file_size
table_type = (
TableType.Iceberg
if get_iceberg_table_name_from_metadata_path(key_name) is not None
else TableType.Regular
)
yield table_name, table_type, file_extension, file_size, key_name

def yield_table(
self,
table_name_and_type: Tuple[str, TableType, SupportedTypes, Optional[int]], # noqa: UP006, UP045
table_name_and_type: Tuple[str, TableType, SupportedTypes, Optional[int], str], # noqa: UP006, UP045
) -> Iterable[Either[CreateTableRequest]]:
"""
From topology.
Prepare a table request and pass it to the sink.
Uses first chunk only for schema inference to avoid loading entire file.
"""
table_name, table_type, table_extension, file_size = table_name_and_type
(
table_name,
table_type,
table_extension,
file_size,
fetch_key,
) = table_name_and_type
schema_name = self.context.get().database_schema
try:
table_constraints = None
data_frame, raw_data = fetch_dataframe_first_chunk(
config_source=self.config_source,
client=self.client.client,
file_fqn=DatalakeTableSchemaWrapper(
key=table_name,
key=fetch_key,
bucket_name=schema_name,
file_extension=table_extension,
file_size=file_size,
@@ -326,7 +338,8 @@ def standardize_table_name(
schema: str,
table: str, # pylint: disable=unused-argument
) -> str:
return table
iceberg_name = get_iceberg_table_name_from_metadata_path(table)
return iceberg_name if iceberg_name is not None else table

def filter_dl_table(self, table_name: str):
"""Filters Datalake Tables based on filterPattern"""
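A rough sketch of the new five-element contract between get_tables_name_and_type and yield_table: the display name comes from the Iceberg table directory, while the fetch key stays on the concrete object, so schema inference still reads the metadata JSON. The object keys below are hypothetical, and the name handling for regular files is simplified; the import path is the one added in this PR (assuming the openmetadata-ingestion package is installed).

from metadata.utils.datalake.datalake_utils import get_iceberg_table_name_from_metadata_path

# Hypothetical object keys showing how the display name and fetch key diverge for Iceberg.
for key_name in (
    "warehouse/orders/metadata/v2.metadata.json",  # Iceberg table metadata file
    "raw/customers.csv",  # regular datalake object
):
    iceberg_name = get_iceberg_table_name_from_metadata_path(key_name)
    table_name = iceberg_name if iceberg_name is not None else key_name
    table_type = "Iceberg" if iceberg_name is not None else "Regular"  # stands in for TableType
    fetch_key = key_name  # always the real object key, passed to fetch_dataframe_first_chunk
    print(table_name, table_type, fetch_key)
# orders Iceberg warehouse/orders/metadata/v2.metadata.json
# raw/customers.csv Regular raw/customers.csv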
27 changes: 25 additions & 2 deletions ingestion/src/metadata/readers/dataframe/json.py
@@ -120,7 +120,20 @@ def _read_json_object(

content = content.decode(UTF_8, errors="ignore") if isinstance(content, bytes) else content
data = json.loads(content)
raw_data = content if isinstance(data, dict) and data.get("$schema") else None
raw_data = (
content
if isinstance(data, dict)
and (
data.get("$schema") is not None # JSON Schema files
or data.get("format-version")
is not None # Apache Iceberg table metadata
or ( # Delta Lake / Iceberg schema structure
isinstance(data.get("schema"), dict)
and isinstance(data.get("schema", {}).get("fields"), list)
)
)
else None
)
data = [data] if isinstance(data, dict) else data

def chunk_generator():
@@ -140,7 +153,17 @@ def _is_json_lines(file_obj) -> bool:
return True
try:
obj = json.loads(first_line)
return isinstance(obj, dict) and not obj.get("$schema")
if not isinstance(obj, dict):
return False
if obj.get("$schema") is not None:
return False
if obj.get("format-version") is not None:
return False
if isinstance(obj.get("schema"), dict) and isinstance(
obj.get("schema", {}).get("fields"), list
):
return False
return True
except json.JSONDecodeError:
return False

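To make the broadened JSON classification concrete, here is a minimal standalone sketch of the conditions. The payloads are invented, and keeps_raw_schema simply mirrors the checks added in _read_json_object and _is_json_lines above rather than calling the reader itself.

def keeps_raw_schema(data: dict) -> bool:
    """Mirror of the check deciding whether a JSON document is treated as schema-like raw data."""
    return (
        data.get("$schema") is not None  # JSON Schema files
        or data.get("format-version") is not None  # Apache Iceberg table metadata
        or (
            isinstance(data.get("schema"), dict)  # Delta Lake / Iceberg schema structure
            and isinstance(data.get("schema", {}).get("fields"), list)
        )
    )


samples = {
    "json_schema": {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object"},
    "iceberg_metadata": {"format-version": 2, "schemas": []},
    "delta_style": {"schema": {"fields": [{"name": "id", "type": "long"}]}},
    "plain_record": {"id": 1, "name": "widget"},
}
for name, payload in samples.items():
    print(name, keeps_raw_schema(payload))
# json_schema True
# iceberg_metadata True
# delta_style True
# plain_record False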
20 changes: 20 additions & 0 deletions ingestion/src/metadata/utils/datalake/datalake_utils.py
@@ -17,6 +17,7 @@
import ast
import json
import random
import re
import traceback
from typing import Any, Dict, List, Optional, Union, cast # noqa: UP035

@@ -149,6 +150,25 @@ def fetch_dataframe_first_chunk(
return None


_ICEBERG_METADATA_PATH_RE = re.compile(r"([^/]+)/metadata/v\d+\.metadata\.json$")


def get_iceberg_table_name_from_metadata_path(metadata_path: str) -> Optional[str]:
"""
Extracts the Iceberg table directory name from a metadata file path.

Examples:
"warehouse/orders/metadata/v2.metadata.json" -> "orders"
"my_prefix/sales/metadata/v1.metadata.json" -> "sales"
"simple/metadata/v3.metadata.json" -> "simple"
"data/orders.json" -> None

Returns None if the path does not match the Iceberg metadata pattern.
"""
match = _ICEBERG_METADATA_PATH_RE.search(metadata_path)
return match.group(1) if match else None


def get_file_format_type(key_name, metadata_entry=None):
for supported_types in SupportedTypes:
if key_name.lower().endswith(supported_types.value.lower()):