-
Notifications
You must be signed in to change notification settings - Fork 60
Expand file tree
/
Copy pathtest_oauth2.py
More file actions
293 lines (261 loc) · 11.6 KB
/
test_oauth2.py
File metadata and controls
293 lines (261 loc) · 11.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# -*- coding: utf-8 -*-
import datetime
import time
import pytest
from tests import FakeMethod, FakeClass
from tests.conftest import get_token_value
from xero_python.api_client import ApiClient
from xero_python.api_client.configuration import Configuration
from xero_python.api_client.oauth2 import TokenApi, OAuth2Token
from xero_python.exceptions import AccessTokenExpiredError
# from os.path import join, dirname
def test_access_token_configuration(oauth2_token):
    """Check that ``update_token`` exposes the supplied access token.

    :param oauth2_token: fixture; asserted below to be a plain ``dict``
        (an oauthlib-style token dictionary).
    """
    # Given a token fixture in dictionary form
    assert isinstance(oauth2_token, dict)
    configured = OAuth2Token()

    # When the dictionary entries are applied as keyword arguments
    configured.update_token(**oauth2_token)

    # Then the access token on the object equals the one supplied
    expected_access_token = oauth2_token["access_token"]
    assert configured.access_token == expected_access_token
class FakeOAuth2Token:
    """Minimal stand-in that only implements ``get_auth_settings``.

    The value given at construction is handed back untouched, so a test can
    verify by identity that it was passed through.
    """

    def __init__(self, value):
        # Canned object returned by get_auth_settings().
        self.value = value

    def get_auth_settings(self):
        """Return the exact object supplied at construction time."""
        canned = self.value
        return canned
def test_configuration_uses_oauth2_token():
    """Check ``Configuration.auth_settings("OAuth2")`` against a fake token.

    The fake token returns a known object from ``get_auth_settings``; the
    configuration is expected to hand back that very same object.
    """
    # Given a configuration whose oauth2_token is a fake with a known payload
    expected = "auth configuration"
    config = Configuration()
    config.oauth2_token = FakeOAuth2Token(expected)

    # When the OAuth2 auth settings are requested
    settings = config.auth_settings("OAuth2")

    # Then the identical object comes back (identity, not just equality)
    assert settings is expected
def test_auth2_token_is_valid():
    """Check an unexpired token is reported valid with a zero expiration buffer."""
    # Given a token expiring three seconds from now, with no expiration buffer
    expiry = time.time() + 3
    token = OAuth2Token(expiration_buffer=0)
    token.expires_at = expiry

    # Then the token is valid right now
    assert token.is_access_token_valid()

    # and it is also valid at an explicit moment one second before expiry
    one_second_before_expiry = expiry - 1
    assert token.is_access_token_valid(at_time=one_second_before_expiry)
def test_auth2_token_is_invalid_expiration_buffer():
    """Check the default expiration buffer makes a soon-to-expire token invalid.

    The token expires one second earlier than "now + default buffer". It is
    expected to be invalid when checked without ``at_time``, and valid when
    an explicit ``at_time`` just before expiry is given.
    """
    # Given a token whose expiry falls one second inside the default buffer
    now = time.time()
    expiry = now + OAuth2Token.EXPIRATION_BUFFER_DEFAULT - 1
    token = OAuth2Token()
    token.expires_at = expiry

    # Then the token is currently invalid (expiration buffer applied)
    currently_valid = token.is_access_token_valid()
    assert not currently_valid

    # and it is valid at a specific time (expiration buffer not applied)
    assert token.is_access_token_valid(at_time=expiry - 1)
def test_auth2_token_is_valid_indefinitely():
    """Check a token on which this test sets no expiry stays valid."""
    # Given a freshly constructed token; no expires_at is assigned here
    token = OAuth2Token()

    # Then the token is valid right now
    assert token.is_access_token_valid()

    # and it is still valid one year from today
    one_year_ahead = datetime.datetime.today() + datetime.timedelta(days=365)
    assert token.is_access_token_valid(at_time=one_year_ahead.timestamp())
def test_auth2_get_valid_token():
    """Check ``get_valid_access_token`` returns the stored access token."""
    # Given an OAuth2Token that only has its access_token assigned
    expected_value = "access token value"
    token = OAuth2Token()
    token.access_token = expected_value

    # Then the same access token value is returned
    returned_value = token.get_valid_access_token()
    assert returned_value == expected_value
def test_auth2_get_invalid_token():
    """Check ``get_valid_access_token`` raises for an expired token.

    ``AccessTokenExpiredError`` is expected both for the default call and
    for an explicit ``at_time`` one second after the expiry.
    """
    # Given an OAuth2Token whose access token expired four seconds ago
    expiry = time.time() - 4
    token = OAuth2Token()
    token.access_token = "access token value"
    token.expires_at = expiry

    # Then asking for a valid access token raises, with and without at_time
    for call_kwargs in ({}, {"at_time": expiry + 1}):
        with pytest.raises(AccessTokenExpiredError):
            token.get_valid_access_token(**call_kwargs)
