Skip to content

Commit fafe3cf

Browse files
authored
Fixes #27538: [OpenLineage] Add AWS Glue, Kusto, and Cosmos DB dataset naming support (#27533)
* feat(openlineage): add AWS Glue, Kusto, and Cosmos DB dataset naming support * Add better logging for exceptions * Add cosmos defensive check for `colls/` prefix in names * Correct spelling mistakes and wrap long line * fix: guard None source attributes in _sort_array_entity_fields Optional array fields (tasks, columns, fields) on existing OMD entities can be None when the API omits empty arrays in responses. Iterating over None crashed with TypeError. Mirrors the existing `or []` guard already applied to destination_attributes on the line below. * Add test for schema not found case returning None instead of raising an exception
1 parent e1f3013 commit fafe3cf

3 files changed

Lines changed: 262 additions & 24 deletions

File tree

ingestion/src/metadata/ingestion/models/patch_request.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@ def _sort_array_entity_fields(
623623
source_attributes = getattr(source, field)
624624

625625
source_dict = {
626-
_get_attribute_name(attr): attr for attr in source_attributes
626+
_get_attribute_name(attr): attr for attr in (source_attributes or [])
627627
}
628628

629629
updated_attributes = []

ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

Lines changed: 99 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
OpenLineage source to extract metadata from Kafka or Kinesis events
1414
"""
1515
import json
16+
import re
1617
import time
1718
import traceback
1819
from collections import defaultdict
@@ -100,7 +101,8 @@ class OpenlineageSource(PipelineServiceSource):
100101
Works under the assumption that OpenLineage integrations produce events to Kafka topic or Kinesis stream,
101102
which is a source of events for this connector.
102103
103-
Only OpenLineage events that indicate successfull data movement (COMPLETE, RUNNING, START) are taken into account in this connector.
104+
Only OpenLineage events that indicate successful data movement (COMPLETE, RUNNING, START) are taken into account
105+
in this connector.
104106
105107
Configuring OpenLineage integrations: https://openlineage.io/docs/integrations/about
106108
"""
@@ -184,6 +186,27 @@ def _get_table_details(cls, data: Dict) -> TableDetails:
184186
"input table name cannot be retrieved from name attribute."
185187
)
186188

189+
namespace = data.get("namespace", "")
190+
191+
# AWS Glue: arn:aws:glue:{region}:{account} / table/{database}/{table}
192+
# Source: https://openlineage.io/docs/spec/naming/
193+
if namespace.startswith("arn:aws:glue:"):
194+
result = OpenlineageSource._parse_glue_table_name(name)
195+
if result:
196+
return result
197+
198+
# Azure Data Explorer (Kusto): azurekusto://{host} / {database}/{table}
199+
if namespace.startswith("azurekusto://"):
200+
result = OpenlineageSource._parse_slash_table_name(name)
201+
if result:
202+
return result
203+
204+
# Azure Cosmos DB: azurecosmos://{host}/dbs/{db} / colls/{collection}
205+
if namespace.startswith("azurecosmos://"):
206+
result = OpenlineageSource._parse_cosmos_table_name(namespace, name)
207+
if result:
208+
return result
209+
187210
name_parts = name.split(".")
188211

189212
if len(name_parts) < 2:
@@ -229,6 +252,59 @@ def _get_topic_details(data: Dict) -> TopicDetails:
229252

230253
return TopicDetails(name=name, broker_hostname=broker_hostname)
231254

