-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpropagate_tag.py
More file actions
executable file
·99 lines (78 loc) · 3.24 KB
/
propagate_tag.py
File metadata and controls
executable file
·99 lines (78 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/env python3
import argparse
import datetime
from common import client, load_query
def _lookup_uid(name_to_uid, name, kind):
    """Return ``name_to_uid[name]``, raising a KeyError that says what was missing.

    Args:
        name_to_uid: Mapping of names to uids.
        name: Name to look up (exact, case-sensitive match).
        kind: Human-readable label used in the error message, e.g. ``'tag'``.

    Raises:
        KeyError: If *name* is not a key of *name_to_uid*.
    """
    try:
        return name_to_uid[name]
    except KeyError:
        raise KeyError(
            f'{kind} {name!r} not found (names are case-sensitive)'
        ) from None


def _connected_column_uids(edges):
    """Collect every uid that appears on either end of a complete lineage edge.

    Args:
        edges: Iterable of edge dicts with ``sourceUid`` and ``destinationUid``.

    Returns:
        A set of uids.
    """
    connected = set()
    for edge in edges:
        source, destination = edge['sourceUid'], edge['destinationUid']
        # Edges with a missing endpoint are ignored entirely.
        if source is None or destination is None:
            continue
        connected.update((source, destination))
    return connected


def _split_by_tag_state(column_items, tag_uid, set_tag):
    """Partition columns by whether their tag state differs from the requested one.

    Args:
        column_items: Column dicts with ``uid`` and ``tagIds``; falsy entries
            (some items come back as None) are skipped.
        tag_uid: Uid of the tag being attached or detached.
        set_tag: True if the tag should end up present, False if absent.

    Returns:
        ``(uids_to_alter, noop_count)``: uids whose state must change, and the
        number of columns already in the requested state.
    """
    to_alter = []
    noop_count = 0
    for col in column_items:
        if not col:
            continue
        if (tag_uid in col['tagIds']) != set_tag:
            to_alter.append(col['uid'])
        else:
            noop_count += 1
    return to_alter, noop_count


def set_tag_on_column(data_source_id, database, schema, table, column, tag, set_tag):
    """Attach or detach *tag* on every column connected by lineage to one column.

    The table is looked up by the path
    ``"<data_source_id>"."<database>"."<schema>"."<table>"``. Then *column* and
    *tag* are resolved to uids, and the lineage graph around the table is
    fetched with that column as the allowed list. Finally the tag is attached
    (``set_tag=True``) or detached (``set_tag=False``) on each connected column
    whose current state differs from the requested one.

    Args:
        data_source_id: Data-source identifier, formatted into the table path.
        database: Database name (case-sensitive).
        schema: Schema name (case-sensitive).
        table: Table name (case-sensitive).
        column: Column name within *table* (case-sensitive).
        tag: Tag name (case-sensitive).
        set_tag: True to attach the tag, False to detach it.

    Returns:
        The list of column uids sent to the attach/detach mutation. It is empty
        (and no mutation is issued) if every connected column is already in
        the requested state.

    Raises:
        ValueError: If *set_tag* is None.
        KeyError: If *column* is not a column of the table, or *tag* is not a
            known tag.
    """
    if set_tag is None:
        # With None, `(tag in tags) != set_tag` is True for every column, so
        # every connected column would be sent with attach=None.
        raise ValueError('set_tag must be True (attach) or False (detach), not None')

    # --------------- find central table and the target column
    table_data = client.execute(
        load_query('queries/table_by_path.gql'),
        {'path': f'"{data_source_id}"."{database}"."{schema}"."{table}"'},
    )
    table_uid = table_data['table']['uid']
    column_name_to_uid = {
        c['prop']['name']: c['uid'] for c in table_data['table']['columns']
    }
    column_uid = _lookup_uid(column_name_to_uid, column, 'column')
    print('table uid:', table_uid)
    print('column uid:', column_uid)

    # --------------- resolve the tag
    tags_data = client.execute(load_query('queries/get_tags.gql'))
    tag_name_to_uid = {t['name']: t['uid'] for t in tags_data['tags']['items']}
    tag_uid = _lookup_uid(tag_name_to_uid, tag, 'tag')
    print('tag uid:', tag_uid)

    # --------------- get lineage data
    # NOTE(review): the meaning of biPopularity / biLastUsedDays, and whether
    # allowedList restricts edges to this column, is defined by
    # lineage_1col_stripped.gql and the API. Confirm there.
    lineage_data = client.execute(load_query('queries/lineage_1col_stripped.gql'), {
        'primaryUid': table_uid,
        'depthDownstream': 100,
        'depthUpstream': 100,
        'biPopularity': [0, 4],
        'biLastUsedDays': 90,
        'allowedList': [column_uid],
    })
    connected_columns = _connected_column_uids(lineage_data['lineage']['edges'])

    # --------------- decide which columns need a change
    tags_on_columns_data = client.execute(
        load_query('queries/get_columns.gql'),
        {'uids': list(connected_columns)},
    )
    columns_to_alter, columns_noop_n = _split_by_tag_state(
        tags_on_columns_data['columns']['items'], tag_uid, set_tag
    )
    print('Columns with no operation required:', columns_noop_n)
    print('Columns with operation completing:', len(columns_to_alter))

    # --------------- set/unset tags
    if columns_to_alter:
        # Only issue the mutation when at least one column actually changes.
        client.execute(load_query('queries/attach_tags.gql'), {
            'tagUid': tag_uid,
            'objectUids': columns_to_alter,
            'attach': set_tag,
        })
    return columns_to_alter
def main():
    """Parse command-line arguments and propagate (or remove) a tag via lineage.

    Usage:
        propagate_tag.py DATA_SOURCE_ID DATABASE SCHEMA TABLE COLUMN TAG
                         (--set-tag | --no-set-tag)
    """
    parser = argparse.ArgumentParser(
        description='Set or unset a tag on every column connected by lineage '
                    'to the given column'
    )
    parser.add_argument('data_source_id', type=int,
                        help='data source id')
    parser.add_argument('database', type=str,
                        help='database case-sensitive')
    parser.add_argument('schema', type=str,
                        help='schema case-sensitive')
    parser.add_argument('table', type=str,
                        help='table case-sensitive')
    parser.add_argument('column', type=str,
                        help='column case-sensitive')
    parser.add_argument('tag', type=str,
                        help='tag case-sensitive')
    # BooleanOptionalAction defaults to None when neither form of the flag is
    # given, which would leave the operation ambiguous. Require an explicit choice.
    parser.add_argument(
        '--set-tag',
        action=argparse.BooleanOptionalAction,
        required=True,
        help='attach the tag (--set-tag) or detach it (--no-set-tag)',
    )
    args = parser.parse_args()
    set_tag_on_column(
        args.data_source_id,
        args.database,
        args.schema,
        args.table,
        args.column,
        args.tag,
        args.set_tag,
    )
# Run the command-line interface only when this file is executed directly;
# importing the module does not call main().
if __name__ == '__main__':
    main()