-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathextract_data.py
More file actions
133 lines (115 loc) · 4.6 KB
/
extract_data.py
File metadata and controls
133 lines (115 loc) · 4.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""Extract data from JSON. Based on custom JMSPath implementation."""
import re
import warnings
from collections.abc import Mapping
from typing import Any, Dict, List, Optional, Union
import jmespath
from .utils.data_normalization import exclude_filter, flatten_list
from .utils.jmespath_parsers import (
associate_key_of_my_value,
jmespath_refkey_parser,
jmespath_value_parser,
keys_values_zipper,
multi_reference_keys,
)
def extract_data_from_json(
    data: Union[Mapping, List],
    path: str = "*",
    exclude: Optional[List] = None,
) -> Any:
    """Return the wanted data from device output based on the check path.

    See the unit tests for complete examples.

    When a JMESPath expression is given it is evaluated against ``data`` and the
    matching values are returned. When no expression is given (``"*"``) the whole
    data structure is returned; this covers the "raw" diff type. Keys listed in
    ``exclude`` are filtered out before any extraction takes place.

    Notes:
        https://jmespath.org/ shows how JMESPath works.

    Args:
        data: JSON data structure.
        path: JMESPath used to extract specific values. A ``$...$`` section in the
            path marks a reference key; paths with more than one such section are
            handled by the multi reference key branch. An empty or ``None`` path
            falls back to ``"*"`` with a warning.
        exclude: List of keys to exclude. Only applied when ``data`` is a dict or a list.

    Returns:
        Evaluated data, may be anything depending on the JMESPath used. When the path
        contains a reference key, the zipped key/value pairs are returned instead
        (sorted by their keys in the single reference key case).

    Raises:
        ValueError: If ``exclude`` is not a list, or if the reference keys returned by
            JMESPath are neither a dict nor a list.
        TypeError: If the JMESPath search returns ``None``, or if a nested list in the
            result contains a dict (the path must be more specific).
    """
    if exclude and isinstance(data, (Dict, List)):
        if not isinstance(exclude, list):
            raise ValueError(
                f"Exclude list must be defined as a list. You have {type(exclude)}",
            )
        # Exclude unwanted elements. The return value is not used, so the helper is
        # relied upon for its effect on `data` itself.
        exclude_filter(data, exclude)

    if not path:
        warnings.warn(
            "JMSPath cannot be empty string or type 'None'. Path argument reverted to default value '*'",
            stacklevel=2,  # attribute the warning to the caller, not to this module
        )
        path = "*"

    if path == "*":
        # Nothing to extract: return the (possibly filtered) data as is.
        return data

    # Multi ref_key
    if len(re.findall(r"\$.*?\$", path)) > 1:
        clean_path = path.replace("$", "")
        # One " | []" flatten step is appended for every wildcard after the first one.
        values = jmespath.search(
            f"{clean_path}{' | []' * (path.count('*') - 1)}",
            data,
        )
        return keys_values_zipper(
            multi_reference_keys(path, data),
            associate_key_of_my_value(clean_path, values),
        )

    values = jmespath.search(jmespath_value_parser(path), data)

    if values is None:
        raise TypeError(
            "JMSPath returned 'None'. Please, verify your JMSPath regex.",
        )

    # Check for multi-nested lists.
    if not isinstance(values, (str, int, float, bool)) and any(isinstance(i, list) for i in values):
        # Process elements to check if lists should be flattened.
        for element in values:
            if not isinstance(element, list):
                # Only list elements can hold nested dicts/lists. Skipping the rest
                # also avoids iterating over scalars such as None or numbers, which
                # would otherwise fail with an unrelated "not iterable" TypeError.
                continue
            for item in element:
                # Raise if there is a dict, path must be more specific to extract data.
                if isinstance(item, dict):
                    raise TypeError(
                        f'Must be list of lists i.e. [["Idle", 75759616], ["Idle", 75759620]]. You have "{values}".',
                    )
                if isinstance(item, list):
                    # Flatten list and rewrite values.
                    values = flatten_list(
                        values,
                    )
                    # Items are the same, need to check only first to see if this is a nested list.
                    # NOTE(review): this leaves the inner loop only; the outer loop keeps
                    # walking the list it started with - confirm this is the intended scope.
                    break

    # We need to get a list of reference keys - list of strings.
    # Based on the expression or data we might have different data types
    # therefore we need to normalize.
    if re.search(r"\$.*\$", path):
        paired_key_value = associate_key_of_my_value(
            jmespath_value_parser(path),
            values,
        )
        wanted_reference_keys = jmespath.search(
            jmespath_refkey_parser(path),
            data,
        )

        # The container type is checked before the content is inspected, so that a
        # non-iterable result (e.g. None or a number) reaches the explicit ValueError
        # below instead of failing while being iterated.
        if isinstance(wanted_reference_keys, dict):
            list_of_reference_keys = list(wanted_reference_keys.keys())
        elif isinstance(wanted_reference_keys, list):
            if any(isinstance(element, list) for element in wanted_reference_keys):
                # Nested list: flatten it and use the first flattened list as the keys.
                list_of_reference_keys = flatten_list(wanted_reference_keys)[0]
            else:
                list_of_reference_keys = wanted_reference_keys
        else:
            raise ValueError(
                "Reference Key normalization failure. Please verify data type returned.",
            )

        normalized = keys_values_zipper(
            list_of_reference_keys,
            paired_key_value,
        )
        # Data between pre and post may come in different order, so it needs to be sorted.
        return sorted(normalized, key=lambda arg: list(arg.keys()))

    return values