Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 70 additions & 20 deletions ingestion/src/metadata/ingestion/source/database/sas/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
from metadata.generated.schema.entity.services.connections.database.sasConnection import (
SASConnection,
)
from metadata.generated.schema.security.ssl.verifySSLConfig import VerifySSL
from metadata.ingestion.connections.source_api_client import TrackedREST
from metadata.ingestion.ometa.client import APIError, ClientConfig
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger
from metadata.utils.ssl_registry import get_verify_ssl_fn

logger = ingestion_logger()

SAS_CLI_AUTH_HEADER = "Basic c2FzLmNsaTo="


class SASClient:
"""
Expand All @@ -32,14 +36,21 @@ class SASClient:

def __init__(self, config: SASConnection):
self.config: SASConnection = config
self.auth_token = self.get_token(config.serverHost, config.username, config.password.get_secret_value())
verify = self._get_verify()
self.auth_token = self.get_token(
config.serverHost,
config.username,
config.password.get_secret_value(),
verify=verify,
)

client_config: ClientConfig = ClientConfig(
base_url=clean_uri(config.serverHost),
auth_header="Authorization",
auth_token=self.get_auth_token,
api_version="",
allow_redirects=True,
verify=False,
verify=verify,
)
self.client = TrackedREST(client_config, source_name="sas")
# custom setting
Expand All @@ -50,6 +61,22 @@ def __init__(self, config: SASConnection):
self.enable_dataflows = config.dataflows
self.custom_filter_dataflows = config.dataflowsCustomFilter

def _get_verify(self):
"""
Helper to determine the SSL verification strategy
"""
verify = True
if self.config.verifySSL == VerifySSL.ignore:
verify = False
elif self.config.verifySSL == VerifySSL.no_ssl:
verify = False
elif self.config.verifySSL == VerifySSL.validate and self.config.sslConfig:
try:
verify = get_verify_ssl_fn(self.config.verifySSL)(self.config.sslConfig)
except Exception: # pylint: disable=broad-except
verify = True
return verify

def check_connection(self):
"""
Check metadata connection to SAS
Expand All @@ -71,8 +98,8 @@ def get_instance(self, instance_id):
"Accept": "application/vnd.sas.metadata.instance.entity.detail+json",
}
response = self.client.get(path=endpoint, headers=headers)
if "error" in response.keys(): # noqa: SIM118
raise APIError(response["error"])
if response and isinstance(response, dict) and "error" in response:
raise APIError({"message": response["error"], "code": 0})
return response

def get_information_catalog_link(self, instance_id):
Expand All @@ -98,8 +125,8 @@ def list_assets(self, assets):
endpoint = f"catalog/search?indices={assets}&q={asset_filter if str(asset_filter) != 'None' else '*'}"
headers = {"Accept-Item": "application/vnd.sas.metadata.instance.entity+json"}
response = self.client.get(path=endpoint, headers=headers)
if "error" in response.keys(): # noqa: SIM118
raise APIError(response["error"])
if response and isinstance(response, dict) and "error" in response:
raise APIError({"message": response["error"], "code": 0})
return response["items"]

def get_views(self, query):
Expand All @@ -110,8 +137,8 @@ def get_views(self, query):
}
logger.info(f"{query}")
response = self.client.post(path=endpoint, data=query, headers=headers)
if "error" in response.keys(): # noqa: SIM118
raise APIError(f"{response}")
if response and isinstance(response, dict) and "error" in response:
raise APIError({"message": "Error fetching views from SAS", "code": 0})
return response

def get_data_source(self, endpoint):
Expand All @@ -120,8 +147,8 @@ def get_data_source(self, endpoint):
}
response = self.client.get(path=endpoint, headers=headers)
logger.info(f"{response}")
if "error" in response.keys(): # noqa: SIM118
raise APIError(response["error"])
if response and isinstance(response, dict) and "error" in response:
raise APIError({"message": response["error"], "code": 0})
return response

def get_report_link(self, resource, uri):
Expand All @@ -135,8 +162,8 @@ def load_table(self, endpoint):
def get_report_relationship(self, report_id):
endpoint = f"reports/commons/relationships/reports/{report_id}"
response = self.client.get(endpoint)
if "error" in response.keys(): # noqa: SIM118
raise APIError(response["error"])
if response and isinstance(response, dict) and "error" in response:
raise APIError({"message": response["error"], "code": 0})
dependencies = []
for item in response["items"]:
if item["type"] == "Dependent":
Expand All @@ -145,27 +172,50 @@ def get_report_relationship(self, report_id):

def get_resource(self, endpoint):
response = self.client.get(endpoint)
if "error" in response.keys(): # noqa: SIM118
raise APIError(response["error"])
if response and isinstance(response, dict) and "error" in response:
raise APIError({"message": response["error"], "code": 0})
return response

def get_instances_with_param(self, data):
endpoint = f"catalog/instances?{data}"
response = self.client.get(endpoint)
if "error" in response.keys(): # noqa: SIM118
raise APIError(response["error"])
if response and isinstance(response, dict) and "error" in response:
raise APIError({"message": response["error"], "code": 0})
return response["items"]

def get_auth_token(self):
return self.auth_token, 0

def get_token(self, base_url, user, password):
def get_token(self, base_url, user, password, verify=True):
endpoint = "/SASLogon/oauth/token"
payload = {"grant_type": "password", "username": user, "password": password}
headers = {
"Content-type": "application/x-www-form-urlencoded",
"Authorization": "Basic c2FzLmNsaTo=",
"Authorization": SAS_CLI_AUTH_HEADER,
}
url = base_url + endpoint
response = requests.request("POST", url, headers=headers, data=payload, verify=False, timeout=10)
return response.json()["access_token"]

