1111"""
1212Trino lineage module
1313"""
14+
1415import traceback
15- from typing import Iterable , Iterator , List
16+ from typing import Dict , Iterable , Iterator , List , Optional
1617
1718from sqlalchemy import text
1819
3031 TRINO_QUERY_BATCH_SIZE ,
3132 TrinoQueryParserSource ,
3233)
34+ from metadata .utils import fqn
3335from metadata .utils .logger import ingestion_logger
3436
3537logger = ingestion_logger ()
@@ -112,9 +114,122 @@ def check_same_table(self, table1: Table, table2: Table) -> bool:
112114 """
113115 Method to check whether the table1 and table2 are same
114116 """
115- return table1 .name .root == table2 .name .root and {
116- column .name .root for column in table1 .columns
117- } == {column .name .root for column in table2 .columns }
117+ if table1 .name .root .lower () != table2 .name .root .lower ():
118+ return False
119+
120+ if not table1 .columns and not table2 .columns :
121+ return True
122+
123+ if not table1 .columns or not table2 .columns :
124+ return False
125+ return {column .name .root .lower () for column in table1 .columns } == {
126+ column .name .root .lower () for column in table2 .columns
127+ }
128+
def _get_cross_database_schema_fqn(
    self,
    cross_database_fqn: str,
    trino_table: Table,
    cross_database_schema_mapping: Dict[str, Dict[str, str]],
) -> Optional[str]:
    """
    Resolve the FQN of the schema inside ``cross_database_fqn`` that matches
    the schema of ``trino_table``, comparing schema names in lower case.

    Args:
        cross_database_fqn: FQN of the candidate cross database; the ES
            search is only attempted when it splits into exactly two parts
            (service name and database name).
        trino_table: Trino table whose schema name drives the lookup. The
            name is read from ``databaseSchema.name`` and, failing that,
            from the second-to-last part of the table FQN.
        cross_database_schema_mapping: cache, mutated in place, keyed by
            cross database FQN and then by lower-cased schema name.

    Returns:
        The cached or searched schema FQN, a constructed
        ``<cross_database_fqn>.<quoted schema name>`` fallback when the
        search finds nothing, or ``None`` when the Trino schema name cannot
        be determined.
    """
    trino_schema_name = None
    if trino_table.databaseSchema and trino_table.databaseSchema.name:
        trino_schema_name = trino_table.databaseSchema.name.root

    if (
        not trino_schema_name
        and trino_table.fullyQualifiedName
        and trino_table.fullyQualifiedName.root
    ):
        # Fall back to the table FQN: with at least four parts, the
        # second-to-last one is taken as the schema name.
        trino_table_fqn_parts = fqn.split(trino_table.fullyQualifiedName.root)
        if len(trino_table_fqn_parts) >= 4:
            trino_schema_name = trino_table_fqn_parts[-2]

    if not trino_schema_name:
        return None

    schema_key = trino_schema_name.lower()
    schema_cache = cross_database_schema_mapping.setdefault(cross_database_fqn, {})

    cached_schema_fqn = schema_cache.get(schema_key)
    if cached_schema_fqn:
        return cached_schema_fqn

    cross_database_fqn_parts = fqn.split(cross_database_fqn)
    if len(cross_database_fqn_parts) == 2:
        cross_database_service_name, cross_database_name = cross_database_fqn_parts
        cross_database_schemas = fqn.search_database_schema_from_es(
            metadata=self.metadata,
            database_name=cross_database_name,
            schema_name=trino_schema_name,
            service_name=cross_database_service_name,
            fetch_multiple_entities=True,
            fields="fullyQualifiedName,name",
        )
        # Remember every schema returned, not only the one asked for, so
        # later lookups in the same database can be served from the cache.
        for cross_database_schema in cross_database_schemas or []:
            if cross_database_schema.name and cross_database_schema.fullyQualifiedName:
                schema_cache[
                    cross_database_schema.name.root.lower()
                ] = cross_database_schema.fullyQualifiedName.root

    resolved_schema_fqn = schema_cache.get(schema_key)
    if not resolved_schema_fqn:
        resolved_schema_fqn = (
            f"{cross_database_fqn}.{fqn.quote_name(trino_schema_name)}"
        )
        # Cache the fallback as well: without this, every table of a schema
        # that the search cannot find would trigger the same search again.
        schema_cache[schema_key] = resolved_schema_fqn

    return resolved_schema_fqn
187+
def _get_case_insensitive_cross_database_table(
    self,
    cross_database_schema_fqn: str,
    trino_table: Table,
    cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]],
) -> Optional[Table]:
    """
    Find a table under ``cross_database_schema_fqn`` that ``check_same_table``
    accepts as the counterpart of ``trino_table``.

    Candidates are looked up by the lower-cased Trino table name and kept in
    ``cross_database_table_schema_mapping`` (schema FQN -> lower-cased table
    name -> candidate tables), which is mutated in place. An empty candidate
    list is stored before searching, so a name is searched at most once per
    schema. Returns the first accepted candidate, or ``None``.
    """
    tables_by_name = cross_database_table_schema_mapping.setdefault(
        cross_database_schema_fqn, {}
    )
    lookup_name = trino_table.name.root.lower()

    if lookup_name not in tables_by_name:
        # Record the miss up front; it is replaced only by a non-empty result.
        tables_by_name[lookup_name] = []
        schema_fqn_parts = fqn.split(cross_database_schema_fqn)
        if len(schema_fqn_parts) == 3:
            service_name, database_name, schema_name = schema_fqn_parts
            search_hits = fqn.search_table_from_es(
                metadata=self.metadata,
                database_name=database_name,
                schema_name=schema_name,
                service_name=service_name,
                table_name=lookup_name,
                fetch_multiple_entities=True,
                fields="fullyQualifiedName,name,columns,databaseSchema",
            )
            if search_hits:
                tables_by_name[lookup_name] = search_hits

    return next(
        (
            candidate
            for candidate in tables_by_name[lookup_name]
            if self.check_same_table(trino_table, candidate)
        ),
        None,
    )
