212 changes: 168 additions & 44 deletions ingestion/src/metadata/ingestion/source/database/sas/metadata.py
@@ -18,6 +18,7 @@
import json
import re
import traceback
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Iterable, Optional, Tuple

@@ -83,6 +84,86 @@
logger = ingestion_logger()


@dataclass(frozen=True)
class SASResourceContext:
"""Components extracted from a SAS Information Catalog resourceId.

The SAS Data Tables REST API exposes table resources at paths of the form:

/dataTables/dataSources/{provider}~fs~{host}~fs~{library}/tables/{table}

where ``~fs~`` is the field separator (literal, not URL-encoded).

Known provider values
---------------------
- ``cas`` — CAS (Cloud Analytic Services) table. *host* is the CAS
server name (e.g. ``cas-shared-default``).
- ``Compute`` — SAS Compute session table. *host* is a session UUID
(e.g. ``49736234-36b3-48d2-b2e2-e12aa365ce05``).

Real-world examples
-------------------
CAS table:
``/dataTables/dataSources/cas~fs~cas-shared-default~fs~Samples/tables/WATER_CLUSTER``
Compute table:
``/dataTables/dataSources/Compute~fs~49736234-…~fs~PUBLIC/tables/LAS_TRAIN``

Reference
---------
SAS REST API — Data Tables service:
https://developer.sas.com/rest-apis/dataTables
"""

provider: str
host: str
library: str
raw_resource_id: str

@property
def database_name(self) -> str:
return f"{self.provider}.{self.host}"


# The field separator used inside the ``dataSources`` path segment.
_SAS_FIELD_SEPARATOR = "~fs~"


def parse_resource_id(resource_id: str) -> Optional[SASResourceContext]:
"""Parse a SAS Information Catalog resourceId into its components.

Returns ``None`` (instead of raising) when the resourceId does not
conform to the expected shape so that callers can cleanly fall back
to the relationships-based lookup.
"""
segments = resource_id.split("/")
# Expected: ['', 'dataTables', 'dataSources', '<context>', 'tables', ...]
if len(segments) < 4:
logger.warning(
"resourceId %r has fewer than 4 slash-delimited segments; "
"cannot extract provider/host/library.",
resource_id,
)
return None

context = segments[3]
parts = context.split(_SAS_FIELD_SEPARATOR)
if len(parts) < 3:
logger.warning(
"resourceId context segment %r has %d field(s) (expected 3: "
"provider, host, library); cannot derive database/schema.",
context,
len(parts),
)
return None

return SASResourceContext(
provider=parts[0],
host=parts[1],
library=parts[2],
raw_resource_id=resource_id,
)
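
# A quick illustrative sketch of the parser above (not executed by the
# connector). The inputs are the two resourceId examples documented on
# ``SASResourceContext``:
#
#   ctx = parse_resource_id(
#       "/dataTables/dataSources/cas~fs~cas-shared-default~fs~Samples/tables/WATER_CLUSTER"
#   )
#   assert (ctx.provider, ctx.host, ctx.library) == ("cas", "cas-shared-default", "Samples")
#   assert ctx.database_name == "cas.cas-shared-default"
#
#   # Anything that does not match the expected shape comes back as None:
#   assert parse_resource_id("/dataTables/unexpected") is None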


class SasSource(
DatabaseServiceSource
): # pylint: disable=too-many-instance-attributes,too-many-public-methods
@@ -232,53 +313,77 @@ def create_database_alt(self, db):

def create_database_schema(self, table):
"""
-        create database schema
Create database and schema entities for the given table.

First attempts to derive provider/host/library from the table's
``resourceId`` via ``parse_resource_id``. If the resourceId does
not match the expected SAS Data Tables shape, or the resulting
create/update call fails, falls back to a relationships-based
lookup through the Information Catalog.
"""
-        try:
-            context = table["resourceId"].split("/")[3]
resource_id = table.get("resourceId", "")
ctx = parse_resource_id(resource_id)

-            provider = context.split("~")[0]
-            self.db_name = provider + "." + context.split("~")[2]
-            self.db_schema_name = context.split("~")[4]
if ctx is not None:
try:
self.db_name = ctx.database_name
self.db_schema_name = ctx.library

-            database = CreateDatabaseRequest(
-                name=self.db_name,
-                displayName=self.db_name,
-                service=self.config.serviceName,
-            )
-            database = self.metadata.create_or_update(data=database)
database = CreateDatabaseRequest(
name=self.db_name,
displayName=self.db_name,
service=self.config.serviceName,
)
database = self.metadata.create_or_update(data=database)

-            db_schema = CreateDatabaseSchemaRequest(
-                name=self.db_schema_name, database=database.fullyQualifiedName
-            )
-            db_schema_entity = self.metadata.create_or_update(db_schema)
-            return db_schema_entity

-        except HTTPError as _:
-            # Find the "database" entity in Information Catalog
-            # First see if the table is a member of the library through the relationships attribute
-            # Or we could use views to query the dataStores
-            data_store_data_sets = "4b114f6e-1c2a-4060-9184-6809a612f27b"
-            data_store_id = None
-            for relation in table["relationships"]:
-                if relation["definitionId"] != data_store_data_sets:
-                    continue
-                data_store_id = relation["endpointId"]
-                break
db_schema = CreateDatabaseSchemaRequest(
name=self.db_schema_name, database=database.fullyQualifiedName
)
return self.metadata.create_or_update(db_schema)

except HTTPError as exc:
logger.debug(
"Falling back to relationships-based schema lookup for "
"%s after HTTP error: %s",
resource_id,
exc,
)

-            if data_store_id is None:
-                # log error due to exclude amount of work with tables in dataTables
-                logger.error("Data store id should not be none")
-                return None
return self._create_database_schema_from_relationships(table)

-            data_store = self.sas_client.get_instance(data_store_id)
-            database = self.create_database_alt(data_store)
-            self.db_schema_name = data_store["name"]
-            db_schema = CreateDatabaseSchemaRequest(
-                name=data_store["name"], database=database.fullyQualifiedName
def _create_database_schema_from_relationships(self, table):
"""Derive database/schema from the table's catalog relationships.

This is the fallback path when ``parse_resource_id`` returns
``None`` or the primary create fails. It looks for a
``dataStoreDataSets`` relationship to locate the parent data
store, then uses ``create_database_alt`` for the database entity.
"""
data_store_data_sets = "4b114f6e-1c2a-4060-9184-6809a612f27b"
data_store_id = None
for relation in table.get("relationships", []):
if relation["definitionId"] != data_store_data_sets:
continue
data_store_id = relation["endpointId"]
break

if data_store_id is None:
logger.error(
"Failed to derive database schema for SAS table '%s' (resourceId=%s): "
"missing data store identifier because the expected "
"'dataStoreDataSets' relationship was not found.",
table.get("name", "<unknown>"),
table.get("resourceId", "<missing>"),
)
-            db_schema_entity = self.metadata.create_or_update(db_schema)
-            return db_schema_entity
return None

data_store = self.sas_client.get_instance(data_store_id)
database = self.create_database_alt(data_store)
self.db_schema_name = data_store["name"]
db_schema = CreateDatabaseSchemaRequest(
name=data_store["name"], database=database.fullyQualifiedName
)
return self.metadata.create_or_update(db_schema)

def create_columns_alt(self, table):
"""
@@ -439,6 +544,7 @@ def create_table_entity(self, table) -> Iterable[Either[CreateTableRequest]]:
global table_fqn

table_entity, table_fqn = None, None
table_name = table.get("name") if isinstance(table, dict) else None

try:
table_url = self.sas_client.get_information_catalog_link(table["id"])
Expand Down Expand Up @@ -506,10 +612,13 @@ def create_table_entity(self, table) -> Iterable[Either[CreateTableRequest]]:
custom_attributes = [
custom_attribute["name"] for custom_attribute in TABLE_CUSTOM_ATTR
]
# Drop null values — OpenMetadata's custom-field types
# (e.g. STRING_TYPE) reject null and fail the create with
# "Custom field <name> has invalid JSON [$: null found, string expected]"
extension_attributes = {
attr: value
for attr, value in table_extension.items()
-                if attr in custom_attributes
if attr in custom_attributes and value is not None
}

table_request = CreateTableRequest(
@@ -529,6 +638,18 @@
table_entity = self.metadata.get_by_name(
entity=Table, fqn=self.get_table_fqn(table_name)
)
# If the table wasn't actually persisted (e.g. the sink
# rejected the CreateTableRequest), skip the follow-up
# patch/profile calls so we don't raise an AttributeError
# that masks the real sink-side failure.
if table_entity is None:
logger.warning(
f"Table [{table_name}] was not created in OpenMetadata; "
"skipping description/extension/profile updates. "
"Check the sink logs for the underlying error."
)
return

# update the description
logger.debug(
f"Updating description for {table_entity.id.root} with {table_description}"
Expand Down Expand Up @@ -595,10 +716,13 @@ def create_table_entity(self, table) -> Iterable[Either[CreateTableRequest]]:

except Exception as exc:
logger.error(f"table failed to create: {table}")
error_name = table_name or (
table.get("id") if isinstance(table, dict) else "unknown"
)
yield Either(
left=StackTraceError(
-                    name=table_name,
-                    error=f"Unexpected exception to create table [{table_name}]: {exc}",
name=str(error_name),
error=f"Unexpected exception to create table [{error_name}]: {exc}",
stackTrace=traceback.format_exc(),
)
)
@@ -637,7 +761,7 @@ def create_lineage_table_source(self, table_extension, table_name):
entity=Table, fqn=source_table_fqn
)

-        if source_table_entity:
if source_table_entity and target_table_entity:
yield from self.create_table_lineage(
source_table_entity, target_table_entity
)