response = requests.request(
"POST",
url,
headers=headers,
data=payload,
verify=verify,
timeout=10,
)
logger.debug(
"Token request for user: %s completed with status: %s",
user,
response.status_code,
)
try:
body = response.json()
except ValueError as exc:
response.raise_for_status()
raise RuntimeError(f"SAS token endpoint returned non-JSON response (HTTP {response.status_code})") from exc

response.raise_for_status()
token = body.get("access_token")
if not token:
raise RuntimeError(f"Failed to retrieve access_token from SAS (HTTP {response.status_code})")
return token
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
TestConnectionResult,
)
from metadata.generated.schema.security.ssl.verifySSLConfig import VerifySSL
from metadata.ingestion.connections.builders import init_empty_connection_arguments
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
Expand Down Expand Up @@ -102,22 +103,28 @@ def _handle_ssl_context_by_path(ssl_config: SslConfig):
return ca_cert, client_cert, private_key


def get_ssl_context(ssl_config: SslConfig) -> ssl.SSLContext:
def get_ssl_context(
ssl_config: SslConfig | None, verify_ssl: VerifySSL | None = VerifySSL.validate
) -> ssl.SSLContext | None:
"""
Method to get SSL Context
"""
if verify_ssl == VerifySSL.ignore:
return ssl._create_unverified_context() # pylint: disable=protected-access
Comment on lines +112 to +113
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Edge Case: Elasticsearch get_ssl_context returns unverified context for no certs

When verify_ssl == VerifySSL.validate (the default) but no ssl_config or certificates are provided, the function falls through to line 143 and returns ssl.create_default_context(). This is correct. However, when verify_ssl == VerifySSL.ignore, it returns ssl._create_unverified_context() which disables hostname and cert verification. This is intentional per the PR but worth noting that VerifySSL.ignore in the Elasticsearch path creates an SSL context that will still attempt TLS (just without verification), whereas in the SAS path it sets verify=False which may skip TLS entirely depending on the HTTP library. The behavior difference between connectors for the same enum value could confuse users.

Was this helpful? React with 👍 / 👎


if verify_ssl == VerifySSL.no_ssl:
return None

ca_cert = False
client_cert = None
private_key = None
cert_chain = None

if not ssl_config.certificates:
return None

if isinstance(ssl_config.certificates, SslCertificatesByValues):
ca_cert, client_cert, private_key = _handle_ssl_context_by_value(ssl_config=ssl_config)
elif isinstance(ssl_config.certificates, SslCertificatesByPath):
ca_cert, client_cert, private_key = _handle_ssl_context_by_path(ssl_config=ssl_config)
if ssl_config and ssl_config.certificates:
if isinstance(ssl_config.certificates, SslCertificatesByValues):
ca_cert, client_cert, private_key = _handle_ssl_context_by_value(ssl_config=ssl_config)
elif isinstance(ssl_config.certificates, SslCertificatesByPath):
ca_cert, client_cert, private_key = _handle_ssl_context_by_path(ssl_config=ssl_config)

if client_cert and private_key:
cert_chain = (client_cert, private_key)
Expand All @@ -133,7 +140,7 @@ def get_ssl_context(ssl_config: SslConfig) -> ssl.SSLContext:
)
return ssl_context # noqa: RET504

return ssl._create_unverified_context() # pylint: disable=protected-access
return ssl.create_default_context()


def get_connection(connection: ElasticsearchConnection) -> Elasticsearch:
Expand All @@ -146,7 +153,7 @@ def get_connection(connection: ElasticsearchConnection) -> Elasticsearch:
if isinstance(connection.authType, BasicAuthentication) and connection.authType.username:
basic_auth = (
connection.authType.username,
connection.authType.password.get_secret_value() if connection.authType.password else None,
(connection.authType.password.get_secret_value() if connection.authType.password else None),
)

if isinstance(connection.authType, ApiKeyAuthentication):
Expand All @@ -161,8 +168,7 @@ def get_connection(connection: ElasticsearchConnection) -> Elasticsearch:
if not connection.connectionArguments:
connection.connectionArguments = init_empty_connection_arguments()

if connection.sslConfig:
ssl_context = get_ssl_context(connection.sslConfig)
ssl_context = get_ssl_context(connection.sslConfig, connection.verifySSL)

return Elasticsearch(
str(connection.hostPort),
Expand All @@ -188,15 +194,16 @@ def test_connection(
def test_get_search_indexes():
try:
result = client.indices.get_alias(expand_wildcards="open")
if result is None:
raise ConnectionError("Failed to retrieve search indexes from Elasticsearch") # noqa: TRY301
return result # noqa: TRY300
except Exception as exc:
raise ConnectionError(
f"Unable to connect to Elasticsearch or retrieve indexes: {exc}. "
"Please check your Elasticsearch connection configuration and cluster health."
) from exc

if result is None:
raise ConnectionError("Failed to retrieve search indexes from Elasticsearch")
return result

test_fn = {
"CheckAccess": client.info,
"GetSearchIndexes": test_get_search_indexes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def get_string_value(self, secret_id: str) -> Optional[str]: # noqa: UP045
try:
kwargs = {"SecretId": secret_id}
response = self.client.get_secret_value(**kwargs)
logger.debug("Got value for secret %s.", secret_id)
logger.debug("Successfully retrieved value from secrets manager.")
Comment thread
gitar-bot[bot] marked this conversation as resolved.
except ClientError as err:
logger.debug(traceback.format_exc())
logger.error(f"Couldn't get value for secret [{secret_id}]: {err}")
Expand Down
Loading
Loading