-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcreate_anomaly_csv.py
More file actions
53 lines (38 loc) · 1.36 KB
/
create_anomaly_csv.py
File metadata and controls
53 lines (38 loc) · 1.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import csv
import datetime
import json
import os
import sys
import httpx
base_url = "https://api.raptormaps.com"
current = datetime.datetime.now()
inspection_id: int = sys.argv[1]
def get_bearer_token():
client_secret = os.environ["DEMO_CLIENT_SECRET"]
client_id = os.environ["DEMO_CLIENT_ID"]
headers = {"content-type": "application/json"}
url = f"{base_url}/oauth/token"
body = {
"client_id": client_id,
"client_secret": client_secret,
"audience": "api://customer-api"
}
token_response = httpx.post(url, headers=headers, data=json.dumps(body))
response_data = token_response.json()
return response_data.get("access_token")
token: str = get_bearer_token()
org_id: int = os.environ.get('DEMO_ORG_ID')
def get_anomaly_data(inspection_id):
headers = {'Authorization': f'Bearer {token}'}
anomalies = httpx.get(
f'{base_url}/v2/solar_inspections/{inspection_id}/anomalies?org_id={org_id}',
headers=headers
)
return anomalies.json()['defects']
anomalies_list: list[dict] = get_anomaly_data(inspection_id)
with open(f'anamolies_for_inspection_{inspection_id}_{current}.csv', 'w', newline='') as newcsv:
fieldnames = anomalies_list[0].keys()
writer = csv.DictWriter(newcsv, fieldnames=fieldnames)
writer.writeheader()
for item in anomalies_list:
writer.writerow(item)