Skip to content

Commit c1348cb

Browse files
Merge branch 'main' into feat/issue-12787-hive-metastore-mssql-oracle
2 parents 7650733 + 72b1418 commit c1348cb

88 files changed

Lines changed: 4744 additions & 1707 deletions

File tree

Some content is hidden

Large commits hide some content by default. Use the search box below to find content that may be hidden.

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,3 +192,4 @@ ingestion/.claude/agents
192192
# AI scaffold working documents — stay local, never committed
193193
**/CONNECTOR_CONTEXT.md
194194

195+
test-results/

ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,20 @@ def get_sample_data(self, table: Table) -> Optional[Table]:
260260

261261
return None
262262

263+
def delete_sample_data(self, table: Table) -> None:
    """Issue a DELETE against the /sampleData endpoint of the given Table.

    Best-effort: any failure is logged (full traceback at debug level,
    summary at warning level) and swallowed, so callers are never
    interrupted by a failed deletion.
    """
    try:
        endpoint = f"{self.get_suffix(Table)}/{table.id.root}/sampleData"
        self.client.delete(endpoint)
    except Exception as exc:
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Error trying to DELETE sample data for {table.fullyQualifiedName.root}: {exc}"
        )
276+
263277
def add_pipeline_observability(
264278
self, table_id: Uuid, pipeline_observability: List[PipelineObservability]
265279
) -> Optional[Table]:

ingestion/src/metadata/ingestion/source/api/rest/connection.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
validate_openapi_schema,
4646
)
4747
from metadata.utils.constants import THREE_MIN
48+
from metadata.utils.ssl_registry import get_verify_ssl_fn
4849

4950

5051
class SchemaURLError(Exception):
@@ -67,10 +68,16 @@ def get_connection(connection: RestConnection) -> Union[Response, Dict]:
6768
"""
6869
schema_conn = connection.openAPISchemaConnection
6970
if isinstance(schema_conn, OpenAPISchemaURL):
71+
verify_ssl_fn = get_verify_ssl_fn(connection.verifySSL)
72+
verify = verify_ssl_fn(connection.sslConfig)
73+
if verify is None:
74+
verify = True
75+
headers = {}
7076
if connection.token:
71-
headers = {"Authorization": f"Bearer {connection.token.get_secret_value()}"}
72-
return requests.get(schema_conn.openAPISchemaURL, headers=headers)
73-
return requests.get(schema_conn.openAPISchemaURL)
77+
headers["Authorization"] = f"Bearer {connection.token.get_secret_value()}"
78+
return requests.get(
79+
schema_conn.openAPISchemaURL, headers=headers, verify=verify
80+
)
7481

7582
if isinstance(schema_conn, OpenAPISchemaFilePath):
7683
return parse_openapi_schema_from_file(schema_conn.openAPISchemaFilePath)

ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ def __init__(
8181
):
8282
super().__init__(config, metadata)
8383
self.generate_sample_data = self.config.sourceConfig.config.generateSampleData
84+
if (
85+
self.generate_sample_data
86+
and self._is_sample_data_storing_globally_disabled()
87+
):
88+
self.generate_sample_data = False
8489
self.service_connection = self.config.serviceConnection.root.config
8590
self.admin_client = self.connection.admin_client
8691
self.schema_registry_client = self.connection.schema_registry_client

ingestion/src/metadata/ingestion/source/messaging/kinesis/metadata.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ def __init__(
7373
):
7474
super().__init__(config, metadata)
7575
self.generate_sample_data = self.config.sourceConfig.config.generateSampleData
76+
if (
77+
self.generate_sample_data
78+
and self._is_sample_data_storing_globally_disabled()
79+
):
80+
self.generate_sample_data = False
7681
self.kinesis = self.connection
7782

7883
@classmethod

ingestion/src/metadata/ingestion/source/messaging/messaging_service.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,17 @@
1313
"""
1414

1515
from abc import ABC, abstractmethod
16-
from typing import Any, Iterable, List, Optional, Set
16+
from typing import Any, Iterable, List, Optional, Set, cast
1717

1818
from pydantic import BaseModel, Field
1919
from typing_extensions import Annotated
2020