118233
119234 def get_cross_database_lineage (
120235 self , from_table : Table , to_table : Table
@@ -131,10 +246,64 @@ def get_cross_database_lineage(
131246 from_entity = from_table , to_entity = to_table , column_lineage = column_lineage
132247 )
133248
def _get_cross_database_lineage_for_table(
    self,
    trino_database_fqn: str,
    trino_table: Table,
    *,
    all_cross_database_fqns: List[str],
    cross_database_table_fqn_mapping: Dict[str, Optional[Table]],
    cross_database_schema_fqn_mapping: Dict[str, Dict[str, str]],
    cross_database_table_schema_mapping: Dict[str, Dict[str, List[Table]]],
) -> Optional[Either[AddLineageRequest]]:
    """
    Build the lineage request from the first cross-database table that
    matches ``trino_table``, or return ``None`` when there is no match.

    For each FQN in ``all_cross_database_fqns`` the candidate table FQN is
    formed by swapping the ``trino_database_fqn`` prefix of the Trino table
    FQN for the cross database FQN. The candidate is fetched by exact name
    first and, if that fails, through the schema and case-insensitive table
    helpers. The outcome, including a miss, is stored in
    ``cross_database_table_fqn_mapping`` under the candidate FQN. The three
    mapping arguments are caches that are mutated in place.
    """
    source_table_fqn = trino_table.fullyQualifiedName.root
    # The trailing dot ensures the database FQN ends at a part boundary.
    if not source_table_fqn.startswith(f"{trino_database_fqn}."):
        return None

    # Everything after the database FQN, leading "." included.
    schema_and_table = source_table_fqn[len(trino_database_fqn) :]

    for candidate_database_fqn in all_cross_database_fqns:
        candidate_table_fqn = f"{candidate_database_fqn}{schema_and_table}"

        if candidate_table_fqn not in cross_database_table_fqn_mapping:
            resolved_table = self.metadata.get_by_name(
                Table, fqn=candidate_table_fqn
            )
            if not resolved_table:
                candidate_schema_fqn = self._get_cross_database_schema_fqn(
                    candidate_database_fqn,
                    trino_table,
                    cross_database_schema_fqn_mapping,
                )
                if candidate_schema_fqn:
                    resolved_table = self._get_case_insensitive_cross_database_table(
                        candidate_schema_fqn,
                        trino_table,
                        cross_database_table_schema_mapping,
                    )
            cross_database_table_fqn_mapping[candidate_table_fqn] = resolved_table

        matched_table = cross_database_table_fqn_mapping[candidate_table_fqn]
        if not matched_table:
            continue
        if not self.check_same_table(trino_table, matched_table):
            continue
        return self.get_cross_database_lineage(matched_table, trino_table)

    return None
300+
134301 def yield_cross_database_lineage (self ) -> Iterable [Either [AddLineageRequest ]]:
135302 try :
136303 all_cross_database_fqns = self .get_cross_database_fqn_from_service_names ()
137304 cross_database_table_fqn_mapping = {}
305+ cross_database_schema_fqn_mapping : Dict [str , Dict [str , str ]] = {}
306+ cross_database_table_schema_mapping : Dict [str , Dict [str , List [Table ]]] = {}
138307
139308 # Get all databases for the specified Trino service
140309 trino_databases = self .metadata .list_all_entities (
@@ -149,29 +318,16 @@ def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]:
149318 )
150319 # NOTE: Currently, tables in system-defined schemas will also be checked for lineage.
151320 for trino_table in trino_tables :
152- trino_table_fqn = trino_table .fullyQualifiedName .root
153- for cross_database_fqn in all_cross_database_fqns :
154- # Construct the FQN for cross-database tables
155- cross_database_table_fqn = trino_table_fqn .replace (
156- trino_database_fqn , cross_database_fqn
157- )
158- # Cache cross-database table against its FQN to avoid repeated API calls
159- cross_database_table = cross_database_table_fqn_mapping [
160- cross_database_table_fqn
161- ] = cross_database_table_fqn_mapping .get (
162- cross_database_table_fqn ,
163- self .metadata .get_by_name (
164- Table , fqn = cross_database_table_fqn
165- ),
166- )
167- # Create cross database lineage request if both tables are same
168- if cross_database_table and self .check_same_table (
169- trino_table , cross_database_table
170- ):
171- yield self .get_cross_database_lineage (
172- cross_database_table , trino_table
173- )
174- break
321+ cross_database_lineage = self ._get_cross_database_lineage_for_table (
322+ trino_database_fqn = trino_database_fqn ,
323+ trino_table = trino_table ,
324+ all_cross_database_fqns = all_cross_database_fqns ,
325+ cross_database_table_fqn_mapping = cross_database_table_fqn_mapping ,
326+ cross_database_schema_fqn_mapping = cross_database_schema_fqn_mapping ,
327+ cross_database_table_schema_mapping = cross_database_table_schema_mapping ,
328+ )
329+ if cross_database_lineage :
330+ yield cross_database_lineage
175331 except Exception as exc :
176332 yield Either (
177333 left = StackTraceError (
0 commit comments