255+
@staticmethod
def _parse_glue_table_name(name: str) -> Optional[TableDetails]:
    """
    Parse AWS Glue OL dataset name: ``table/{database}/{table}``.

    Glue EMR jobs emit a slash-separated name with a ``table/`` prefix instead
    of the dot-separated ``schema.table`` convention used by SQL engines.

    Source: https://github.com/OpenLineage/OpenLineage/blob/main/client/java/
    src/main/java/io/openlineage/client/dataset/Naming.java (GlueNaming)

    Args:
        name: the dataset ``name`` attribute of the OpenLineage event.

    Returns:
        ``TableDetails`` built from the last two path segments (both
        lower-cased), or ``None`` when the name lacks the ``table/`` prefix,
        has fewer than two segments after it, or has an empty database or
        table segment.
    """
    prefix = "table/"
    if not name.startswith(prefix):
        return None

    segments = name[len(prefix) :].split("/")
    if len(segments) < 2:
        return None

    schema_name, table_name = segments[-2], segments[-1]
    # ``table/db/`` or ``table//tbl`` split into an empty segment; treating
    # that as a match would produce a TableDetails with a blank identifier.
    if not schema_name or not table_name:
        return None

    return TableDetails(name=table_name.lower(), schema=schema_name.lower())
272+
273+
@staticmethod
def _parse_slash_table_name(name: str) -> Optional[TableDetails]:
    """
    Parse slash-separated ``{database}/{table}`` OL dataset names.

    Used by Azure Data Explorer (Kusto):
    namespace ``azurekusto://{host}`` / name ``{database}/{table}``

    Source: https://github.com/OpenLineage/OpenLineage/blob/main/client/java/
    src/main/java/io/openlineage/client/dataset/Naming.java (KustoNaming)

    Args:
        name: the dataset ``name`` attribute of the OpenLineage event.

    Returns:
        ``TableDetails`` built from the last two path segments (both
        lower-cased), or ``None`` when the name has fewer than two segments
        or has an empty database or table segment.
    """
    segments = name.split("/")
    if len(segments) < 2:
        return None

    schema_name, table_name = segments[-2], segments[-1]
    # A leading or trailing slash (``/tbl``, ``db/``) yields an empty segment;
    # refuse it rather than emit a TableDetails with a blank identifier.
    if not schema_name or not table_name:
        return None

    return TableDetails(name=table_name.lower(), schema=schema_name.lower())
288+
289+
@staticmethod
def _parse_cosmos_table_name(namespace: str, name: str) -> Optional[TableDetails]:
    """
    Build table details from an Azure Cosmos DB OL dataset.

    The database is read from the ``/dbs/{db}`` part of the namespace
    (``azurecosmos://{host}/dbs/{db}``); the collection is read from a name
    that is exactly ``colls/{collection}``. Both values are lower-cased.
    Returns ``None`` when either piece cannot be extracted.

    Source: https://github.com/OpenLineage/OpenLineage/blob/main/client/java/
    src/main/java/io/openlineage/client/dataset/Naming.java (CosmosNaming)
    """
    database = re.search(r"/dbs/([^/]+)", namespace)
    # fullmatch: the name must be ``colls/<collection>`` with nothing else.
    collection = re.fullmatch(r"colls/([^/]+)", name)
    if database is None or collection is None:
        return None

    schema_name = database.group(1).lower()
    table_name = collection.group(1).lower()
    return TableDetails(name=table_name, schema=schema_name)