2121
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
2222
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
23+
from metadata.generated.schema.configuration.profilerConfiguration import (
24+
ProfilerConfiguration,
25+
SampleDataIngestionConfig,
26+
)
2327
from metadata.generated.schema.entity.data.topic import Topic, TopicSampleData
2428
from metadata.generated.schema.entity.services.messagingService import (
2529
MessagingConnection,
@@ -153,6 +157,32 @@ def __init__(
153157
def name(self) -> str:
154158
return self.service_connection.type.name
155159

160+
def _is_sample_data_storing_globally_disabled(self) -> bool:
    """Return True when the global profiler configuration disables
    storing of sample data, meaning messaging sources should not
    fetch sample data regardless of their own source configuration.

    Any error while reading the global configuration is logged at
    debug level and treated as "not disabled".
    """
    try:
        settings = self.metadata.get_profiler_config_settings()
        config_value = settings.config_value if settings else None
        if not config_value:
            # No global profiler configuration available
            return False
        global_config = cast(ProfilerConfiguration, config_value)
        ingestion_config = global_config.sampleDataConfig
        if ingestion_config is None:
            return False
        ingestion_config = cast(SampleDataIngestionConfig, ingestion_config)
        if ingestion_config.storeSampleData:
            return False
        logger.info(
            "Global profiler configuration disables storing "
            "of sample data. Overriding source configuration."
        )
        return True
    except Exception as exc:
        logger.debug(f"Could not fetch global profiler config: {exc}")
    return False
185+
156186
@abstractmethod
157187
def yield_topic(self, topic_details: Any) -> Iterable[Either[CreateTopicRequest]]:
158188
"""

ingestion/src/metadata/ingestion/source/messaging/pubsub/metadata.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ def __init__(
7575
):
7676
super().__init__(config, metadata)
7777
self.generate_sample_data = self.config.sourceConfig.config.generateSampleData
78+
if (
79+
self.generate_sample_data
80+
and self._is_sample_data_storing_globally_disabled()
81+
):
82+
self.generate_sample_data = False
7883
self.pubsub = self.connection
7984
self.project_id = self.pubsub.project_id
8085

ingestion/src/metadata/ingestion/source/pipeline/airflow/api/source.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
SourceUrl,
4343
Timestamp,
4444
)
45+
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
4546
from metadata.ingestion.api.models import Either
4647
from metadata.ingestion.api.steps import InvalidSourceException
4748
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
@@ -126,6 +127,23 @@ def _get_dag_source_url(self, dag_id: str) -> str:
126127
return f"{host}/dags/{quote(dag_id)}"
127128
return f"{host}/dags/{quote(dag_id)}/grid"
128129

130+
def get_owners(self, owners: Optional[List[str]]) -> Optional[EntityReferenceList]:
    """Resolve DAG owner names into OpenMetadata entity references.

    Args:
        owners: owner names taken from the Airflow DAG details.

    Returns:
        An EntityReferenceList with every owner that could be resolved,
        or None when owner ingestion is disabled, no owners are given,
        or none of the names resolved to a reference.
    """
    # Owner ingestion is opt-in via the source config.
    if not self.source_config.includeOwners or not owners:
        return None
    refs = EntityReferenceList(root=[])
    for owner_name in owners:
        try:
            # is_owner=True — presumably restricts the lookup to principals
            # that may own assets; confirm against get_reference_by_name.
            ref = self.metadata.get_reference_by_name(
                name=owner_name, is_owner=True
            )
            if ref:
                refs.root.extend(ref.root)
        except Exception as exc:
            # Best-effort: an unresolvable name is logged and skipped,
            # so one bad owner does not drop the rest.
            logger.warning(
                f"Error while getting details of user {owner_name} - {exc}"
            )
    return refs if refs.root else None
146+
129147
def _build_tasks(self, dag_details: AirflowApiDagDetails) -> List[Task]:
130148
return [
131149
Task(
@@ -164,6 +182,7 @@ def yield_pipeline(
164182
),
165183
tasks=self._build_tasks(pipeline_details),
166184
service=FullyQualifiedEntityName(self.context.get().pipeline_service),
185+
owners=self.get_owners(pipeline_details.owners),
167186
scheduleInterval=pipeline_details.schedule_interval,
168187
tags=get_tag_labels(
169188
metadata=self.metadata,

ingestion/src/metadata/sampler/processor.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
from copy import deepcopy
1616
from typing import Optional, Type, cast
1717

18+
from metadata.generated.schema.configuration.profilerConfiguration import (
19+
ProfilerConfiguration,
20+
)
1821
from metadata.generated.schema.entity.data.database import Database
1922
from metadata.generated.schema.entity.data.table import Table
2023
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
@@ -135,9 +138,28 @@ def _run(self, record: ProfilerSourceAndEntity) -> Either[SamplerResponse]:
135138
default_sample_config=SampleConfig(),
136139
default_sample_data_count=self.source_config.sampleDataCount,
137140
)
141+
142+
settings = self.metadata.get_profiler_config_settings()
143+
profiler_global_config = (
144+
cast(ProfilerConfiguration, settings.config_value) if settings else None
145+
)
146+
147+
sample_data_config = (
148+
profiler_global_config.sampleDataConfig
149+
if profiler_global_config
150+
else None
151+
)
152+
138153
sample_data = SampleData(
139-
data=sampler_interface.generate_sample_data(),
140-
store=self.source_config.storeSampleData,
154+
data=sampler_interface.generate_sample_data(
155+
sample_data_config if sample_data_config else None
156+
),
157+
store=bool(
158+
self.source_config.storeSampleData
159+
and (
160+
sample_data_config is None or sample_data_config.storeSampleData
161+
)
162+
),
141163
)
142164
sampler_interface.close()
143165
return Either(

ingestion/src/metadata/sampler/sampler_interface.py

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
from abc import ABC, abstractmethod
1616
from typing import Any, List, Optional, Set, Union
1717

18+
from metadata.generated.schema.configuration.profilerConfiguration import (
19+
SampleDataIngestionConfig,
20+
)
1821
from metadata.generated.schema.entity.data.database import Database
1922
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
2023
from metadata.generated.schema.entity.data.table import (
@@ -240,33 +243,57 @@ def _truncate_cell(value: Any) -> Any:
240243
return value
241244

242245
@calculate_execution_time(store=False)
243-
def generate_sample_data(self) -> Optional[TableData]:
246+
def generate_sample_data(
247+
self, sample_data_config: Optional[SampleDataIngestionConfig] = None
248+
) -> TableData:
244249
"""Fetch and ingest sample data
245250
246251
Returns:
247252
TableData: sample data
248253
"""
249-
try:
250-
logger.debug(
251-
f"Fetching sample data for {self.entity.fullyQualifiedName.root}..."
254+
if sample_data_config is None:
255+
# if there is no global config, default to storing and reading sample data to ensure backward compatibility
256+
# and availability of sample data for downstream steps
257+
sample_data_config = SampleDataIngestionConfig(
258+
storeSampleData=True, readSampleData=True
252259
)
253-
table_data = self.fetch_sample_data(self.columns)
254-
# Truncate large cell values to prevent OOM in downstream
255-
# processing (NLP, serialization, etc.)
256-
table_data.rows = [
257-
[self._truncate_cell(cell) for cell in row]
258-
for row in table_data.rows[
259-
: min(SAMPLE_DATA_DEFAULT_COUNT, self.sample_limit)
260-
]
261-
]
262-
# Only store the data if configured to do so
263-
if self.storage_config:
264-
upload_sample_data(
265-
data=table_data,
266-
entity=self.entity,
267-
sample_storage_config=self.storage_config,
260+
261+
if (
262+
not sample_data_config.storeSampleData
263+
and not sample_data_config.readSampleData
264+
):
265+
logger.info(
266+
"Both storing and reading of sample data are disabled. Skipping sample data generation."
267+
)
268+
return TableData(rows=[], columns=[])
269+
try:
270+
271+
# Stores overwrites reading since if we are storing the data, we want to fetch it
272+
# as well to pass down the pipeline. If we are not storing, but reading is enabled,
273+
# we still want to fetch the data to pass down the pipeline, but we won't store it.
274+
if sample_data_config.readSampleData or sample_data_config.storeSampleData:
275+
logger.debug(
276+
f"Fetching sample data for {self.entity.fullyQualifiedName.root}..."
268277
)
269-
return table_data
278+
table_data = self.fetch_sample_data(self.columns)
279+
# Truncate large cell values to prevent OOM in downstream
280+
# processing (NLP, serialization, etc.)
281+
table_data.rows = [
282+
[self._truncate_cell(cell) for cell in row]
283+
for row in table_data.rows[
284+
: min(SAMPLE_DATA_DEFAULT_COUNT, self.sample_limit)
285+
]
286+
]
287+
# Only store the data if configured to do so
288+
if self.storage_config and sample_data_config.storeSampleData:
289+
upload_sample_data(
290+
data=table_data,
291+
entity=self.entity,
292+
sample_storage_config=self.storage_config,
293+
)
294+
return table_data
295+
296+
return TableData(rows=[], columns=[])
270297

271298
except Exception as err:
272299
logger.debug(traceback.format_exc())

0 commit comments

Comments
 (0)