forked from veerendra2/fitbit-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfitbit_api.py
More file actions
174 lines (135 loc) · 6.54 KB
/
fitbit_api.py
File metadata and controls
174 lines (135 loc) · 6.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# -*- coding: utf-8 -*-
"""
Fitbit API
"""
import requests
from .exceptions import FitbitAPIError
from .fitbit_setup import update_fitbit_token
class FitbitAPI:
    """Thin client for the Fitbit Web API with automatic access-token refresh.

    Every ``get_*`` method issues a GET request through :meth:`make_request`
    and returns the decoded JSON body of the response. Date arguments are
    interpolated verbatim into the request URL (presumably ``yyyy-MM-dd``
    strings -- confirm against the callers).
    """

    TOKEN_API = "https://api.fitbit.com/oauth2/token"
    # Timeout, in seconds, applied to every HTTP call made by this client.
    REQUEST_TIMEOUT = 5

    def __init__(self, client_id, client_secret, access_token, refresh_token):
        """Store the OAuth2 credentials and build the initial request headers."""
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = access_token
        self.refresh_token = refresh_token
        self.headers = self._create_headers()

    def _create_headers(self):
        """Return the headers sent with API requests, using the current access token."""
        return {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _error_detail(response):
        """Return the JSON error body of *response*, or its raw text if it is not JSON.

        Error responses are not guaranteed to carry JSON; decoding
        unconditionally would raise a decode error that hides the real failure.
        """
        try:
            return response.json()
        except ValueError:  # every JSON decode error raised by requests is a ValueError
            return response.text

    @staticmethod
    def _date_range(start_date, end_date=None, period=None):
        """Build the date segment of an endpoint path.

        Returns ``start/end`` when *end_date* is truthy, otherwise
        ``start/period`` when *period* is given, otherwise *start_date* alone.
        """
        if end_date:
            return f"{start_date}/{end_date}"
        if period:
            return f"{start_date}/{period}"
        return start_date

    def _get_json(self, path, version="1"):
        """GET ``/{version}/user/-/{path}.json`` and return the decoded JSON body."""
        url = f"https://api.fitbit.com/{version}/user/-/{path}.json"
        return self.make_request("GET", url).json()

    def refresh_access_token(self):
        """Exchange the refresh token for a new token pair and persist it.

        On success the instance's tokens and headers are updated and the new
        pair is handed to ``update_fitbit_token``.

        Raises:
            FitbitAPIError: if the token endpoint does not answer 200, or if
                the answer does not contain both tokens. In that case the
                current tokens are left untouched and nothing is persisted.
        """
        payload = {
            "grant_type": "refresh_token",
            "client_id": self.client_id,
            "refresh_token": self.refresh_token,
        }
        # NOTE(review): the secret is sent verbatim as the Basic credential;
        # presumably the caller supplies the already base64-encoded
        # "client_id:client_secret" pair -- confirm against the setup code.
        headers = {
            "Authorization": f"Basic {self.client_secret}",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        response = requests.post(
            self.TOKEN_API,
            data=payload,
            headers=headers,
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            raise FitbitAPIError(
                f"Failed to refresh access token: {self._error_detail(response)}"
            )

        try:
            tokens = response.json()
        except ValueError as e:
            raise FitbitAPIError(
                f"Failed to refresh access token: {response.text}"
            ) from e

        access_token = tokens.get("access_token") if isinstance(tokens, dict) else None
        refresh_token = tokens.get("refresh_token") if isinstance(tokens, dict) else None
        # Never overwrite working credentials with missing values.
        if not access_token or not refresh_token:
            raise FitbitAPIError(
                "Failed to refresh access token: response did not include new tokens"
            )

        self.access_token = access_token
        self.refresh_token = refresh_token
        self.headers = self._create_headers()
        update_fitbit_token(self.access_token, self.refresh_token)

    def make_request(self, method, url, **kwargs):
        """Make an API request and handle token refresh if needed.

        A 401 answer triggers one token refresh followed by one retry of the
        same request.

        Args:
            method: HTTP method name passed to ``requests.request``.
            url: Absolute URL to call.
            **kwargs: Extra keyword arguments forwarded to ``requests.request``.
                ``headers`` and ``timeout`` are set by this method, so passing
                them here raises ``TypeError``.

        Returns:
            The ``requests`` response object of the successful request.

        Raises:
            FitbitAPIError: if the final response has an HTTP error status
                (including a retry that still fails), or if the token refresh
                fails.
        """
        response = requests.request(
            method, url, headers=self.headers, timeout=self.REQUEST_TIMEOUT, **kwargs
        )
        if response.status_code == 401:
            # Access token rejected: refresh it and retry once with new headers.
            self.refresh_access_token()
            response = requests.request(
                method,
                url,
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
                **kwargs,
            )
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            raise FitbitAPIError(
                f"HTTP error occurred: {self._error_detail(response)}"
            ) from e
        return response

    def get_user_profile(self):
        """Get Profile"""
        return self._get_json("profile")

    def get_devices(self):
        """Get Devices"""
        return self._get_json("devices")

    def get_sleep_log(self, start_date, end_date=None):
        """Get Sleep Logs by Date Range and Date"""
        date_range = self._date_range(start_date, end_date)
        return self._get_json(f"sleep/date/{date_range}", version="1.2")

    def get_heart_rate_time_series(self, start_date, end_date=None):
        """Get Heart Rate Time Series by Date Range and Date"""
        date_range = self._date_range(start_date, end_date, period="1d")
        return self._get_json(f"activities/heart/date/{date_range}")

    def get_spo2_summary(self, start_date, end_date=None):
        """Get SpO2 Summary by Interval and Date"""
        date_range = self._date_range(start_date, end_date)
        return self._get_json(f"spo2/date/{date_range}")

    def get_spo2_intraday(self, start_date, end_date=None):
        """Get SpO2 Intraday by Interval and Date"""
        date_range = self._date_range(start_date, end_date)
        return self._get_json(f"spo2/date/{date_range}/all")

    def get_azm_time_series(self, start_date, end_date=None):
        """Get AZM Time Series by Interval and Date"""
        date_range = self._date_range(start_date, end_date, period="1d")
        return self._get_json(f"activities/active-zone-minutes/date/{date_range}")

    def get_azm_intraday(self, start_date, end_date=None):
        """Get AZM Intraday by Interval and Date"""
        date_range = self._date_range(start_date, end_date, period="1d")
        return self._get_json(
            f"activities/active-zone-minutes/date/{date_range}/1min"
        )

    def get_breathing_rate_summary(self, start_date, end_date=None):
        """Get Breathing Rate Summary by Interval and Date"""
        date_range = self._date_range(start_date, end_date)
        return self._get_json(f"br/date/{date_range}")

    def get_breathing_rate_intraday(self, start_date, end_date=None):
        """Get Breathing Rate Intraday by Interval and Date"""
        date_range = self._date_range(start_date, end_date)
        return self._get_json(f"br/date/{date_range}/all")

    def get_hrv_summary(self, start_date, end_date=None):
        """Get HRV Summary by Interval and Date"""
        date_range = self._date_range(start_date, end_date)
        return self._get_json(f"hrv/date/{date_range}")

    def get_body_time_series(self, resource_path, start_date, end_date=None):
        """Get Body Time Series by Interval and Date"""
        date_range = self._date_range(start_date, end_date, period="1d")
        return self._get_json(f"body/{resource_path}/date/{date_range}")

    def get_daily_activity_summary(self, start_date):
        """Get Daily Activity Summary"""
        return self._get_json(f"activities/date/{start_date}")