307+
232308
def _get_by_name_cached(self, entity_class, fqn_str: str, **kwargs):
233309
"""Wrapper around metadata.get_by_name with in-memory caching."""
234310
if not hasattr(self, "_entity_cache"):
@@ -324,6 +400,11 @@ def _get_table_fqn(
324400
)
325401
return f"{schema_fqn}.{table_details.name}"
326402
except FQNNotFoundException:
403+
logger.debug(
404+
f"Table '{table_details.name}' in schema '{table_details.schema}' "
405+
f"not found in services {resolved_services or self.get_db_service_names()}. "
406+
"Skipping lineage edge."
407+
)
327408
return None
328409
except Exception:
329410
logger.warning(
@@ -466,7 +547,7 @@ def _get_schema_fqn_from_om(
466547

467548
if not result:
468549
raise FQNNotFoundException(
469-
f"Schema FQN not found within services: {services}"
550+
f"Schema '{schema}' not found in services: {services}"
470551
)
471552

472553
return result
@@ -566,14 +647,13 @@ def get_create_table_request(self, table: Dict) -> Optional[Either]:
566647
if not om_table_fqn:
567648
try:
568649
om_schema_fqn = self._get_schema_fqn_from_om(table_details.schema)
569-
except FQNNotFoundException as e:
570-
return Either(
571-
left=StackTraceError(
572-
name="",
573-
error=f"Failed to get fully qualified schema name: {e}",
574-
stackTrace=traceback.format_exc(),
575-
)
650+
except FQNNotFoundException:
651+
logger.warning(
652+
f"Schema '{table_details.schema}' not found in configured services "
653+
f"{self.get_db_service_names()}. Skipping table creation for "
654+
f"'{table_details.name}'."
576655
)
656+
return None
577657

578658
# After finding schema fqn (based on partial schema name) we know where we can create table
579659
# and we move forward with creating request.
@@ -1050,10 +1130,13 @@ def _poll_kafka(self, broker: KafkaBrokerConfig) -> Iterable[OpenLineageEvent]:
10501130
if result:
10511131
yield result
10521132
except Exception as e:
1053-
logger.debug(e)
1133+
logger.warning(
1134+
f"Failed to parse OpenLineage event from Kafka message: {e}"
1135+
)
1136+
logger.debug(traceback.format_exc())
10541137

10551138
except Exception as e:
1056-
traceback.print_exc()
1139+
logger.debug(traceback.format_exc())
10571140
raise InvalidSourceException(f"Failed to read from Kafka: {str(e)}")
10581141

10591142
finally:
@@ -1113,12 +1196,15 @@ def _poll_kinesis(self, broker: KinesisBrokerConfig) -> Iterable[OpenLineageEven
11131196
if result:
11141197
yield result
11151198
except Exception as e:
1116-
logger.debug(e)
1199+
logger.warning(
1200+
f"Failed to parse OpenLineage event from Kinesis record: {e}"
1201+
)
1202+
logger.debug(traceback.format_exc())
11171203

11181204
time.sleep(pool_timeout)
11191205

11201206
except Exception as e:
1121-
traceback.print_exc()
1207+
logger.debug(traceback.format_exc())
11221208
raise InvalidSourceException(f"Failed to read from Kinesis: {str(e)}")
11231209

11241210
def get_pipeline_name(self, pipeline_details: OpenLineageEvent) -> str:

ingestion/tests/unit/topology/pipeline/test_openlineage.py

Lines changed: 162 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,28 @@ def test_get_create_table_request(self, mock_get_schema_fqn, mock_get_table_fqn)
799799
create_request.columns[i].dataTypeDisplay, expected_type_display
800800
)
801801

802+
@patch(
    "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om"
)
@patch(
    "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_schema_fqn_from_om"
)
def test_get_create_table_request_schema_not_found_returns_none(
    self, mock_get_schema_fqn, mock_get_table_fqn
):
    """When both the table and schema lookups raise FQNNotFoundException, the result is None."""
    # Both patched lookups fail, so neither a table nor a schema FQN resolves.
    mock_get_schema_fqn.side_effect = FQNNotFoundException("Schema not found")
    mock_get_table_fqn.side_effect = FQNNotFoundException("Table not found")

    request = self.open_lineage_source.get_create_table_request(
        {
            "name": "unknown_schema.employees",
            "namespace": "bigquery",
            "facets": {},
        }
    )

    assert request is None
823+
802824
@patch("confluent_kafka.Consumer")
803825
def test_get_pipelines_list_filters_complete_events(self, mock_consumer_class):
804826
"""Test that get_pipelines_list returns COMPLETE events"""
@@ -1689,16 +1711,18 @@ def test_yield_pipeline_lineage_topic_not_found_skips_gracefully(self):
16891711
mock_pipeline = Mock()
16901712
mock_pipeline.id.root = pipeline_id
16911713

1692-
with patch.object(
1693-
self.open_lineage_source, "metadata"
1694-
) as mock_metadata, patch.object(
1695-
self.open_lineage_source,
1696-
"_get_table_fqn",
1697-
return_value="db-service.public.some_table",
1698-
), patch.object(
1699-
self.open_lineage_source,
1700-
"get_create_table_request",
1701-
return_value=None,
1714+
with (
1715+
patch.object(self.open_lineage_source, "metadata") as mock_metadata,
1716+
patch.object(
1717+
self.open_lineage_source,
1718+
"_get_table_fqn",
1719+
return_value="db-service.public.some_table",
1720+
),
1721+
patch.object(
1722+
self.open_lineage_source,
1723+
"get_create_table_request",
1724+
return_value=None,
1725+
),
17021726
):
17031727
# Empty messaging services list — no broker match for unknown-broker
17041728
mock_metadata.list_all_entities.return_value = iter([])
@@ -2093,6 +2117,134 @@ def test_cleanup_handles_downstream_edges_scoped_to_event(self):
20932117
self.assertEqual(str(deleted_edge.toEntity.id.root), table_b_id)
20942118
self.assertEqual(deleted_edge.toEntity.type, "table")
20952119

2120+
def test_parse_glue_table_name_trino_glue_catalog_schema(self):
    """``table/public/order_line_items`` parses to schema ``public`` and the
    underscore-separated table name, left intact.
    """
    glue_name = "table/public/order_line_items"
    details = OpenlineageSource._parse_glue_table_name(glue_name)
    self.assertEqual(details.name, "order_line_items")
    self.assertEqual(details.schema, "public")
2129+
2130+
def test_parse_glue_table_name_happy_path(self):
    """A ``table/{database}/{table}`` name yields the database as schema and the table as name."""
    details = OpenlineageSource._parse_glue_table_name("table/sales/users")
    self.assertEqual(details.name, "users")
    self.assertEqual(details.schema, "sales")
2135+
2136+
def test_parse_glue_table_name_normalizes_to_lowercase(self):
    """Mixed-case database and table segments come back lower-cased."""
    details = OpenlineageSource._parse_glue_table_name("table/Sales/Users")
    self.assertEqual(details.name, "users")
    self.assertEqual(details.schema, "sales")
2141+
2142+
def test_parse_glue_table_name_not_glue_format_returns_none(self):
    """A dot-separated name lacking the ``table/`` prefix is rejected with None."""
    details = OpenlineageSource._parse_glue_table_name("sales.users")
    self.assertIsNone(details)
2145+
2146+
def test_parse_glue_table_name_missing_table_part_returns_none(self):
    """Only one segment after ``table/`` is not enough to form db/table, so None is returned."""
    details = OpenlineageSource._parse_glue_table_name("table/only_db")
    self.assertIsNone(details)
2149+
2150+
def test_parse_slash_table_name_happy_path(self):
    """A ``{database}/{table}`` name yields the database as schema and the table as name."""
    details = OpenlineageSource._parse_slash_table_name("mydb/mytable")
    self.assertEqual(details.name, "mytable")
    self.assertEqual(details.schema, "mydb")
2155+
2156+
def test_parse_slash_table_name_normalizes_to_lowercase(self):
    """Mixed-case database and table segments come back lower-cased."""
    details = OpenlineageSource._parse_slash_table_name("MyDB/MyTable")
    self.assertEqual(details.name, "mytable")
    self.assertEqual(details.schema, "mydb")
2161+
2162+
def test_parse_slash_table_name_single_part_returns_none(self):
    """A name with no slash has only one segment and is rejected with None."""
    details = OpenlineageSource._parse_slash_table_name("only_table")
    self.assertIsNone(details)
2165+
2166+
def test_parse_cosmos_table_name_happy_path(self):
    """The schema comes from the namespace ``/dbs/{db}`` part and the name from ``colls/{coll}``."""
    cosmos_namespace = "azurecosmos://myaccount.documents.azure.com/dbs/mydb"
    details = OpenlineageSource._parse_cosmos_table_name(
        cosmos_namespace, "colls/mycollection"
    )
    self.assertEqual(details.name, "mycollection")
    self.assertEqual(details.schema, "mydb")
2174+
2175+
def test_parse_cosmos_table_name_normalizes_to_lowercase(self):
    """Mixed-case database and collection names come back lower-cased."""
    cosmos_namespace = "azurecosmos://host/dbs/MyDB"
    details = OpenlineageSource._parse_cosmos_table_name(
        cosmos_namespace, "colls/MyCollection"
    )
    self.assertEqual(details.name, "mycollection")
    self.assertEqual(details.schema, "mydb")
2182+
2183+
def test_parse_cosmos_table_name_no_dbs_segment_returns_none(self):
    """With no ``/dbs/{db}`` part in the namespace there is no database to use, so None is returned."""
    details = OpenlineageSource._parse_cosmos_table_name(
        "azurecosmos://host", "colls/mycoll"
    )
    self.assertIsNone(details)
2190+
2191+
def test_parse_cosmos_table_name_non_colls_name_returns_none(self):
    """A name that is not of the form ``colls/{collection}`` is rejected with None."""
    details = OpenlineageSource._parse_cosmos_table_name(
        "azurecosmos://host/dbs/mydb", "mycollection"
    )
    self.assertIsNone(details)
2198+
2199+
def test_get_table_details_glue_namespace_parses_slash_name(self):
    """An ``arn:aws:glue`` namespace with a ``table/{db}/{table}`` name resolves to db/table details."""
    details = OpenlineageSource._get_table_details(
        {
            "namespace": "arn:aws:glue:us-east-1:123456789012",
            "name": "table/sales/users",
        }
    )
    self.assertEqual(details.name, "users")
    self.assertEqual(details.schema, "sales")
2208+
2209+
def test_get_table_details_kusto_namespace_parses_slash_name(self):
    """An ``azurekusto://`` namespace with a ``{db}/{table}`` name resolves to db/table details."""
    details = OpenlineageSource._get_table_details(
        {
            "namespace": "azurekusto://mycluster.kusto.windows.net",
            "name": "mydb/mytable",
        }
    )
    self.assertEqual(details.name, "mytable")
    self.assertEqual(details.schema, "mydb")
2218+
2219+
def test_get_table_details_cosmos_namespace_parses_colls_name(self):
    """An ``azurecosmos://`` namespace supplies the schema; the ``colls/{coll}`` name supplies the table."""
    details = OpenlineageSource._get_table_details(
        {
            "namespace": "azurecosmos://host.documents.azure.com/dbs/mydb",
            "name": "colls/orders",
        }
    )
    self.assertEqual(details.name, "orders")
    self.assertEqual(details.schema, "mydb")
2228+
2229+
def test_get_entity_details_glue_namespace_resolves_to_table(self):
    """A Glue-style dataset is reported as a ``table`` entity carrying the parsed db/table details."""
    entity = OpenlineageSource._get_entity_details(
        {
            "namespace": "arn:aws:glue:us-east-1:123456789012",
            "name": "table/sales/users",
            "facets": {},
        }
    )
    self.assertIsNotNone(entity)
    self.assertEqual(entity.entity_type, "table")
    self.assertEqual(entity.table_details.name, "users")
    self.assertEqual(entity.table_details.schema, "sales")
2241+
2242+
def test_get_entity_details_unparseable_name_raises_value_error(self):
    """A name with no recognised separator under a ``trino://`` namespace raises ValueError."""
    dataset = {"namespace": "trino://host:8080", "name": "invalidname"}
    self.assertRaises(ValueError, OpenlineageSource._get_entity_details, dataset)
2247+
20962248

20972249
if __name__ == "__main__":
20982250
unittest.main()

0 commit comments

Comments
 (0)