From 1e8bbdbf9afc4d8a9d15e0a5a43d43239e12c7f2 Mon Sep 17 00:00:00 2001 From: harshsoni2024 Date: Fri, 10 Apr 2026 10:49:27 +0530 Subject: [PATCH 1/4] sas viya 4 null error changes --- .../ingestion/source/database/sas/metadata.py | 39 +- .../tests/unit/topology/database/test_sas.py | 338 ++++++++++++++++++ 2 files changed, 370 insertions(+), 7 deletions(-) create mode 100644 ingestion/tests/unit/topology/database/test_sas.py diff --git a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py index 3cf7344d8739..4e00428033c4 100644 --- a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py @@ -236,10 +236,16 @@ def create_database_schema(self, table): """ try: context = table["resourceId"].split("/")[3] + context_parts = context.split("~") + if len(context_parts) < 5: + raise ValueError( + f"Unexpected resourceId format, cannot derive database/schema: " + f"{table['resourceId']}" + ) - provider = context.split("~")[0] - self.db_name = provider + "." + context.split("~")[2] - self.db_schema_name = context.split("~")[4] + provider = context_parts[0] + self.db_name = provider + "." + context_parts[2] + self.db_schema_name = context_parts[4] database = CreateDatabaseRequest( name=self.db_name, @@ -254,7 +260,7 @@ def create_database_schema(self, table): db_schema_entity = self.metadata.create_or_update(db_schema) return db_schema_entity - except HTTPError as _: + except (HTTPError, IndexError, KeyError, ValueError) as _: # Find the "database" entity in Information Catalog # First see if the table is a member of the library through the relationships attribute # Or we could use views to query the dataStores @@ -439,6 +445,7 @@ def create_table_entity(self, table) -> Iterable[Either[CreateTableRequest]]: global table_fqn table_entity, table_fqn = None, None + table_name = table.get("name") if isinstance(table, dict) else None try: table_url = self.sas_client.get_information_catalog_link(table["id"]) @@ -506,10 +513,13 @@ def create_table_entity(self, table) -> Iterable[Either[CreateTableRequest]]: custom_attributes = [ custom_attribute["name"] for custom_attribute in TABLE_CUSTOM_ATTR ] + # Drop null values — OpenMetadata's custom-field types + # (e.g. STRING_TYPE) reject null and fail the create with + # "Custom field has invalid JSON [$: null found, string expected]" extension_attributes = { attr: value for attr, value in table_extension.items() - if attr in custom_attributes + if attr in custom_attributes and value is not None } table_request = CreateTableRequest( @@ -529,6 +539,18 @@ def create_table_entity(self, table) -> Iterable[Either[CreateTableRequest]]: table_entity = self.metadata.get_by_name( entity=Table, fqn=self.get_table_fqn(table_name) ) + # If the table wasn't actually persisted (e.g. the sink + # rejected the CreateTableRequest), skip the follow-up + # patch/profile calls so we don't raise an AttributeError + # that masks the real sink-side failure. + if table_entity is None: + logger.warning( + f"Table [{table_name}] was not created in OpenMetadata; " + "skipping description/extension/profile updates. " + "Check the sink logs for the underlying error." + ) + return + # update the description logger.debug( f"Updating description for {table_entity.id.root} with {table_description}" @@ -595,10 +617,13 @@ def create_table_entity(self, table) -> Iterable[Either[CreateTableRequest]]: except Exception as exc: logger.error(f"table failed to create: {table}") + error_name = table_name or ( + table.get("id") if isinstance(table, dict) else "unknown" + ) yield Either( left=StackTraceError( - name=table_name, - error=f"Unexpected exception to create table [{table_name}]: {exc}", + name=str(error_name), + error=f"Unexpected exception to create table [{error_name}]: {exc}", stackTrace=traceback.format_exc(), ) ) diff --git a/ingestion/tests/unit/topology/database/test_sas.py b/ingestion/tests/unit/topology/database/test_sas.py new file mode 100644 index 000000000000..d61b43bace40 --- /dev/null +++ b/ingestion/tests/unit/topology/database/test_sas.py @@ -0,0 +1,338 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Unit tests for the SAS connector. + +These tests pin the bug fixes from issue #16888 where metadata ingestion +failed with a default SAS Viya 4 configuration because: + 1. `casHost` (and other nullable SAS attributes) came back as None and + the backend rejected the CreateTableRequest with 400 + "Custom field casHost has invalid JSON [$: null found, string expected]". + 2. After the sink rejection, the source re-fetched the table and crashed + on `None.id`, masking the real sink error. + 3. The bare `except` in `create_table_entity` referenced `table_name` + before it was assigned in some code paths. + 4. `create_database_schema` only caught `HTTPError`, so a malformed + `resourceId` raised an uncaught `IndexError`. +""" +# pylint: disable=protected-access + +from unittest.mock import MagicMock, patch + +import pytest +from requests.exceptions import HTTPError + +from metadata.generated.schema.api.data.createTable import CreateTableRequest +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.ingestion.source.database.sas.client import SASClient +from metadata.ingestion.source.database.sas.metadata import SasSource + +MOCK_SAS_CONFIG = { + "source": { + "type": "sas", + "serviceName": "local_sas", + "serviceConnection": { + "config": { + "type": "SAS", + "serverHost": "http://your-server-host.org", + "username": "username", + "password": "password", + "datatables": True, + "dataTablesCustomFilter": None, + "reports": False, + "reportsCustomFilter": None, + "dataflows": False, + "dataflowsCustomFilter": None, + } + }, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": {"jwtToken": "sas-unit-test"}, + } + }, +} + +# A realistic dataset search hit taken from the user's ingestion log +# (#16888). resourceId uses the "~fs~" separator style that the parsing +# code in create_database_schema relies on. +LAS_TRAIN_SEARCH_HIT = { + "id": "0396a44a-889f-4ee0-8211-252fc088a3cc", + "name": "LAS_TRAIN", + "type": "sasTable", + "attributes": {"library": "PUBLIC", "reviewStatus": "none"}, +} + +# Mock `get_views` response. The SAS source expects a `dataSet` entity +# plus zero or more `dataField`/`Column` entities in "entities". +LAS_TRAIN_VIEW = { + "entities": [ + { + "id": "0396a44a-889f-4ee0-8211-252fc088a3cc", + "type": ["Table", "dataSet"], + "name": "LAS_TRAIN", + "resourceId": ( + "/dataTables/dataSources/Compute~fs~" + "49736234-36b3-48d2-b2e2-e12aa365ce05~fs~PUBLIC/tables/LAS_TRAIN" + ), + "creationTimeStamp": None, + "attributes": { + "analysisTimeStamp": "2024-07-01T10:25:00.000Z", + "rowCount": 10, + "columnCount": 1, + "dataSize": 1024, + # The specific field that triggered the original bug — + # SAS returns it as null for compute-backed tables. + "casHost": None, + "CASLIB": "PUBLIC", + "engineName": "V9", + }, + }, + { + "id": "col-1", + "type": ["Column"], + "name": "col1", + "attributes": { + "dataType": "char", + "ordinalPosition": 1, + "charsMaxCount": 10, + }, + }, + ] +} + + +@pytest.fixture +def sas_source(): + """Build a SasSource with every network call mocked out.""" + with patch.object(SASClient, "get_token", return_value="token"), patch( + "metadata.ingestion.source.database.sas.metadata.SasSource.test_connection" + ), patch( + "metadata.ingestion.source.database.sas.metadata." + "SasSource.add_table_custom_attributes" + ): + config = OpenMetadataWorkflowConfig.model_validate(MOCK_SAS_CONFIG) + source = SasSource.create( + MOCK_SAS_CONFIG["source"], + MagicMock(), + ) + source.config = config.source + source.db_service_name = "local_sas" + return source + + +class TestCreateDatabaseSchema: + """Cover the fragile resourceId parsing in create_database_schema.""" + + def test_parses_standard_cas_resource_id(self, sas_source): + """A well-formed cas~fs~host~fs~lib path should populate db_name + and db_schema_name without hitting the HTTPError fallback.""" + sas_source.metadata = MagicMock() + sas_source.metadata.create_or_update.return_value = MagicMock( + fullyQualifiedName="cas.cas-shared-default" + ) + table = { + "resourceId": ( + "/dataTables/dataSources/cas~fs~cas-shared-default~fs~Samples" + "/tables/WATER_CLUSTER?ext=sashdat" + ), + } + + sas_source.create_database_schema(table) + + assert sas_source.db_name == "cas.cas-shared-default" + assert sas_source.db_schema_name == "Samples" + + def test_malformed_resource_id_falls_back_without_index_error(self, sas_source): + """A resourceId without the expected tilde segments used to raise + IndexError because the old code only caught HTTPError. It should + now enter the relationships-based fallback path.""" + sas_source.metadata = MagicMock() + sas_source.sas_client = MagicMock() + sas_source.sas_client.get_instance.return_value = { + "name": "fallback_schema", + "resourceId": "/dataSources/some/parent", + "links": [{"rel": "parent", "uri": "/parent"}], + } + # Stub create_database_alt so we don't need a real API round-trip. + sas_source.create_database_alt = MagicMock( + return_value=MagicMock(fullyQualifiedName="fallback_db") + ) + + table = { + # Only three slash-delimited segments: indexing [3] raises. + "resourceId": "/too/short", + "relationships": [ + { + "definitionId": "4b114f6e-1c2a-4060-9184-6809a612f27b", + "endpointId": "data-store-1", + } + ], + } + + result = sas_source.create_database_schema(table) + + assert result is not None + sas_source.create_database_alt.assert_called_once() + + def test_fallback_returns_none_when_no_data_store_relationship(self, sas_source): + """If the resourceId is malformed *and* there's no datastore + relationship, the method should bail out cleanly instead of + raising.""" + sas_source.metadata = MagicMock() + sas_source.sas_client = MagicMock() + + table = { + "resourceId": "/x/y", # not enough segments + "relationships": [], + } + + assert sas_source.create_database_schema(table) is None + + +class TestExtensionAttributeFiltering: + """The primary bug: null extension values must be stripped before + the CreateTableRequest is yielded, otherwise the sink returns 400.""" + + def _run_create_table_entity(self, sas_source): + sas_source.sas_client = MagicMock() + sas_source.sas_client.get_information_catalog_link.return_value = ( + "http://sas/catalog/LAS_TRAIN" + ) + sas_source.metadata = MagicMock() + # Table does not exist yet, so the source should yield a Create. + sas_source.metadata.get_by_name.return_value = None + + with patch.object( + SasSource, + "get_entities_using_view", + return_value=(LAS_TRAIN_VIEW["entities"], LAS_TRAIN_VIEW["entities"][0]), + ), patch.object( + SasSource, + "create_database_schema", + return_value=MagicMock(fullyQualifiedName="cas.49736234.PUBLIC"), + ): + return list(sas_source.create_table_entity(LAS_TRAIN_SEARCH_HIT)) + + def test_null_cas_host_is_dropped_from_extension(self, sas_source): + """The regression guard from #16888: casHost=None must not end + up in the CreateTableRequest extension.""" + results = self._run_create_table_entity(sas_source) + + create_requests = [ + r.right + for r in results + if r.right is not None and isinstance(r.right, CreateTableRequest) + ] + assert create_requests, f"No CreateTableRequest yielded: {results}" + request = create_requests[0] + assert request.extension is not None + extension = request.extension.root + assert "casHost" not in extension, ( + "Null casHost must be stripped so the backend does not reject " + "the create with 'null found, string expected'" + ) + # Non-null custom attributes should still be kept. + assert extension.get("CASLIB") == "PUBLIC" + assert extension.get("engineName") == "V9" + + +class TestSinkFailureGuard: + """After yielding the CreateTableRequest, the source must tolerate + the table not existing (e.g. because the sink rejected the create).""" + + def test_missing_table_after_yield_does_not_raise_attribute_error(self, sas_source): + """Simulates the log in #16888: get_by_name returns None after the + yield because the sink 400'd. We must NOT crash on `None.id`.""" + sas_source.sas_client = MagicMock() + sas_source.sas_client.get_information_catalog_link.return_value = ( + "http://sas/catalog/LAS_TRAIN" + ) + sas_source.metadata = MagicMock() + # Two get_by_name calls: + # 1. Check-before-create → None (table does not exist yet) + # 2. Re-fetch after yield → None (sink rejected create) + sas_source.metadata.get_by_name.return_value = None + + with patch.object( + SasSource, + "get_entities_using_view", + return_value=(LAS_TRAIN_VIEW["entities"], LAS_TRAIN_VIEW["entities"][0]), + ), patch.object( + SasSource, + "create_database_schema", + return_value=MagicMock(fullyQualifiedName="cas.49736234.PUBLIC"), + ), patch.object( + SasSource, "create_lineage_table_source", return_value=iter([]) + ): + results = list(sas_source.create_table_entity(LAS_TRAIN_SEARCH_HIT)) + + # The CreateTableRequest must still be yielded (the sink will + # record its own failure); the source itself must NOT yield a + # StackTraceError (AttributeError) on the follow-up patch calls. + stack_trace_errors = [r for r in results if r.left is not None] + assert not stack_trace_errors, ( + f"Source should not raise after sink-side failure, got: " + f"{[e.left.error for e in stack_trace_errors]}" + ) + # The PATCH/profile calls must not have been invoked because we + # returned early. + sas_source.metadata.client.patch.assert_not_called() + sas_source.metadata.client.put.assert_not_called() + + +class TestExceptionHandlerSafety: + """The bare except used to reference `table_name` which could be + undefined if the exception fired before it was assigned.""" + + def test_exception_before_table_name_assigned_yields_stack_trace(self, sas_source): + """If get_entities_using_view throws, `table_name` is not set. + The except block must still produce a valid StackTraceError + (previously raised UnboundLocalError).""" + sas_source.sas_client = MagicMock() + sas_source.sas_client.get_information_catalog_link.return_value = "url" + sas_source.metadata = MagicMock() + + with patch.object( + SasSource, + "get_entities_using_view", + side_effect=HTTPError("boom"), + ): + results = list(sas_source.create_table_entity(LAS_TRAIN_SEARCH_HIT)) + + errors = [r.left for r in results if r.left is not None] + assert len(errors) == 1 + error = errors[0] + # Falls back to the search-hit's name (not an UnboundLocalError). + assert error.name == "LAS_TRAIN" + assert "boom" in error.error + + def test_exception_with_non_dict_table_yields_unknown_name(self, sas_source): + """Defensive: even if `table` is not a dict, the except block + should still produce a valid StackTraceError.""" + sas_source.sas_client = MagicMock() + sas_source.sas_client.get_information_catalog_link.side_effect = RuntimeError( + "kaboom" + ) + sas_source.metadata = MagicMock() + + results = list(sas_source.create_table_entity({"id": "abc"})) + + errors = [r.left for r in results if r.left is not None] + assert len(errors) == 1 + # Without a "name" in the search hit, we fall back to the id. + assert errors[0].name == "abc" From 1ba5716c87131d93fadceb42c91d464c742dd722 Mon Sep 17 00:00:00 2001 From: harshsoni2024 Date: Fri, 10 Apr 2026 11:05:27 +0530 Subject: [PATCH 2/4] comments fixes --- .../ingestion/source/database/sas/metadata.py | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py index 4e00428033c4..df1982af2a56 100644 --- a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py @@ -237,6 +237,11 @@ def create_database_schema(self, table): try: context = table["resourceId"].split("/")[3] context_parts = context.split("~") + # Explicit shape check (rather than letting context_parts[4] + # raise IndexError): gives operators a named error in the + # fallback log and guards against "5 parts but wrong shape" + # strings that would otherwise silently build a garbage + # db_name/schema pair. if len(context_parts) < 5: raise ValueError( f"Unexpected resourceId format, cannot derive database/schema: " @@ -260,7 +265,21 @@ def create_database_schema(self, table): db_schema_entity = self.metadata.create_or_update(db_schema) return db_schema_entity - except (HTTPError, IndexError, KeyError, ValueError) as _: + except (HTTPError, IndexError, KeyError, ValueError) as exc: + # Distinguish the "expected" HTTP-failure path (catalog/create + # request bounced) from a parsing/format issue with the + # resourceId, so operators can tell them apart in the logs. + if isinstance(exc, HTTPError): + logger.debug( + "Falling back to relationships-based schema lookup for " + f"{table.get('resourceId')} after HTTP error: {exc}" + ) + else: + logger.warning( + "Could not derive database/schema from resourceId " + f"{table.get('resourceId')!r} ({type(exc).__name__}: {exc}); " + "falling back to relationships-based lookup." + ) # Find the "database" entity in Information Catalog # First see if the table is a member of the library through the relationships attribute # Or we could use views to query the dataStores From 74dd259ff61416a018e6d6c6040281b094589fe5 Mon Sep 17 00:00:00 2001 From: harshsoni2024 Date: Fri, 17 Apr 2026 10:51:31 +0530 Subject: [PATCH 3/4] centralize sas resource parsing --- .../ingestion/source/database/sas/metadata.py | 200 ++++++++++++------ .../tests/unit/topology/database/test_sas.py | 79 +++++-- 2 files changed, 201 insertions(+), 78 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py index df1982af2a56..b5f974aaf042 100644 --- a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py @@ -18,6 +18,7 @@ import json import re import traceback +from dataclasses import dataclass from datetime import datetime, timezone from typing import Any, Iterable, Optional, Tuple @@ -83,6 +84,86 @@ logger = ingestion_logger() +@dataclass(frozen=True) +class SASResourceContext: + """Components extracted from a SAS Information Catalog resourceId. + + The SAS Data Tables REST API exposes table resources at paths of the form: + + /dataTables/dataSources/{provider}~fs~{host}~fs~{library}/tables/{table} + + where ``~fs~`` is the field separator (literal, not URL-encoded). + + Known provider values + --------------------- + - ``cas`` — CAS (Cloud Analytic Services) table. *host* is the CAS + server name (e.g. ``cas-shared-default``). + - ``Compute`` — SAS Compute session table. *host* is a session UUID + (e.g. ``49736234-36b3-48d2-b2e2-e12aa365ce05``). + + Real-world examples + ------------------- + CAS table: + ``/dataTables/dataSources/cas~fs~cas-shared-default~fs~Samples/tables/WATER_CLUSTER`` + Compute table: + ``/dataTables/dataSources/Compute~fs~49736234-…~fs~PUBLIC/tables/LAS_TRAIN`` + + Reference + --------- + SAS REST API — Data Tables service: + https://developer.sas.com/rest-apis/dataTables + """ + + provider: str + host: str + library: str + raw_resource_id: str + + @property + def database_name(self) -> str: + return f"{self.provider}.{self.host}" + + +# The field separator used inside the ``dataSources`` path segment. +_SAS_FIELD_SEPARATOR = "~fs~" + + +def parse_resource_id(resource_id: str) -> Optional[SASResourceContext]: + """Parse a SAS Information Catalog resourceId into its components. + + Returns ``None`` (instead of raising) when the resourceId does not + conform to the expected shape so that callers can cleanly fall back + to the relationships-based lookup. + """ + segments = resource_id.split("/") + # Expected: ['', 'dataTables', 'dataSources', '', 'tables', ...] + if len(segments) < 4: + logger.warning( + "resourceId %r has fewer than 4 slash-delimited segments; " + "cannot extract provider/host/library.", + resource_id, + ) + return None + + context = segments[3] + parts = context.split(_SAS_FIELD_SEPARATOR) + if len(parts) < 3: + logger.warning( + "resourceId context segment %r has %d field(s) (expected 3: " + "provider, host, library); cannot derive database/schema.", + context, + len(parts), + ) + return None + + return SASResourceContext( + provider=parts[0], + host=parts[1], + library=parts[2], + raw_resource_id=resource_id, + ) + + class SasSource( DatabaseServiceSource ): # pylint: disable=too-many-instance-attributes,too-many-public-methods @@ -232,78 +313,71 @@ def create_database_alt(self, db): def create_database_schema(self, table): """ - create database schema + Create database and schema entities for the given table. + + First attempts to derive provider/host/library from the table's + ``resourceId`` via ``parse_resource_id``. If the resourceId does + not match the expected SAS Data Tables shape, or the resulting + create/update call fails, falls back to a relationships-based + lookup through the Information Catalog. """ - try: - context = table["resourceId"].split("/")[3] - context_parts = context.split("~") - # Explicit shape check (rather than letting context_parts[4] - # raise IndexError): gives operators a named error in the - # fallback log and guards against "5 parts but wrong shape" - # strings that would otherwise silently build a garbage - # db_name/schema pair. - if len(context_parts) < 5: - raise ValueError( - f"Unexpected resourceId format, cannot derive database/schema: " - f"{table['resourceId']}" - ) + resource_id = table.get("resourceId", "") + ctx = parse_resource_id(resource_id) + + if ctx is not None: + try: + self.db_name = ctx.database_name + self.db_schema_name = ctx.library - provider = context_parts[0] - self.db_name = provider + "." + context_parts[2] - self.db_schema_name = context_parts[4] + database = CreateDatabaseRequest( + name=self.db_name, + displayName=self.db_name, + service=self.config.serviceName, + ) + database = self.metadata.create_or_update(data=database) - database = CreateDatabaseRequest( - name=self.db_name, - displayName=self.db_name, - service=self.config.serviceName, - ) - database = self.metadata.create_or_update(data=database) + db_schema = CreateDatabaseSchemaRequest( + name=self.db_schema_name, database=database.fullyQualifiedName + ) + return self.metadata.create_or_update(db_schema) - db_schema = CreateDatabaseSchemaRequest( - name=self.db_schema_name, database=database.fullyQualifiedName - ) - db_schema_entity = self.metadata.create_or_update(db_schema) - return db_schema_entity - - except (HTTPError, IndexError, KeyError, ValueError) as exc: - # Distinguish the "expected" HTTP-failure path (catalog/create - # request bounced) from a parsing/format issue with the - # resourceId, so operators can tell them apart in the logs. - if isinstance(exc, HTTPError): + except HTTPError as exc: logger.debug( "Falling back to relationships-based schema lookup for " - f"{table.get('resourceId')} after HTTP error: {exc}" - ) - else: - logger.warning( - "Could not derive database/schema from resourceId " - f"{table.get('resourceId')!r} ({type(exc).__name__}: {exc}); " - "falling back to relationships-based lookup." + "%s after HTTP error: %s", + resource_id, + exc, ) - # Find the "database" entity in Information Catalog - # First see if the table is a member of the library through the relationships attribute - # Or we could use views to query the dataStores - data_store_data_sets = "4b114f6e-1c2a-4060-9184-6809a612f27b" - data_store_id = None - for relation in table["relationships"]: - if relation["definitionId"] != data_store_data_sets: - continue - data_store_id = relation["endpointId"] - break - if data_store_id is None: - # log error due to exclude amount of work with tables in dataTables - logger.error("Data store id should not be none") - return None + return self._create_database_schema_from_relationships(table) - data_store = self.sas_client.get_instance(data_store_id) - database = self.create_database_alt(data_store) - self.db_schema_name = data_store["name"] - db_schema = CreateDatabaseSchemaRequest( - name=data_store["name"], database=database.fullyQualifiedName - ) - db_schema_entity = self.metadata.create_or_update(db_schema) - return db_schema_entity + def _create_database_schema_from_relationships(self, table): + """Derive database/schema from the table's catalog relationships. + + This is the fallback path when ``parse_resource_id`` returns + ``None`` or the primary create fails. It looks for a + ``dataStoreDataSets`` relationship to locate the parent data + store, then uses ``create_database_alt`` for the database entity. + """ + data_store_data_sets = "4b114f6e-1c2a-4060-9184-6809a612f27b" + data_store_id = None + for relation in table.get("relationships", []): + if relation["definitionId"] != data_store_data_sets: + continue + data_store_id = relation["endpointId"] + break + + if data_store_id is None: + logger.error("Data store id should not be none") + return None + + data_store = self.sas_client.get_instance(data_store_id) + database = self.create_database_alt(data_store) + self.db_schema_name = data_store["name"] + db_schema = CreateDatabaseSchemaRequest( + name=data_store["name"], database=database.fullyQualifiedName + ) + return self.metadata.create_or_update(db_schema) def create_columns_alt(self, table): """ diff --git a/ingestion/tests/unit/topology/database/test_sas.py b/ingestion/tests/unit/topology/database/test_sas.py index d61b43bace40..8d3999685196 100644 --- a/ingestion/tests/unit/topology/database/test_sas.py +++ b/ingestion/tests/unit/topology/database/test_sas.py @@ -35,7 +35,11 @@ OpenMetadataWorkflowConfig, ) from metadata.ingestion.source.database.sas.client import SASClient -from metadata.ingestion.source.database.sas.metadata import SasSource +from metadata.ingestion.source.database.sas.metadata import ( + SASResourceContext, + SasSource, + parse_resource_id, +) MOCK_SAS_CONFIG = { "source": { @@ -135,12 +139,65 @@ def sas_source(): return source +class TestParseResourceId: + """Cover the standalone parse_resource_id function that extracts + provider/host/library from a SAS Information Catalog resourceId. + + Known shapes (SAS Data Tables REST API): + /dataTables/dataSources/{provider}~fs~{host}~fs~{library}/tables/{table} + """ + + def test_cas_table(self): + ctx = parse_resource_id( + "/dataTables/dataSources/cas~fs~cas-shared-default~fs~Samples" + "/tables/WATER_CLUSTER?ext=sashdat" + ) + assert ctx == SASResourceContext( + provider="cas", + host="cas-shared-default", + library="Samples", + raw_resource_id=( + "/dataTables/dataSources/cas~fs~cas-shared-default~fs~Samples" + "/tables/WATER_CLUSTER?ext=sashdat" + ), + ) + assert ctx.database_name == "cas.cas-shared-default" + + def test_compute_table(self): + ctx = parse_resource_id( + "/dataTables/dataSources/Compute~fs~" + "49736234-36b3-48d2-b2e2-e12aa365ce05~fs~PUBLIC/tables/LAS_TRAIN" + ) + assert ctx.provider == "Compute" + assert ctx.host == "49736234-36b3-48d2-b2e2-e12aa365ce05" + assert ctx.library == "PUBLIC" + assert ctx.database_name == "Compute.49736234-36b3-48d2-b2e2-e12aa365ce05" + + def test_too_few_slash_segments_returns_none(self): + assert parse_resource_id("/too/short") is None + + def test_missing_field_separator_returns_none(self): + assert ( + parse_resource_id("/dataTables/dataSources/no-separators-here/tables/T") + is None + ) + + def test_only_two_fields_returns_none(self): + assert parse_resource_id("/dataTables/dataSources/cas~fs~host/tables/T") is None + + def test_empty_string_returns_none(self): + assert parse_resource_id("") is None + + def test_frozen_dataclass(self): + ctx = parse_resource_id("/dataTables/dataSources/cas~fs~host~fs~lib/tables/T") + with pytest.raises(AttributeError): + ctx.provider = "modified" + + class TestCreateDatabaseSchema: - """Cover the fragile resourceId parsing in create_database_schema.""" + """Cover create_database_schema using parse_resource_id + fallback.""" - def test_parses_standard_cas_resource_id(self, sas_source): - """A well-formed cas~fs~host~fs~lib path should populate db_name - and db_schema_name without hitting the HTTPError fallback.""" + def test_well_formed_resource_id_sets_db_and_schema(self, sas_source): sas_source.metadata = MagicMock() sas_source.metadata.create_or_update.return_value = MagicMock( fullyQualifiedName="cas.cas-shared-default" @@ -157,10 +214,7 @@ def test_parses_standard_cas_resource_id(self, sas_source): assert sas_source.db_name == "cas.cas-shared-default" assert sas_source.db_schema_name == "Samples" - def test_malformed_resource_id_falls_back_without_index_error(self, sas_source): - """A resourceId without the expected tilde segments used to raise - IndexError because the old code only caught HTTPError. It should - now enter the relationships-based fallback path.""" + def test_malformed_resource_id_falls_back_to_relationships(self, sas_source): sas_source.metadata = MagicMock() sas_source.sas_client = MagicMock() sas_source.sas_client.get_instance.return_value = { @@ -168,13 +222,11 @@ def test_malformed_resource_id_falls_back_without_index_error(self, sas_source): "resourceId": "/dataSources/some/parent", "links": [{"rel": "parent", "uri": "/parent"}], } - # Stub create_database_alt so we don't need a real API round-trip. sas_source.create_database_alt = MagicMock( return_value=MagicMock(fullyQualifiedName="fallback_db") ) table = { - # Only three slash-delimited segments: indexing [3] raises. "resourceId": "/too/short", "relationships": [ { @@ -190,14 +242,11 @@ def test_malformed_resource_id_falls_back_without_index_error(self, sas_source): sas_source.create_database_alt.assert_called_once() def test_fallback_returns_none_when_no_data_store_relationship(self, sas_source): - """If the resourceId is malformed *and* there's no datastore - relationship, the method should bail out cleanly instead of - raising.""" sas_source.metadata = MagicMock() sas_source.sas_client = MagicMock() table = { - "resourceId": "/x/y", # not enough segments + "resourceId": "/x/y", "relationships": [], } From 705cbae3b235891eec3731f4c836d24243a9ddef Mon Sep 17 00:00:00 2001 From: harshsoni2024 Date: Fri, 17 Apr 2026 10:58:14 +0530 Subject: [PATCH 4/4] minor fixes --- .../metadata/ingestion/source/database/sas/metadata.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py index b5f974aaf042..65fec6955f8b 100644 --- a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py @@ -368,7 +368,13 @@ def _create_database_schema_from_relationships(self, table): break if data_store_id is None: - logger.error("Data store id should not be none") + logger.error( + "Failed to derive database schema for SAS table '%s' (resourceId=%s): " + "missing data store identifier because the expected " + "'dataStoreDataSets' relationship was not found.", + table.get("name", ""), + table.get("resourceId", ""), + ) return None data_store = self.sas_client.get_instance(data_store_id) @@ -755,7 +761,7 @@ def create_lineage_table_source(self, table_extension, table_name): entity=Table, fqn=source_table_fqn ) - if source_table_entity: + if source_table_entity and target_table_entity: yield from self.create_table_lineage( source_table_entity, target_table_entity )