Skip to content

Commit df6833c

Browse files
authored
Merge pull request #598 from ably/ECO-5305/fix-rest-retry
[ECO-5305] fix: rest retry logic
2 parents 8f5c1cb + 0856086 commit df6833c

3 files changed

Lines changed: 120 additions & 17 deletions

File tree

ably/http/http.py

Lines changed: 19 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -188,6 +188,11 @@ async def make_request(self, method, path, version=None, headers=None, body=None
188188

189189
hosts = self.get_rest_hosts()
190190
for retry_count, host in enumerate(hosts):
191+
def should_stop_retrying():
192+
time_passed = time.time() - requested_at
193+
# if it's the last try or cumulative timeout is done, we stop retrying
194+
return retry_count == len(hosts) - 1 or time_passed > http_max_retry_duration
195+
191196
base_url = "%s://%s:%d" % (self.preferred_scheme,
192197
host,
193198
self.preferred_port)
@@ -204,28 +209,33 @@ async def make_request(self, method, path, version=None, headers=None, body=None
204209
try:
205210
response = await self.__client.send(request)
206211
except Exception as e:
207-
# if last try or cumulative timeout is done, throw exception up
208-
time_passed = time.time() - requested_at
209-
if retry_count == len(hosts) - 1 or time_passed > http_max_retry_duration:
212+
if should_stop_retrying():
210213
raise e
211214
else:
215+
# RSC15l4
216+
cloud_front_error = (response.headers.get('Server', '').lower() == 'cloudfront'
217+
and response.status_code >= 400)
218+
# RSC15l3
219+
retryable_server_error = response.status_code >= 500 and response.status_code <= 504
220+
# Resending requests that have failed for other failure conditions will not fix the problem
221+
# and will simply increase the load on other datacenters unnecessarily
222+
should_fallback = cloud_front_error or retryable_server_error
223+
212224
try:
213225
if raise_on_error:
214226
AblyException.raise_for_response(response)
215227

228+
if should_fallback and not should_stop_retrying():
229+
continue
230+
216231
# Keep fallback host for later (RSC15f)
217232
if retry_count > 0 and host != self.options.get_rest_host():
218233
self.__host = host
219234
self.__host_expires = time.time() + (self.options.fallback_retry_timeout / 1000.0)
220235

221236
return Response(response)
222237
except AblyException as e:
223-
if not e.is_server_error:
224-
raise e
225-
226-
# if last try or cumulative timeout is done, throw exception up
227-
time_passed = time.time() - requested_at
228-
if retry_count == len(hosts) - 1 or time_passed > http_max_retry_duration:
238+
if should_stop_retrying() or not should_fallback:
229239
raise e
230240

231241
async def delete(self, url, headers=None, skip_auth=False, timeout=None):

test/ably/rest/resthttp_test.py

Lines changed: 6 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -151,6 +151,7 @@ async def test_no_retry_if_not_500_to_599_http_code(self):
151151

152152
await ably.close()
153153

154+
@respx.mock
154155
async def test_500_errors(self):
155156
"""
156157
Raise error if all the servers reply with a 5xx error.
@@ -159,16 +160,13 @@ async def test_500_errors(self):
159160

160161
ably = AblyRest(token="foo")
161162

162-
def raise_ably_exception(*args, **kwargs):
163-
raise AblyException(message="", status_code=500, code=50000)
163+
mock_request = respx.route().mock(return_value=httpx.Response(500, text="Internal Server Error"))
164164

165-
with mock.patch('httpx.Request', wraps=httpx.Request):
166-
with mock.patch('ably.util.exceptions.AblyException.raise_for_response',
167-
side_effect=raise_ably_exception) as send_mock:
168-
with pytest.raises(AblyException):
169-
await ably.http.make_request('GET', '/', skip_auth=True)
165+
with pytest.raises(AblyException):
166+
await ably.http.make_request('GET', '/', skip_auth=True)
167+
168+
assert mock_request.call_count == 3
170169

171-
assert send_mock.call_count == 3
172170
await ably.close()
173171

174172
def test_custom_http_timeouts(self):

test/ably/rest/restrequest_test.py

Lines changed: 95 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -126,6 +126,101 @@ async def test_timeout(self):
126126
await ably.request('GET', '/time', version=Defaults.protocol_version)
127127
await ably.close()
128128

129+
# RSC15l3
@dont_vary_protocol
async def test_503_status_fallback(self):
    """A 503 from the default host makes the request succeed via the fallback host.

    The default endpoint is mocked to answer 503 and the single configured
    fallback host to answer 200 with the JSON body ``[123]``. The request
    must return the fallback's payload, and both routes must have been hit.
    """
    default_endpoint = 'https://sandbox-rest.ably.io/time'
    fallback_host = 'sandbox-a-fallback.ably-realtime.com'
    fallback_endpoint = f'https://{fallback_host}/time'
    headers = {
        "Content-Type": "application/json"
    }

    ably = await TestApp.get_ably_rest(fallback_hosts=[fallback_host])
    try:
        with respx.mock:
            default_route = respx.get(default_endpoint)
            fallback_route = respx.get(fallback_endpoint)
            default_route.return_value = httpx.Response(503, headers=headers)
            fallback_route.return_value = httpx.Response(200, headers=headers, text='[123]')

            result = await ably.request('GET', '/time', version=Defaults.protocol_version)

            # Route call statistics are checked while the mock is still active.
            assert default_route.called
            # Proves the successful answer really came from the fallback host.
            assert fallback_route.called
            assert result.status_code == 200
            assert result.items[0] == 123
    finally:
        # Close the client even if the request or an assertion fails,
        # so a failing test does not leak the underlying HTTP session.
        await ably.close()
149+
150+
# RSC15l2
@dont_vary_protocol
async def test_httpx_timeout_fallback(self):
    """A read timeout on the default host makes the request succeed via the fallback host.

    The default endpoint is mocked to raise ``httpx.ReadTimeout`` and the
    single configured fallback host to answer 200 with the JSON body
    ``[123]``. The request must return the fallback's payload, and both
    routes must have been hit.
    """
    default_endpoint = 'https://sandbox-rest.ably.io/time'
    fallback_host = 'sandbox-a-fallback.ably-realtime.com'
    fallback_endpoint = f'https://{fallback_host}/time'
    headers = {
        "Content-Type": "application/json"
    }

    ably = await TestApp.get_ably_rest(fallback_hosts=[fallback_host])
    try:
        with respx.mock:
            default_route = respx.get(default_endpoint)
            fallback_route = respx.get(fallback_endpoint)
            # Transport-level failure instead of an HTTP response.
            default_route.side_effect = httpx.ReadTimeout
            fallback_route.return_value = httpx.Response(200, headers=headers, text='[123]')

            result = await ably.request('GET', '/time', version=Defaults.protocol_version)

            # Route call statistics are checked while the mock is still active.
            assert default_route.called
            # Proves the successful answer really came from the fallback host.
            assert fallback_route.called
            assert result.status_code == 200
            assert result.items[0] == 123
    finally:
        # Close the client even if the request or an assertion fails,
        # so a failing test does not leak the underlying HTTP session.
        await ably.close()
170+
171+
# RSC15l3
@dont_vary_protocol
async def test_503_status_fallback_on_publish(self):
    """A 503 on publish to the default host is retried against the fallback host.

    The default ``/channels/test/messages`` endpoint is mocked to answer 503
    and the single configured fallback host to answer 200 with a JSON
    message body. The publish call must yield the fallback's message, and
    both routes must have been hit.
    """
    default_endpoint = 'https://sandbox-rest.ably.io/channels/test/messages'
    fallback_host = 'sandbox-a-fallback.ably-realtime.com'
    fallback_endpoint = f'https://{fallback_host}/channels/test/messages'

    fallback_response_text = (
        '{"id": "unique_id:0", "channel": "test", "name": "test", "data": "data", '
        '"clientId": null, "connectionId": "connection_id", "timestamp": 1696944145000, '
        '"encoding": null}'
    )
    headers = {
        "Content-Type": "application/json"
    }

    ably = await TestApp.get_ably_rest(fallback_hosts=[fallback_host])
    try:
        with respx.mock:
            default_route = respx.post(default_endpoint)
            fallback_route = respx.post(fallback_endpoint)
            default_route.return_value = httpx.Response(503, headers=headers)
            fallback_route.return_value = httpx.Response(
                200,
                headers=headers,
                text=fallback_response_text,
            )

            message_response = await ably.channels['test'].publish('test', 'data')

            # Route call statistics are checked while the mock is still active.
            assert default_route.called
            # Proves the publish was actually re-sent to the fallback host.
            assert fallback_route.called
            assert message_response.to_native()['data'] == 'data'
    finally:
        # Close the client even if the publish or an assertion fails,
        # so a failing test does not leak the underlying HTTP session.
        await ably.close()
201+
202+
# RSC15l4
@dont_vary_protocol
async def test_400_cloudfront_fallback(self):
    """A 400 carrying ``Server: CloudFront`` from the default host triggers fallback.

    The default endpoint is mocked to answer 400 with a ``Server: CloudFront``
    header and the single configured fallback host to answer 200 with the
    JSON body ``[123]``. The request must return the fallback's payload
    (not the default host's ``[456]``), and both routes must have been hit.
    """
    default_endpoint = 'https://sandbox-rest.ably.io/time'
    fallback_host = 'sandbox-a-fallback.ably-realtime.com'
    fallback_endpoint = f'https://{fallback_host}/time'
    # The same headers are used for both responses; only the 400 one is
    # expected to be treated as a CloudFront error.
    headers = {
        "Server": "CloudFront",
        "Content-Type": "application/json",
    }

    ably = await TestApp.get_ably_rest(fallback_hosts=[fallback_host])
    try:
        with respx.mock:
            default_route = respx.get(default_endpoint)
            fallback_route = respx.get(fallback_endpoint)
            default_route.return_value = httpx.Response(400, headers=headers, text='[456]')
            fallback_route.return_value = httpx.Response(200, headers=headers, text='[123]')

            result = await ably.request('GET', '/time', version=Defaults.protocol_version)

            # Route call statistics are checked while the mock is still active.
            assert default_route.called
            # Proves the successful answer really came from the fallback host.
            assert fallback_route.called
            assert result.status_code == 200
            assert result.items[0] == 123
    finally:
        # Close the client even if the request or an assertion fails,
        # so a failing test does not leak the underlying HTTP session.
        await ably.close()
223+
129224
async def test_version(self):
130225
version = "150" # chosen arbitrarily
131226
result = await self.ably.request('GET', '/time', "150")

0 commit comments

Comments
 (0)