-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathutils.py
More file actions
556 lines (433 loc) · 19.5 KB
/
utils.py
File metadata and controls
556 lines (433 loc) · 19.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
import re
from enum import Enum
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
from datapilot.core.platforms.dbt.constants import BASE
from datapilot.core.platforms.dbt.constants import FOLDER
from datapilot.core.platforms.dbt.constants import INTERMEDIATE
from datapilot.core.platforms.dbt.constants import MART
from datapilot.core.platforms.dbt.constants import MODEL
from datapilot.core.platforms.dbt.constants import OTHER
from datapilot.core.platforms.dbt.constants import STAGING
from datapilot.core.platforms.dbt.exceptions import AltimateInvalidManifestError
from datapilot.core.platforms.dbt.factory import DBTFactory
from datapilot.core.platforms.dbt.schemas.catalog import Catalog
from datapilot.core.platforms.dbt.schemas.catalog import CatalogV1
from datapilot.core.platforms.dbt.schemas.manifest import AltimateManifestExposureNode
from datapilot.core.platforms.dbt.schemas.manifest import AltimateManifestNode
from datapilot.core.platforms.dbt.schemas.manifest import AltimateManifestSourceNode
from datapilot.core.platforms.dbt.schemas.manifest import AltimateManifestTestNode
from datapilot.core.platforms.dbt.schemas.manifest import Manifest
from datapilot.core.platforms.dbt.schemas.run_results import RunResults
from datapilot.core.platforms.dbt.schemas.sources import Sources
from datapilot.exceptions.exceptions import AltimateFileNotFoundError
from datapilot.exceptions.exceptions import AltimateInvalidJSONError
from datapilot.utils.utils import extract_dir_name_from_file_path
from datapilot.utils.utils import extract_folders_in_path
from datapilot.utils.utils import is_superset_path
from datapilot.utils.utils import load_json
from vendor.dbt_artifacts_parser.parser import parse_manifest
from vendor.dbt_artifacts_parser.parser import parse_run_results
from vendor.dbt_artifacts_parser.parser import parse_sources
MODEL_TYPE_PATTERNS = {
STAGING: r"^stg_.*", # Example: models starting with 'stg_'
MART: r"^(mrt_|mart_|fct_|dim_).*", # Example: models starting with 'mrt_' or 'mart_'
INTERMEDIATE: r"^int_.*", # Example: models starting with 'int_'
BASE: r"^base_.*", # Example: models starting with 'base_'
# Add other model types with their regex patterns here
}
FOLDER_MAP = {
STAGING: STAGING,
MART: MART,
INTERMEDIATE: INTERMEDIATE,
BASE: BASE,
# Add other model types with their folder names here
}
class SelectOption(Enum):
DIRECTORY = "directory"
MODEL_NAME = "model_name"
MODEL_PATH = "model_path"
def combine_dict(dict1: Dict, dict2: Optional[Dict]) -> Dict:
dict2 = dict2 or {}
return {**dict1, **dict2}
def load_manifest(manifest_path: str) -> Manifest:
try:
manifest_dict = load_json(manifest_path)
except FileNotFoundError as e:
raise AltimateFileNotFoundError(f"Manifest file not found: {manifest_path}. Error: {e}") from e
except ValueError as e:
raise AltimateInvalidJSONError(f"Invalid manifest file: {manifest_path}. Error: {e}") from e
except Exception as e:
raise AltimateInvalidManifestError(
f"Invalid manifest file: {manifest_path}. Error: {e}. Please ensure that you are providing the path to a manifest file"
) from e
try:
manifest: Manifest = parse_manifest(manifest_dict)
except ValueError as e:
raise AltimateInvalidManifestError(f"Invalid manifest file: {manifest_path}. Error: {e}") from e
return manifest
def load_catalog(catalog_path: str) -> Catalog:
try:
catalog_dict = load_json(catalog_path)
except FileNotFoundError as e:
raise AltimateFileNotFoundError(f"Catalog file not found: {catalog_path}. Error: {e}") from e
except ValueError as e:
raise AltimateInvalidJSONError(f"Invalid JSON file: {catalog_path}. Error: {e}") from e
try:
catalog: Catalog = CatalogV1(**catalog_dict)
except ValueError as e:
raise AltimateInvalidManifestError(f"Invalid catalog file: {catalog_path}. Error: {e}") from e
return catalog
def load_run_results(run_results_path: str) -> RunResults:
try:
run_results_dict = load_json(run_results_path)
except FileNotFoundError as e:
raise AltimateFileNotFoundError(f"Run results file not found: {run_results_path}. Error: {e}") from e
except ValueError as e:
raise AltimateInvalidJSONError(f"Invalid JSON file: {run_results_path}. Error: {e}") from e
try:
run_results: RunResults = parse_run_results(run_results_dict)
except ValueError as e:
raise AltimateInvalidManifestError(f"Invalid run results file: {run_results_path}. Error: {e}") from e
return run_results
def load_sources(sources_path: str) -> Sources:
try:
sources_dict = load_json(sources_path)
except FileNotFoundError as e:
raise AltimateFileNotFoundError(f"Sources file not found: {sources_path}. Error: {e}") from e
except ValueError as e:
raise AltimateInvalidJSONError(f"Invalid JSON file: {sources_path}. Error: {e}") from e
try:
sources: Sources = parse_sources(sources_dict)
except ValueError as e:
raise AltimateInvalidManifestError(f"Invalid sources file: {sources_path}. Error: {e}") from e
return sources
# TODO: Add tests!
def get_table_name_from_source(source: AltimateManifestSourceNode) -> str:
db = source.database
schema = source.schema_name
identifier = source.identifier
if db:
return f"{db}.{schema}.{identifier}"
return f"{schema}.{identifier}"
def classify_model_type_by_name(
model_name: str,
model_name_pattern: Optional[Dict[str, str]],
):
types_patterns = combine_dict(MODEL_TYPE_PATTERNS, model_name_pattern)
for model_type, pattern in types_patterns.items():
if re.match(pattern, model_name):
return model_type
return None
def classify_model_type_by_folder(model_path: str, model_folder_pattern: Optional[Dict[str, str]]) -> str:
folder_patterns = combine_dict(FOLDER_MAP, model_folder_pattern)
dirname = extract_dir_name_from_file_path(model_path)
for model_type, pattern in folder_patterns.items():
if re.match(pattern, dirname):
return model_type
return OTHER
# TODO: Add tests!
def classify_model_type(
model_name: str,
folder_path: Optional[str] = None,
patterns: Optional[Dict[str, Optional[Dict[str, str]]]] = None,
) -> Optional[str]:
"""
Classify the type of a model based on its name using regex patterns.
:param model_name: The name of the model.
:param types_patterns: A dictionary mapping model types to their regex patterns.
:return: The type of the model or None if no match is found.
"""
type_patterns = patterns.get(MODEL, {})
model_type = classify_model_type_by_name(model_name, type_patterns)
if model_type:
return model_type
if folder_path:
folder_patterns = patterns.get(FOLDER, {})
model_type = classify_model_type_by_folder(folder_path, folder_patterns)
if model_type:
return model_type
return OTHER # if no pattern matches
def _check_model_naming_convention(
model_name: str, expected_model_type: str, patterns: Optional[Dict[str, str]]
) -> Tuple[bool, Optional[str]]:
model_patterns = combine_dict(MODEL_TYPE_PATTERNS, patterns)
expected_model_pattern = model_patterns.get(expected_model_type)
if expected_model_pattern:
if re.match(expected_model_pattern, model_name):
return True, None
return False, expected_model_pattern
def get_node_source_name(
node: AltimateManifestNode,
sources: Dict[str, AltimateManifestSourceNode],
) -> str:
for node_id in node.depends_on.nodes:
if node_id in sources:
return sources[node_id].source_name
def _check_mart_convention(folder_patterns, directory_name, node_name):
if re.match(folder_patterns.get(MART, ""), directory_name):
return True, None
return (
False,
f"*/{folder_patterns.get(MART, '')}/{node_name}.sql",
)
def _staging_error_message(source_name, node_name, staging_pattern):
return f"*/{staging_pattern}/{source_name}/{node_name}.sql"
def _check_staging_convention(folder_path, folder_patterns, directory_name, node, sources):
directories = extract_folders_in_path(folder_path)
source_name = get_node_source_name(node, sources)
if not source_name:
return True, None
if directory_name != source_name:
return False, _staging_error_message(source_name, node.name, folder_patterns.get(STAGING, ""))
staging_pattern = folder_patterns.get(STAGING)
if staging_pattern and len(directories) > 2 and not re.match(staging_pattern, directories[-2]):
return False, _staging_error_message(source_name, node.name, staging_pattern)
return True, None
def _check_source_folder_convention(source_name, folder_path, patterns=Optional[Dict[str, Dict[str, str]]]):
folder_patterns = combine_dict(FOLDER_MAP, patterns.get(FOLDER))
directories = extract_folders_in_path(folder_path)
directory_name = extract_dir_name_from_file_path(folder_path)
if directory_name != source_name:
return False, f"{folder_patterns.get(STAGING)}/{source_name}/source.yml"
if len(directories) > 2 and not re.match(folder_patterns.get(STAGING), directories[-2]):
return False, f"{folder_patterns.get(STAGING)}/{source_name}/source.yml"
return True, None
def _check_model_folder_convention(
model_type: str,
folder_path: str,
patterns: Dict[str, Optional[Dict[str, str]]],
node: AltimateManifestNode,
sources: Dict[str, AltimateManifestSourceNode],
) -> Tuple[bool, Optional[str]]:
folder_patterns = patterns.get(FOLDER, {}) or {}
folder_patterns = {**FOLDER_MAP, **folder_patterns}
directory_name = extract_dir_name_from_file_path(folder_path)
if model_type == MART:
return _check_mart_convention(folder_patterns, directory_name, node.name)
if model_type == STAGING:
return _check_staging_convention(folder_path, folder_patterns, directory_name, node, sources)
return True, None
# TODO: Add tests!
def get_children_map(nodes: Dict[str, AltimateManifestNode]) -> Dict[str, AltimateManifestNode]:
"""
Current manifest contains information about parents
THis gives an information of node to children
:param nodes: A dictionary of nodes in a manifest.
:return: A dictionary of all the children of a node.
"""
children_map = {}
for node_id, node in nodes.items():
for parent in node.depends_on.nodes:
children_map.setdefault(parent, set()).add(node_id)
return children_map
# TODO: Add tests!
def get_hard_coded_references(sql_code):
"""
Find all hard-coded references in the given SQL code.
:param sql_code: A string containing the SQL code to be analyzed.
:return: A set of unique hard-coded references found in the SQL code.
"""
# Define regex patterns to match different types of hard-coded references
from_hard_coded_references = {
"from_var_1": r"""(?ix)
# first matching group
# from or join followed by at least 1 whitespace character
(from | join)\s +
# second matching group
# opening {{, 0 or more whitespace character(s), var, 0 or more whitespace character(s), an opening parenthesis, 0 or more whitespace character(s), 1 or 0 quotation mark
({{\s * var\s * \(\s *[\'\"]?)
# third matching group
# at least 1 of anything except a parenthesis or quotation mark
([^)\'\"]+)
# fourth matching group
# 1 or 0 quotation mark, 0 or more whitespace character(s)
([\'\"]?\s*)
# fifth matching group
# a closing parenthesis, 0 or more whitespace character(s), closing }}
(\)\s *}})
""",
"from_var_2": r"""(?ix)
# first matching group
# from or join followed by at least 1 whitespace character
(
from | join)\s +
# second matching group
# opening {{, 0 or more whitespace character(s), var, 0 or more whitespace character(s), an opening parenthesis, 0 or more whitespace character(s), 1 or 0 quotation mark
({{\s * var\s * \(\s *[\'\"]?)
# third matching group
# at least 1 of anything except a parenthesis or quotation mark
([^)\'\"]+)
# fourth matching group
# 1 or 0 quotation mark, 0 or more whitespace character(s)
([\'\"]?\s*)
# fifth matching group
# a comma
(,)
# sixth matching group
# 0 or more whitespace character(s), 1 or 0 quotation mark
(\s *[\'\"]?)
# seventh matching group
# at least 1 of anything except a parenthesis or quotation mark
([^)\'\"]+)
# eighth matching group
# 1 or 0 quotation mark, 0 or more whitespace character(s)
([\'\"]?\s*)
# ninth matching group
# a closing parenthesis, 0 or more whitespace character(s), closing }}
(\)\s *}})
""",
"from_table_1": r"""(?ix)
# first matching group
# from or join followed by at least 1 whitespace character
(
from | join)\s +
# second matching group
# 1 or 0 of (opening bracket, backtick, or quotation mark)
([\[`\"\']?)
# third matching group
# at least 1 word character
(\w+)
# fouth matching group
# 1 or 0 of (closing bracket, backtick, or quotation mark)
([\]`\"\']?)
# fifth matching group
# a period
(\.)
# sixth matching group
# 1 or 0 of (opening bracket, backtick, or quotation mark)
([\[`\"\']?)
# seventh matching group
# at least 1 word character
(\w+)
# eighth matching group
# 1 or 0 of (closing bracket, backtick, or quotation mark) folowed by a whitespace character or end of string
([\]`\"\']?)(?=\s|$)
""",
"from_table_2": r"""(?ix)
# first matching group
# from or join followed by at least 1 whitespace character
(
from | join)\s +
# second matching group
# 1 or 0 of (opening bracket, backtick, or quotation mark)
([\[`\"\']?)
# third matching group
# at least 1 word character
(\w+)
# fouth matching group
# 1 or 0 of (closing bracket, backtick, or quotation mark)
([\]`\"\']?)
# fifth matching group
# a period
(\.)
# sixth matching group
# 1 or 0 of (opening bracket, backtick, or quotation mark)
([\[`\"\']?)
# seventh matching group
# at least 1 word character
(\w+)
# eighth matching group
# 1 or 0 of (closing bracket, backtick, or quotation mark)
([\]`\"\']?)
# ninth matching group
# a period
(\.)
# tenth matching group
# 1 or 0 of (closing bracket, backtick, or quotation mark)
([\[`\"\']?)
# eleventh matching group
# at least 1 word character
(\w+)
# twelfth matching group
# 1 or 0 of (closing bracket, backtick, or quotation mark) folowed by a whitespace character or end of string
([\]`\"\']?)(?=\s|$)
""",
"from_table_3": r"""(?ix)
# first matching group
# from or join followed by at least 1 whitespace character
(
from | join)\s +
# second matching group
# 1 of (opening bracket, backtick, or quotation mark)
([\[`\"\'])
# third matching group
# at least 1 word character or space
([\w]+)
# fourth matching group
# 1 of (closing bracket, backtick, or quotation mark) folowed by a whitespace character or end of string
([\]`\"\'])(?=\s|$)
""",
}
# Set to store all unique hard-coded references
hard_coded_references = set()
for regex_pattern in from_hard_coded_references.values():
# Compile the regex pattern
all_regex_matches = re.findall(regex_pattern, sql_code)
# Find all matches in the SQL code
# Process each match
for match in all_regex_matches:
# Extract all groups except the first one and join them
raw_reference = "".join(match[1:]).strip() #
hard_coded_references.add(raw_reference)
return hard_coded_references
def parse_argument(argument: str) -> dict:
"""
Parses the given argument to categorize it as a model path, directory, or model name.
Parameters:
- argument (str): The input argument to be parsed.
Returns:
- dict: A dictionary containing the 'type' and 'name' of the parsed argument.
"""
# Determine if the argument is a model path or directory based on its prefix and suffix.
if argument.startswith("path:"):
path_type = SelectOption.MODEL_PATH if argument.endswith(".sql") else SelectOption.DIRECTORY
path = argument.split(":", 1)[1]
return {"type": path_type, "name": path}
# Identify argument as a model path if it ends with '.sql'.
if argument.endswith(".sql"):
return {"type": SelectOption.MODEL_PATH, "name": argument}
# Identify argument as a directory if it contains path separators.
if "/" in argument or "\\" in argument:
return {"type": SelectOption.DIRECTORY, "name": argument}
# Default case: treat the argument as a model name.
return {"type": SelectOption.MODEL_NAME, "name": argument}
def add_models_by_type(selected_category: dict, entities: dict, final_models: List[str]):
"""
Adds models to the final list based on the selected category.
Parameters:
- selected_category (dict): The category selected for adding models.
- entities (dict): A dictionary of entities, each associated with a type.
- final_models (List[str]): The list to which the models' unique IDs are added.
"""
for entity in entities.values():
if selected_category["type"] in (SelectOption.MODEL_NAME, SelectOption.MODEL_PATH):
if entity.name == selected_category.get("name") or entity.original_file_path == selected_category.get("name"):
final_models.append(entity.unique_id)
elif selected_category["type"] == SelectOption.DIRECTORY:
if is_superset_path(selected_category["name"], entity.original_file_path):
final_models.append(entity.unique_id)
def get_models(
selected_model_list: Optional[List[str]],
entities: Dict[str, Union[AltimateManifestNode, AltimateManifestExposureNode, AltimateManifestSourceNode, AltimateManifestTestNode]],
) -> List[str]:
"""
Retrieves models based on a selected list and entities.
Parameters:
- selected_model_list (Optional[List[str]]): The list of selected models.
- entities (Dict): A dictionary containing entity types and their instances.
Returns:
- List[str]: A list of unique model IDs based on the selection criteria.
"""
final_models = []
for selected_model in selected_model_list or []:
selected_category = parse_argument(selected_model)
for entity_type in entities:
add_models_by_type(selected_category, entities[entity_type], final_models)
return list(set(final_models))
def get_manifest_wrapper(manifest_path: str):
manifest = load_manifest(manifest_path)
return DBTFactory.get_manifest_wrapper(manifest)