Merged
33 commits
e37ffd6  Storage Service Connect , path specification and manifest file deprec…  (harshach, Apr 15, 2026)
a8978c3  Storage Service Connect , path specification and manifest file deprec…  (harshach, Apr 15, 2026)
889d2bb  Update generated TypeScript types  (github-actions[bot], Apr 15, 2026)
951b70e  Storage Service Connect , path specification and manifest file deprec…  (harshach, Apr 15, 2026)
2548b53  Merge remote-tracking branch 'origin/storage_wildcard' into storage_w…  (harshach, Apr 15, 2026)
385b068  Storage Service Connect , path specification and manifest file deprec…  (harshach, Apr 15, 2026)
960482a  Update generated TypeScript types  (github-actions[bot], Apr 15, 2026)
4922a2d  Merge branch 'main' into storage_wildcard  (harshach, Apr 16, 2026)
5131349  Address comments  (harshach, Apr 16, 2026)
b67a4bf  Address comments  (harshach, Apr 16, 2026)
13c90e1  Add tests  (harshach, Apr 16, 2026)
1d0f70a  Add tests  (harshach, Apr 16, 2026)
6ec8edf  Update generated TypeScript types  (github-actions[bot], Apr 16, 2026)
c6fd6a3  Add UI support for manifest config  (harshach, Apr 16, 2026)
276fb5f  Merge remote-tracking branch 'origin/storage_wildcard' into storage_w…  (harshach, Apr 16, 2026)
3c66792  Improve manifest support for storage services  (harshach, Apr 16, 2026)
953686f  Merge branch 'main' into storage_wildcard  (harshach, Apr 16, 2026)
8e93095  Address comments  (harshach, Apr 16, 2026)
26228c8  Fix #24823: containerFilterPattern and include/exclude Not work  (harshach, Apr 16, 2026)
cfe4acb  Merge remote-tracking branch 'origin/storage_wildcard' into storage_w…  (harshach, Apr 16, 2026)
fbb1723  Update generated TypeScript types  (github-actions[bot], Apr 16, 2026)
16d93cf  Address commments  (harshach, Apr 17, 2026)
db85dab  Merge remote-tracking branch 'origin/main' into storage_wildcard  (harshach, Apr 18, 2026)
a3486c9  Address commments  (harshach, Apr 18, 2026)
bb49d68  Merge remote-tracking branch 'origin/storage_wildcard' into storage_w…  (harshach, Apr 18, 2026)
1f658de  fix partiion column type conversion error  (ulixius9, Apr 20, 2026)
1b5fbe5  merge main resolve conflict  (ulixius9, Apr 20, 2026)
180ab49  Address comments  (harshach, Apr 21, 2026)
11e54ab  Update generated TypeScript types  (github-actions[bot], Apr 21, 2026)
36ce395  Address comments on manifest widget  (harshach, Apr 21, 2026)
deb79d1  Merge remote-tracking branch 'origin/storage_wildcard' into storage_w…  (harshach, Apr 21, 2026)
128cf6b  merge main resolve conflict  (ulixius9, Apr 21, 2026)
324263a  address comments  (ulixius9, Apr 21, 2026)
21 changes: 21 additions & 0 deletions ingestion/src/metadata/ingestion/models/custom_pydantic.py
@@ -215,6 +215,27 @@ def handle_secret(value: Any, handler, info: SerializationInfo) -> str:
 CustomSecretStr = Annotated[_CustomSecretStr, WrapSerializer(handle_secret)]
 
 
+def format_validation_error(exc: Exception) -> str:
+    """Render a Pydantic ``ValidationError`` (v2) as a compact one-liner
+    suitable for log messages and workflow status warnings.
+
+    Each field error becomes ``field.path: message``, joined by ``; ``.
+    Falls back to ``str(exc)`` for non-Pydantic exceptions so callers
+    don't need to type-check.
+
+    Example output::
+
+        entries.0.dataPath: Field required; entries.1.structureFormat: Input should be a valid string
+    """
+    errors = getattr(exc, "errors", None)
+    if callable(errors):
+        return "; ".join(
+            f"{'.'.join(str(p) for p in err.get('loc', ()))}: {err.get('msg', '')}"
+            for err in errors()
+        )
+    return str(exc)
+
+
 def ignore_type_decoder(type_: Any) -> None:
     """Given a type_, add a custom decoder to the BaseModel
     to ignore any decoding errors for that type_."""
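For context, a quick usage sketch (not part of the PR) of how the new helper flattens a Pydantic v2 ValidationError; the Entry model below is hypothetical and stands in for any validated config model:

```python
# Hypothetical model for illustration; any Pydantic v2 model behaves the same.
from pydantic import BaseModel, ValidationError

from metadata.ingestion.models.custom_pydantic import format_validation_error


class Entry(BaseModel):
    dataPath: str
    structureFormat: str


try:
    Entry.model_validate({"structureFormat": 123})
except ValidationError as exc:
    # Prints a single compact line instead of Pydantic's multi-line report:
    # dataPath: Field required; structureFormat: Input should be a valid string
    print(format_validation_error(exc))
```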
200 changes: 134 additions & 66 deletions ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
@@ -46,6 +46,7 @@
 from metadata.generated.schema.type.tagLabel import TagLabel
 from metadata.ingestion.api.models import Either
 from metadata.ingestion.api.steps import InvalidSourceException
+from metadata.ingestion.models.custom_pydantic import format_validation_error
 from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.storage.s3.models import (
@@ -65,6 +66,7 @@
 from metadata.utils.filters import filter_by_container
 from metadata.utils.logger import ingestion_logger
 from metadata.utils.s3_utils import list_s3_objects
+from metadata.utils.storage_utils import COLD_STORAGE_CLASSES, is_excluded_artifact
 from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_label
 
 logger = ingestion_logger()
@@ -130,38 +132,27 @@ def get_containers(self) -> Iterable[S3ContainerDetails]:
             parent_entity: EntityReference = EntityReference(
                 id=self._bucket_cache[bucket_name].id.root, type="container"
             )
-            if self.global_manifest:
-                manifest_entries_for_current_bucket = (
-                    self._manifest_entries_to_metadata_entries_by_container(
-                        container_name=bucket_name, manifest=self.global_manifest
-                    )
-                )
-                # Check if we have entries in the manifest file belonging to this bucket
-                if manifest_entries_for_current_bucket:
-                    # ingest all the relevant valid paths from it
-                    yield from self._generate_structured_containers(
-                        bucket_response=bucket_response,
-                        entries=manifest_entries_for_current_bucket,
-                        parent=parent_entity,
-                    )
-                    yield from self._generate_unstructured_containers(
-                        bucket_response=bucket_response,
-                        entries=manifest_entries_for_current_bucket,
-                        parent=parent_entity,
-                    )
-                    # nothing else do to for the current bucket, skipping to the next
-                    continue
-            # If no global file, or no valid entries in the manifest, check for bucket level metadata file
-            metadata_config = self._load_metadata_file(bucket_name=bucket_name)
-            if metadata_config:
+            manifest_entries = self._resolve_manifest_entries(bucket_name)
+            if manifest_entries:
+                expanded_entries = self.expand_entries(
+                    bucket_name=bucket_name, entries=manifest_entries
+                )
+                # Apply containerFilterPattern + default Spark-artifact
+                # excludes to the concrete paths *before* we attempt to
+                # list sample files / infer schema. Prevents Issue #24823
+                # where entries like ``_SUCCESS`` or user-excluded paths
+                # would still be processed.
+                filtered_entries = self.filter_manifest_entries(
+                    bucket_name=bucket_name, entries=expanded_entries
+                )
                 yield from self._generate_structured_containers(
                     bucket_response=bucket_response,
-                    entries=metadata_config.entries,
+                    entries=filtered_entries,
                     parent=parent_entity,
                 )
                 yield from self._generate_unstructured_containers(
                     bucket_response=bucket_response,
-                    entries=metadata_config.entries,
+                    entries=filtered_entries,
                     parent=parent_entity,
                 )

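The rewritten get_containers above resolves wildcard dataPath entries against the bucket listing before any schema inference happens. The bodies of expand_entries and filter_manifest_entries are not part of this excerpt, so the following is only a rough sketch, assuming glob-style matching, of how a pattern could be expanded against the (key, size) stream that list_keys yields:

```python
# Rough sketch only: approximates glob expansion of a manifest dataPath
# against a pre-filtered S3 listing; the PR's real expand_entries may differ.
from fnmatch import fnmatch
from typing import Iterable, List, Tuple


def expand_pattern(listing: Iterable[Tuple[str, int]], pattern: str) -> List[str]:
    """Return keys matching a glob-style dataPath pattern.

    `listing` plays the role of S3Source.list_keys output, which has already
    dropped directories, cold-storage objects, and sentinel artifacts.
    """
    return [key for key, _size in listing if fnmatch(key, pattern)]


listing = [
    ("data/sales/2026/part-0000.parquet", 1024),
    ("data/sales/2026/part-0001.parquet", 2048),
]
# Both keys match; note fnmatch's "*" also crosses "/" boundaries.
print(expand_pattern(listing, "data/sales/*/part-*.parquet"))
```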
@@ -358,8 +349,7 @@ def _generate_structured_containers_by_depth(
                 if entry
                 and entry.get("Key")
                 and len(entry.get("Key").split("/")) > total_depth
-                and "/_delta_log/" not in entry.get("Key")
-                and not entry.get("Key").endswith("/_SUCCESS")
+                and not is_excluded_artifact(entry.get("Key"))
             }
             for key in candidate_keys:
                 metadata_entry_copy = deepcopy(metadata_entry)
@@ -563,6 +553,24 @@ def _generate_unstructured_containers(
             ),
         )
 
+    def list_keys(self, bucket_name: str, prefix: str) -> Iterable[Tuple[str, int]]:
+        """List (key, size_bytes) for all files under prefix.
+
+        Filters out directories, cold storage objects, and Spark/Delta
+        sentinel artifacts (``_SUCCESS``, ``*.crc``, ``_committed_*``,
+        etc.) so they never participate in glob matching or grouping.
+        """
+        for obj in list_s3_objects(self.s3_client, Bucket=bucket_name, Prefix=prefix):
+            key = obj.get("Key", "")
+            if not key or key.endswith("/"):
+                continue
+            storage_class = obj.get("StorageClass", "STANDARD")
+            if storage_class in COLD_STORAGE_CLASSES:
+                continue
+            if is_excluded_artifact(key):
+                continue
+            yield key, obj.get("Size", 0)
+
Comment on lines +556 to +572

Copilot AI commented on Apr 20, 2026:

S3Source.list_keys currently yields keys for all non-directory objects under the prefix, including Spark/Delta/Hadoop sentinel artifacts (e.g. _SUCCESS.crc, _committed_*, .crc sidecars). Since glob expansion (and especially unstructuredData: true) can turn matched keys directly into containers, it’s safer to filter these artifacts at the listing stage as well (e.g., if is_excluded_artifact(key): continue) so they never participate in glob matching/grouping.
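is_excluded_artifact is imported from metadata/utils/storage_utils.py, whose body is not part of this diff. Based on the artifact names the docstrings enumerate, a plausible sketch could be (the PR's actual rules may differ):

```python
# Plausible sketch of is_excluded_artifact, inferred from the docstrings above;
# not the actual implementation in metadata/utils/storage_utils.py.
import posixpath


def is_excluded_artifact(key: str) -> bool:
    """True for Spark/Delta/Hadoop sentinel files that should never be ingested."""
    name = posixpath.basename(key)
    # Directory-level markers anywhere in the path.
    if any(
        part in ("_delta_log", "_temporary", "_spark_metadata")
        for part in key.split("/")
    ):
        return True
    # File-level sentinels and sidecar files.
    if name == "_SUCCESS" or name.endswith((".crc", ".tmp")):
        return True
    return name.startswith(("_committed_", "_started_"))


assert is_excluded_artifact("db/tbl/_delta_log/00000000.json")
assert is_excluded_artifact("db/tbl/part-0000.parquet.crc")
assert not is_excluded_artifact("db/tbl/part-0000.parquet")
```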

     def fetch_buckets(self) -> List[S3BucketResponse]:
         results: List[S3BucketResponse] = []
         try:

@@ -678,36 +686,57 @@ def _get_sample_file_path(
         self, bucket_name: str, metadata_entry: MetadataEntry
     ) -> Optional[str]:
         """
-        Given a bucket and a metadata entry, returns the full path key to a file which can then be used to infer schema
-        or None in the case of a non-structured metadata entry, or if no such keys can be found
+        Given a bucket and a metadata entry, returns the full path key to a
+        file which can then be used to infer schema, or None if no suitable
+        file exists.
+
+        Spark/Delta artifacts (``_SUCCESS``, ``_SUCCESS.crc``,
+        ``_delta_log``, ``_temporary``, ``_spark_metadata``, ``.tmp``,
+        ``_committed_*``, ``_started_*``) are always skipped — these
+        sentinel files are commonly 0-byte or non-parquet and would
+        crash the schema-inference readers (see Issue #24823).
+
+        The entry's ``structureFormat`` (if set) is used to prefer a
+        matching extension so a ``.parquet`` table is not sampled from
+        a neighbouring ``.csv`` or ``.crc`` file.
         """
         prefix = self._get_sample_file_prefix(metadata_entry=metadata_entry)
-        # this will look only in the first 1000 files under that path (default for list_objects_v2).
-        # We'd rather not do pagination here as it would incur unwanted costs
+        if not prefix:
+            return None
+
+        # this will look only in the first 1000 files under that path
+        # (default for list_objects_v2). Pagination would incur unwanted costs.
         try:
-            if prefix:
-                response = self.s3_client.list_objects_v2(
-                    Bucket=bucket_name, Prefix=prefix
-                )
-                candidate_keys = [
-                    entry["Key"]
-                    for entry in response[S3_CLIENT_ROOT_RESPONSE]
-                    if entry
-                    and entry.get("Key")
-                    and not entry.get("Key").endswith("/")
-                    and "/_delta_log/" not in entry.get("Key")
-                    and not entry.get("Key").endswith("/_SUCCESS")
-                ]
-                # pick a random key out of the candidates if any were returned
-                if candidate_keys:
-                    result_key = secrets.choice(candidate_keys)
-                    logger.info(
-                        f"File {result_key} was picked to infer data structure from."
-                    )
-                    return result_key
-                logger.warning(
-                    f"No sample files found in {prefix} with {metadata_entry.structureFormat} extension"
-                )
-            return None
+            response = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
+            all_keys = [
+                entry["Key"]
+                for entry in response.get(S3_CLIENT_ROOT_RESPONSE, []) or []
+                if entry
+                and entry.get("Key")
+                and not entry.get("Key").endswith("/")
+                and not is_excluded_artifact(entry.get("Key"))
+            ]
+            # Prefer files that match the requested structureFormat
+            # extension when one is set; fall back to any remaining file
+            # if none match (some tables write parquet with uncommon
+            # extensions like .pq / .parq).
+            fmt = (metadata_entry.structureFormat or "").strip().lower()
+            if fmt:
+                preferred = [k for k in all_keys if k.lower().endswith("." + fmt)]
+                candidate_keys = preferred or all_keys
+            else:
+                candidate_keys = all_keys
+
+            if candidate_keys:
+                result_key = secrets.choice(candidate_keys)
+                logger.info(
+                    f"File {result_key} was picked to infer data structure from."
+                )
+                return result_key
+            logger.warning(
+                f"No sample files found in {prefix} with "
+                f"{metadata_entry.structureFormat} extension"
+            )
+            return None
         except Exception:
             logger.debug(traceback.format_exc())
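The extension-preference rule is easy to illustrate in isolation. pick_candidates below is a hypothetical standalone helper mirroring the logic above, with made-up keys:

```python
# Hypothetical helper mirroring the structureFormat preference logic above.
from typing import List, Optional


def pick_candidates(all_keys: List[str], structure_format: Optional[str]) -> List[str]:
    fmt = (structure_format or "").strip().lower()
    if not fmt:
        return all_keys
    preferred = [k for k in all_keys if k.lower().endswith("." + fmt)]
    # Fall back to any file when nothing matches (e.g. parquet stored as .pq).
    return preferred or all_keys


assert pick_candidates(["s/part-0.parquet", "s/readme.csv"], "parquet") == ["s/part-0.parquet"]
assert pick_candidates(["s/part-0.pq"], "parquet") == ["s/part-0.pq"]  # fallback path
```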
@@ -790,27 +819,66 @@ def _get_object_source_url(self, bucket_name: str, prefix: str) -> Optional[str]:

     def _load_metadata_file(self, bucket_name: str) -> Optional[StorageContainerConfig]:
         """
-        Load the metadata template file from the root of the bucket, if it exists
+        Load the metadata template file from the root of the bucket, if it exists.
+
+        Errors are distinguished so users can diagnose why a bucket was not
+        registered:
+
+        - Missing file → logged at INFO (expected when no manifest is used)
+        - JSON syntax error → WARNING with line/column
+        - Schema validation error (e.g. missing required field, wrong type) →
+          WARNING with Pydantic's per-field message
+        - Any other error → WARNING with the exception repr
+
+        All non-missing errors are also recorded on the workflow ``status``
+        so they show up in the Ingestion tab alongside other warnings.
         """
+        manifest_uri = f"s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}"
         try:
-            logger.info(
-                f"Looking for metadata template file at - s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}"
-            )
+            logger.info(f"Looking for metadata template file at - {manifest_uri}")
             response_object = self.s3_reader.read(
                 path=OPENMETADATA_TEMPLATE_FILE_NAME,
                 bucket_name=bucket_name,
                 verbose=False,
             )
+        except ReadException:
+            logger.info(
+                f"No manifest file found at {manifest_uri} — falling back to "
+                f"defaultManifest / global manifest if configured."
+            )
+            return None
+
+        try:
             content = json.loads(response_object)
+        except json.JSONDecodeError as exc:
+            msg = (
+                f"Bucket manifest {manifest_uri} is not valid JSON "
+                f"(line {exc.lineno}, column {exc.colno}): {exc.msg}. "
+                f"This bucket will use the defaultManifest fallback if one is "
+                f"configured; otherwise no nested containers will be ingested."
+            )
+            logger.warning(msg)
+            self.status.warning(bucket_name, msg)
+            return None
+
+        try:
             metadata_config = StorageContainerConfig.model_validate(content)
-            return metadata_config
-        except ReadException:
-            logger.warning(
-                f"No metadata file found at s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}"
+        except ValidationError as exc:
+            msg = (
+                f"Bucket manifest {manifest_uri} does not match the expected "
+                f"schema: {format_validation_error(exc)}. This bucket will use the defaultManifest "
+                f"fallback if one is configured; otherwise no nested "
+                f"containers will be ingested."
             )
-        except Exception as exc:
+            logger.warning(msg)
+            self.status.warning(bucket_name, msg)
+            return None
+        except Exception as exc:  # pragma: no cover — defensive
             logger.debug(traceback.format_exc())
-            logger.warning(
-                f"Failed loading metadata file s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}-{exc}"
-            )
-        return None
+            msg = f"Unexpected error loading manifest {manifest_uri}: {exc}"
+            logger.warning(msg)
+            self.status.warning(bucket_name, msg)
+            return None
+
+        logger.info(f"Loaded bucket-level manifest from {manifest_uri}")
+        return metadata_config
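To make the loader's error taxonomy concrete, here is a self-contained sketch reproducing the JSON-syntax vs. schema-validation split; Manifest and ManifestEntry are hypothetical stand-ins for StorageContainerConfig, whose full schema is not shown in this diff:

```python
# Sketch of the two non-missing failure modes the new loader distinguishes.
import json
from typing import List

from pydantic import BaseModel, ValidationError

from metadata.ingestion.models.custom_pydantic import format_validation_error


class ManifestEntry(BaseModel):  # hypothetical stand-in
    dataPath: str
    structureFormat: str


class Manifest(BaseModel):  # hypothetical stand-in for StorageContainerConfig
    entries: List[ManifestEntry]


def load_manifest(raw: str):
    try:
        content = json.loads(raw)
    except json.JSONDecodeError as exc:
        return f"not valid JSON (line {exc.lineno}, column {exc.colno}): {exc.msg}"
    try:
        return Manifest.model_validate(content)
    except ValidationError as exc:
        return f"schema mismatch: {format_validation_error(exc)}"


print(load_manifest('{"entries": [}'))     # JSON syntax error, with line/column
print(load_manifest('{"entries": [{}]}'))  # per-field schema errors
```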