-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathutils.py
More file actions
323 lines (262 loc) · 11.3 KB
/
utils.py
File metadata and controls
323 lines (262 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
import json
import os
import re
import subprocess
import tempfile
import uuid
from pathlib import Path
from typing import Dict
from typing import List
from typing import Union
from datapilot.config.config import load_config
from datapilot.core.platforms.dbt.schemas.catalog import CatalogV1
from datapilot.schemas.nodes import ModelNode
from datapilot.schemas.nodes import SourceNode
from vendor.dbt_artifacts_parser.parser import parse_manifest
def load_json(file_path: str) -> Dict:
    """Load and parse a JSON file from disk.

    :param file_path: Path to the JSON file.
    :return: Parsed JSON content as a dictionary.
    :raises FileNotFoundError: If the file does not exist (re-raised as-is so
        callers can distinguish "missing" from "invalid").
    :raises ValueError: If the file is not valid JSON, or the path points to a
        directory instead of a file.
    """
    try:
        with Path(file_path).open() as f:
            return json.load(f)
    except FileNotFoundError:
        raise
    except json.decoder.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON file: {file_path}") from e
    except IsADirectoryError as e:
        # Fixed duplicated article in the original message ("a A valid").
        raise ValueError(f"Please provide a valid manifest file path. {file_path} is a directory") from e
def extract_dir_name_from_file_path(path: str) -> str:
    """Return the name of the directory immediately containing *path*.

    Works for both Windows and POSIX style paths via pathlib.

    :param path: A file path.
    :return: The parent directory's name ("" if there is no parent component).
    """
    parent_dir = Path(path).parent
    return parent_dir.name
def extract_folders_in_path(path: str) -> list:
    """Return the directory components of *path* as a list.

    The final component is dropped when it looks like a file name (contains a
    dot); empty components from leading/trailing/doubled separators are removed.

    :param path: A path string using the OS path separator.
    :return: Ordered list of folder names.
    """
    components = path.split(os.path.sep)
    # Treat a dotted last component as a file name and exclude it.
    if "." in components[-1]:
        components = components[:-1]
    return [component for component in components if component]
def get_dir_path(path: str) -> Path:
    """
    Get the directory path of a file path.
    For example, if the path is /a/b/c/d.txt, the directory path is /a/b/c
    Note: this returns a pathlib.Path object, not a str — the original
    annotation said ``str`` but the code returns ``Path(path).parent``
    unconverted.
    :param path: file path string
    :return: the parent directory as a pathlib.Path
    """
    return Path(path).parent
def is_superset_path(superset_path: str, path: str):
    """
    Check whether *path* lies underneath *superset_path*.

    :param superset_path: The candidate ancestor path.
    :param path: The path to be checked.
    :return: True if path is a sub-path of superset_path, False otherwise.
    """
    candidate = Path(path)
    try:
        # relative_to raises ValueError when path is not inside superset_path.
        candidate.relative_to(superset_path)
    except ValueError:
        return False
    return True
def get_changed_files(include_untracked=True):
    """Return paths of files reported as changed by ``git status --porcelain``.

    :param include_untracked: When True, also list untracked files; ``-uall``
        makes git report individual files inside untracked directories.
    :return: List of changed file paths (relative to the repo root).
    """
    command = ["git", "status", "--porcelain"]
    if include_untracked:
        command.append("-uall")
    # Best-effort: a failing git invocation (e.g. not a repo) produces empty
    # stdout and therefore an empty result, matching the original behavior.
    result = subprocess.run(command, capture_output=True, text=True)  # noqa
    changed_files = []
    for line in result.stdout.splitlines():
        if not line:
            continue
        # Porcelain v1 format: two status characters, a space, then the path.
        status = line[:2]
        path = line[3:]
        if " -> " in path:
            # Renames/copies are reported as "old -> new"; keep the NEW path.
            # The original split()[1] wrongly returned the old path (and
            # truncated any path containing spaces).
            path = path.split(" -> ")[-1]
        if status == "??":
            if include_untracked:
                changed_files.append(path)
        elif status.lstrip().startswith(("M", "A", "D", "R")):
            changed_files.append(path)
    return changed_files
def get_tmp_dir_path():
    """Create and return a fresh, uniquely named temporary directory.

    :return: pathlib.Path of the newly created directory.
    """
    unique_name = str(uuid.uuid4())
    tmp_dir = Path(tempfile.gettempdir()) / unique_name
    # parents/exist_ok keep this idempotent even in the unlikely UUID collision.
    tmp_dir.mkdir(parents=True, exist_ok=True)
    return tmp_dir
def get_column_type(dtype: str) -> str:
    """Map a warehouse column dtype string to a coarse catalog type.

    Matching is case-insensitive substring matching; the first hit wins and
    unrecognized types fall back to "TEXT".

    :param dtype: Raw column type name as reported by the adapter
        (e.g. "BIGINT", "timestamp without time zone").
    :return: One of INTEGER, FLOAT, BOOLEAN, DATE, TIMESTAMP, TIME, TEXT,
        NUMERIC, DECIMAL, DOUBLE or REAL.
    """
    dtype = dtype.lower()
    # Order matters: "timestamp" must be tested before "time" — a timestamp
    # dtype also contains the substring "time", so the original code's
    # TIMESTAMP branch was unreachable and timestamps mapped to TIME.
    # "date" stays first of the three so e.g. "datetime" keeps mapping to
    # DATE as before. "char" also covers "varchar" (both TEXT).
    patterns = (
        ("int", "INTEGER"),
        ("float", "FLOAT"),
        ("bool", "BOOLEAN"),
        ("date", "DATE"),
        ("timestamp", "TIMESTAMP"),
        ("time", "TIME"),
        ("text", "TEXT"),
        ("char", "TEXT"),
        ("numeric", "NUMERIC"),
        ("decimal", "DECIMAL"),
        ("double", "DOUBLE"),
        ("real", "REAL"),
    )
    for fragment, column_type in patterns:
        if fragment in dtype:
            return column_type
    return "TEXT"
def get_manifest_model_nodes(manifest: Dict, models: List) -> List[ModelNode]:
    """Select model nodes from a parsed manifest by name.

    Only resource_type "model" entries materialized as a table or view are
    returned.

    :param manifest: Parsed dbt manifest dictionary.
    :param models: Model names to select.
    :return: ModelNode objects for the matching manifest entries.
    """
    selected = []
    for entry in manifest["nodes"].values():
        if entry["name"] not in models:
            continue
        if entry["resource_type"] != "model":
            continue
        if entry["config"]["materialized"] not in ("table", "view"):
            continue
        selected.append(
            ModelNode(
                unique_id=entry["unique_id"],
                name=entry["name"],
                resource_type=entry["resource_type"],
                database=entry["database"],
                alias=entry["alias"],
                table_schema=entry["schema"],
            )
        )
    return selected
def get_manifest_source_nodes(manifest: Dict, sources: List) -> List[SourceNode]:
    """Select source nodes from a parsed manifest by source name.

    :param manifest: Parsed dbt manifest dictionary.
    :param sources: Source names to select.
    :return: SourceNode objects for the matching manifest entries.
    """
    selected = []
    for entry in manifest["sources"].values():
        if entry["source_name"] not in sources:
            continue
        selected.append(
            SourceNode(
                unique_id=entry["unique_id"],
                name=entry["source_name"],
                resource_type=entry["resource_type"],
                table=entry["identifier"],
                database=entry["database"],
                table_schema=entry["schema"],
            )
        )
    return selected
def get_model_tables(models: List[ModelNode]) -> List[str]:
    """Render each model as a fully qualified ``database.schema.alias`` name.

    :param models: Model nodes to render.
    :return: One qualified table name string per model.
    """
    return [f"{model.database}.{model.table_schema}.{model.alias}" for model in models]
def get_source_tables(sources: List[SourceNode]) -> List[str]:
    """Render each source as a fully qualified ``database.schema.name`` string.

    :param sources: Source nodes to render.
    :return: One qualified table name string per source.
    """
    return [f"{source.database}.{source.table_schema}.{source.name}" for source in sources]
def get_table_name(node: Union[ModelNode, SourceNode], node_type: str) -> str:
    """Return the fully qualified table name for a model or source node.

    :param node: The node to render.
    :param node_type: "nodes" selects the model's alias as the final
        component; anything else uses the node's name (source case).
    :return: ``database.schema.<alias-or-name>``.
    """
    final_component = node.alias if node_type == "nodes" else node.name
    return f"{node.database}.{node.table_schema}.{final_component}"
def fill_catalog(table_columns_map: Dict, manifest: Dict, catalog: Dict, nodes: List[Union[ModelNode, SourceNode]], node_type: str) -> Dict:
    """Populate ``catalog[node_type]`` with one catalog entry per node.

    :param table_columns_map: unique_id -> list of {"column", "dtype"} dicts.
    :param manifest: Parsed dbt manifest (used for schema/database lookup).
    :param catalog: Catalog dict being built; mutated in place and returned.
    :param nodes: Model or source nodes to add.
    :param node_type: "nodes" or "sources" — selects the catalog section and
        whether the table name comes from ``alias`` (models) or ``name``.
    :return: The same catalog dict with catalog[node_type] filled in.
    """
    # Accumulate all entries first, then assign the section once. The original
    # reassigned catalog[node_type] = {single entry} on every loop iteration,
    # so only the LAST node survived in the resulting catalog.
    entries = {}
    for node in nodes:
        columns = {}
        # Column index is 1-based positional order, as in dbt catalogs.
        for index, column in enumerate(table_columns_map[node.unique_id], start=1):
            columns[column["column"]] = {
                "type": get_column_type(column["dtype"]),
                "index": index,
                "name": column["column"],
                "comment": None,
            }
        entries[node.unique_id] = {
            "metadata": {
                "type": "BASE TABLE",
                "schema": manifest[node_type][node.unique_id]["schema"],
                "name": node.alias if node_type == "nodes" else node.name,
                "database": manifest[node_type][node.unique_id]["database"],
                "comment": None,
                "owner": None,
            },
            "columns": columns,
            "stats": {},
            "unique_id": node.unique_id,
        }
    catalog[node_type] = entries
    return catalog
def run_macro(macro: str, base_path: str) -> str:
    """Compile an inline dbt macro and return the captured stdout.

    :param macro: Jinja snippet passed to ``dbt compile --inline``.
    :param base_path: Working directory of the dbt project.
    :return: Raw stdout of the dbt invocation.
    """
    completed = subprocess.run(
        ["dbt", "compile", "--inline", macro],  # noqa
        cwd=base_path,
        capture_output=True,
        text=True,
    )
    return completed.stdout
def generate_partial_manifest_catalog(changed_files, base_path: str = "./"):
    """Build a partial dbt manifest and catalog covering only changed files.

    Collects model/source names from changed YAML schema files and the stems
    of changed .sql files, runs ``dbt parse`` to refresh target/manifest.json,
    asks the adapter for the columns of each selected relation via an inline
    macro, and assembles a minimal catalog for just those relations.

    :param changed_files: Iterable of changed file paths (e.g. from git).
    :param base_path: Root of the dbt project; dbt commands run here.
    :return: Tuple of (selected unique_ids, parsed manifest, CatalogV1).
    :raises Exception: Wraps any underlying failure.
    """
    try:
        # Schema YAML files, excluding project/profile configuration files.
        yaml_files = [
            f for f in changed_files if Path(f).suffix in [".yml", ".yaml"] and Path(f).name not in ["dbt_project.yml", "profiles.yml"]
        ]
        # Changed SQL files: the file stem is assumed to be the model name.
        model_stem = [Path(f).stem for f in changed_files if Path(f).suffix in [".sql"]]
        model_set = set()
        source_set = set()
        # Gather model and source names declared in the changed YAML files.
        for file in yaml_files:
            parsed_file = load_config(file)
            if "models" in parsed_file:
                for model in parsed_file["models"]:
                    model_set.add(model.get("name", ""))
            if "sources" in parsed_file:
                for source in parsed_file["sources"]:
                    source_set.add(source.get("name", ""))
        for model in model_stem:
            model_set.add(model)
        models = list(model_set)
        source_list = list(source_set)
        # Refresh the manifest so it reflects the current working tree.
        subprocess.run(["dbt", "parse"], cwd=base_path, stdout=subprocess.PIPE)  # noqa
        manifest_file = Path(Path(base_path) / "target/manifest.json")
        with manifest_file.open() as f:
            manifest = json.load(f)
        nodes = get_manifest_model_nodes(manifest, models)
        sources = get_manifest_source_nodes(manifest, source_list)
        # Serialize the selected nodes as JSON literals embedded in the macro.
        nodes_data = [{"name": node.name, "resource_type": node.resource_type, "unique_id": node.unique_id, "table": ""} for node in nodes]
        sources_data = [
            {"name": source.name, "resource_type": source.resource_type, "unique_id": source.unique_id, "table": source.table}
            for source in sources
        ]
        nodes_str = ",\n".join(json.dumps(data) for data in nodes_data + sources_data)
        # Inline Jinja macro: for each node, fetch its columns from the adapter
        # and emit a JSON map of unique_id -> [{"column", "dtype"}, ...].
        query = (
            "{% set result = {} %}{% set nodes = ["
            + nodes_str
            + '] %}{% for n in nodes %}{% if n["resource_type"] == "source" %}{% set columns = adapter.get_columns_in_relation(source(n["name"], n["table"])) %}{% else %}{% set columns = adapter.get_columns_in_relation(ref(n["name"])) %}{% endif %}{% set new_columns = [] %}{% for column in columns %}{% do new_columns.append({"column": column.name, "dtype": column.dtype}) %}{% endfor %}{% do result.update({n["unique_id"]:new_columns}) %}{% endfor %}{{ tojson(result) }}'
        )
        dbt_compile_output = run_macro(query, base_path)
        # The JSON payload follows dbt's "Compiled inline node is:" banner in
        # stdout; strip surrounding quotes/whitespace before parsing.
        compiled_inline_node = dbt_compile_output.split("Compiled inline node is:")[1].strip().replace("'", "").strip()
        table_columns_map = json.loads(compiled_inline_node)
        # Static catalog metadata header. NOTE(review): generated_at and
        # invocation_id are hard-coded placeholders, not derived from this run.
        catalog = {
            "metadata": {
                "dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json",
                "dbt_version": "1.7.2",
                "generated_at": "2024-03-04T11:13:52.284167Z",
                "invocation_id": "e2970ef7-c397-404b-ac5d-63a71a45b628",
                "env": {},
            },
            "errors": None,
        }
        catalog = fill_catalog(table_columns_map, manifest, catalog, nodes, "nodes")
        catalog = fill_catalog(table_columns_map, manifest, catalog, sources, "sources")
        selected_models = [node.unique_id for node in nodes + sources]
        return selected_models, parse_manifest(manifest), CatalogV1(**catalog)
    except Exception as e:
        # NOTE(review): broad catch-and-rewrap discards the specific error
        # type; consider a narrower exception or a project error class.
        raise Exception("Unable to generate partial manifest and catalog") from e
def map_url_to_instance(url, instance):
    """Translate a backend API base URL into the matching per-instance app URL.

    :param url: Backend base URL to look up.
    :param instance: Instance (tenant) name inserted into the host.
    :return: The instance-specific URL, or None if the URL is not recognized.
    """
    if url == "https://api.tryaltimate.com":
        return f"https://{instance}.demo.tryaltimate.com"
    if url == "https://api.myaltimate.com":
        return f"https://{instance}.app.myaltimate.com"
    if url == "https://api.getaltimate.com":
        return f"https://{instance}.app.getaltimate.com"
    if url == "http://localhost:8000":
        return f"http://{instance}.localhost:3000"
    # Unknown base URL: no mapping available.
    return None