def test_auth2_refresh_access_token():
    """Check ``refresh_access_token`` with the scope given as a list.

    ``fetch_access_token`` is replaced by a fake returning a prepared token.
    The test expects the fake to be called once, the API client's
    ``set_oauth2_token`` to receive that token as its only argument, and the
    token object to carry the new values afterwards.
    """
    # Given an API client double that records set_oauth2_token calls
    client = FakeClass()
    client.set_oauth2_token = FakeMethod()
    granted_scope = [
        "email",
        "profile",
        "openid",
        "accounting.reports.read",
        "accounting.attachments.read",
        "accounting.settings",
        "accounting.settings.read",
        "accounting.attachments",
        "accounting.transactions",
        "accounting.journals.read",
        "accounting.transactions.read",
        "accounting.contacts",
        "accounting.contacts.read",
        "offline_access",
    ]
    # and the token the faked fetch_access_token will hand back
    fetched = {
        "id_token": "new-id-token-value",
        "access_token": "new-access-token-value",
        "expires_in": 1800,
        "expires_at": time.time() + 1800,
        "token_type": "Bearer",
        "refresh_token": "new-refresh-token-value",
        "scope": granted_scope,
    }
    # and an OAuth2Token holding the old refresh token
    token = OAuth2Token(client_id="client_id", client_secret="client_secret")
    token.refresh_token = "refresh-token-value"
    token.scope = granted_scope
    token.fetch_access_token = FakeMethod(return_value=fetched)

    # When refreshing access_token
    assert token.refresh_access_token(api_client=client)

    # Then the fetch and the client's setter were each called exactly once
    assert len(token.fetch_access_token.calls) == 1
    assert len(client.set_oauth2_token.calls) == 1
    positional, keyword = client.set_oauth2_token.calls[0]
    assert positional == (fetched,)
    assert keyword == {}

    # Then the new valid access and refresh tokens are set on the token
    assert token.expires_at == fetched["expires_at"]
    assert token.is_access_token_valid()
    for field in (
        "id_token",
        "expires_in",
        "token_type",
        "scope",
        "access_token",
        "refresh_token",
    ):
        assert getattr(token, field) == fetched[field]
def test_auth2_refresh_access_token_having_scope_as_string():
    """Check ``refresh_access_token`` with the scope given as one string.

    Same scenario as the list-scope refresh test, but ``scope`` is a single
    space-separated string on both the old token and the fetched one.
    """
    # Given an API client double that records set_oauth2_token calls
    client = FakeClass()
    client.set_oauth2_token = FakeMethod()
    scope_string = (
        "email profile openid accounting.reports.read "
        "accounting.attachments.read accounting.settings "
        "accounting.settings.read accounting.attachments "
        "accounting.transactions accounting.journals.read "
        "accounting.transactions.read accounting.contacts "
        "accounting.contacts.read offline_access"
    )
    # and the token the faked fetch_access_token will hand back
    fetched = {
        "id_token": "new-id-token-value",
        "access_token": "new-access-token-value",
        "expires_in": 1800,
        "expires_at": time.time() + 1800,
        "token_type": "Bearer",
        "refresh_token": "new-refresh-token-value",
        "scope": scope_string,
    }
    # and an OAuth2Token holding the old refresh token
    token = OAuth2Token(client_id="client_id", client_secret="client_secret")
    token.refresh_token = "refresh-token-value"
    token.scope = scope_string
    token.fetch_access_token = FakeMethod(return_value=fetched)

    # When refreshing access_token
    refreshed = token.refresh_access_token(api_client=client)
    assert refreshed

    # Then the fetch and the client's setter were each called exactly once
    fetch_calls = token.fetch_access_token.calls
    setter_calls = client.set_oauth2_token.calls
    assert len(fetch_calls) == 1
    assert len(setter_calls) == 1
    positional, keyword = setter_calls[0]
    assert positional == (fetched,)
    assert keyword == {}

    # Then the new valid access and refresh tokens are set on the token
    assert token.expires_at == fetched["expires_at"]
    assert token.is_access_token_valid()
    for field in (
        "id_token",
        "expires_in",
        "token_type",
        "scope",
        "access_token",
        "refresh_token",
    ):
        assert getattr(token, field) == fetched[field]
def test_auth2_fetch_access_token():
    """Check the token returned by ``fetch_access_token``.

    ``call_refresh_token_api`` is faked to return a token without
    ``expires_at`` and with a space-separated ``scope`` string. The received
    token is expected to keep the other fields, carry ``scope`` as a list,
    and have ``expires_at`` equal to ``token_valid_from + expires_in``.
    """
    # Given an OAuth2Token whose refresh API call is replaced by a fake
    valid_from = time.time()
    scope_string = (
        "email profile openid accounting.reports.read "
        "accounting.attachments.read accounting.settings "
        "accounting.settings.read accounting.attachments "
        "accounting.transactions accounting.journals.read "
        "accounting.transactions.read accounting.contacts "
        "accounting.contacts.read offline_access"
    )
    api_response = {
        "id_token": "new-id-token-value",
        "access_token": "new-access-token-value",
        "expires_in": 1800,
        "token_type": "Bearer",
        "refresh_token": "new-refresh-token-value",
        "scope": scope_string,
    }
    token = OAuth2Token()
    # The fake gets a copy so api_response stays pristine for the comparisons.
    token.call_refresh_token_api = FakeMethod(return_value=api_response.copy())

    # When we fetch a new oauth2 token (no real token API is supplied)
    received = token.fetch_access_token(
        token_api=None, token_valid_from=valid_from
    )

    # Then the refresh API fake was called once
    assert len(token.call_refresh_token_api.calls) == 1
    # and the pass-through fields are unchanged
    for key in (
        "id_token",
        "access_token",
        "expires_in",
        "token_type",
        "refresh_token",
    ):
        assert received[key] == api_response[key]
    # and scope is split into a list, expires_at derived from valid_from
    assert received["scope"] == scope_string.split()
    assert received["expires_at"] == valid_from + api_response["expires_in"]
def test_auth2_call_refresh_token_api(oauth2_refresh_token):
    """Check ``call_refresh_token_api`` delegates to ``token_api.refresh_token``.

    :param oauth2_refresh_token: fixture whose entries are applied to the
        token through ``update_token(**...)``.
    """
    # Given a token configured from the fixture and a fake token API
    token = OAuth2Token()
    token.update_token(**oauth2_refresh_token)
    api_result = {}
    fake_api = FakeClass()
    fake_api.refresh_token = FakeMethod(return_value=api_result)

    # When the refresh token API endpoint is called
    returned = token.call_refresh_token_api(fake_api)

    # Then the API was called once with (refresh_token, scope) positionally
    recorded_calls = fake_api.refresh_token.calls
    assert len(recorded_calls) == 1
    positional, keyword = recorded_calls[0]
    assert positional == (token.refresh_token, token.scope)
    assert keyword == {}
    # and the API's return value is passed back as the very same object
    assert returned is api_result
def test_auth2_call_refresh_token_api_without_id_token(oauth2_token_without_id_token):
    """Check ``call_refresh_token_api`` when the token has no ``id_token``.

    :param oauth2_token_without_id_token: fixture applied through
        ``update_token(**...)``; per its name it represents a token obtained
        without the OpenID scope, so ``id_token`` is absent.
    """
    # Given a token configured from the id_token-less fixture and a fake API
    token = OAuth2Token()
    token.update_token(**oauth2_token_without_id_token)
    api_result = {}
    fake_api = FakeClass()
    fake_api.refresh_token = FakeMethod(return_value=api_result)

    # When the refresh token API endpoint is called
    returned = token.call_refresh_token_api(fake_api)

    # Then the API was called once with (refresh_token, scope) positionally
    recorded_calls = fake_api.refresh_token.calls
    assert len(recorded_calls) == 1
    positional, keyword = recorded_calls[0]
    assert positional == (token.refresh_token, token.scope)
    assert keyword == {}
    # and the API's return value is passed back as the very same object
    assert returned is api_result
def test_token_api_refresh_token(
    xero_client_id, xero_client_secret, xero_scope, vcr, vcr_cassette_name
):
    """Check ``TokenApi.refresh_token`` against a recorded cassette.

    The cassette is opened with ``record_mode="none"``, and the test asserts
    that every recorded interaction was played.
    """
    # Given all oauth2 credentials and a refresh token
    client = ApiClient(pool_threads=1)
    api = TokenApi(client, xero_client_id, xero_client_secret)
    # Fall back to a placeholder when no stored refresh token value is found.
    old_refresh_token = get_token_value("test_refresh_token") or "test-refresh-token"

    # When getting a new token
    with vcr.use_cassette(vcr_cassette_name, record_mode="none") as cassette:
        token = api.refresh_token(old_refresh_token, xero_scope)
        assert cassette.all_played

    # uncomment to save new token for next run use
    # with open(join(dirname(__file__), "oauth2_token.py"), "w") as token_file:
    #     token_file.write("token = {!r}".format(token))

    # Then a new oauth2 token dictionary is received
    assert isinstance(token, dict)
    # with a refresh token that differs from the one sent
    assert token.get("refresh_token")
    assert token["refresh_token"] != old_refresh_token
    # and every other expected field present and truthy
    for key in ("access_token", "expires_in", "id_token", "token_type", "scope"):
        assert token.get(key)
    # and the space-separated scope string matching the requested scope list
    assert token["scope"].split() == xero_scope