Commit 0d0e807

Replace async-disabling mechanism with retry backoff on refresh failure (#1315)
## Summary

Replace the async-disabling mechanism on token refresh failure with a 1-minute retry backoff, allowing the SDK to recover from transient errors without waiting for a full token expiry.

## Why

When an asynchronous token refresh failed, the `Refreshable` class set a `_refresh_err` flag that completely disabled async refresh. The only way to clear this flag was through a blocking refresh, which only triggers when the token fully expires. This meant the SDK could not recover from transient refresh failures (e.g. a brief network blip) until the token expired, potentially tens of minutes later, even though the underlying issue may have resolved in seconds.

This PR replaces the binary disable flag with a short cooldown: after a failed async refresh, the `_stale_after` threshold is pushed 1 minute into the future so the token appears fresh for a brief backoff period. Once the cooldown elapses the token becomes stale again and a new async refresh is attempted, giving the SDK a chance to recover proactively.

## What changed

### Interface changes

None.

### Behavioral changes

- **Async refresh retry on failure**: Previously, a failed async refresh disabled all future async attempts until a blocking refresh on expiry. Now, the SDK waits 1 minute (`_ASYNC_REFRESH_RETRY_BACKOFF`) and then retries the async refresh. This makes token refresh more resilient to transient errors.
- **Late async result guard**: When a slow async refresh completes after a blocking refresh already obtained a newer token, the stale async result is now discarded instead of overwriting the fresher token.

### Internal changes

- **`_stale_after` replaces `_stale_duration`**: Staleness is now tracked as an absolute timestamp (`_stale_after`) instead of a relative `timedelta` (`_stale_duration`). This simplifies `_token_state()` to a direct comparison rather than computing `expiry - now` and comparing against a duration.
- **`_handle_failed_async_refresh()`**: New method that advances `_stale_after` by the backoff period, replacing the `_refresh_err` flag.
- **`_now()` helper**: Centralises "current time" so that naive and timezone-aware `datetime` objects from different token sources are compared consistently.
- **`_use_dynamic_stale_duration` renamed to `_use_legacy_stale_duration`**: Inverted boolean to clarify intent: the legacy path is the one where callers supply an explicit `stale_duration`.
- **`_MockRefreshable.refresh()` no longer mutates `self._token`**: The mock now returns the token without setting `self._token` as a side effect, avoiding a data race between async and blocking refresh threads. The production code's `_update_token` handles storage.

## How is this tested?

Tests are rewritten to be fully deterministic by introducing a `_ManualExecutor` that replaces the real `ThreadPoolExecutor`. Async refreshes are queued but only execute when `executor.run_all()` is called, eliminating all `time.sleep()` calls and thread synchronization from async-path tests. This makes the test suite faster and removes flakiness from timing-dependent assertions.

New test cases:

- `test_repeated_calls_during_async_failure_cooldown_do_not_refresh`: verifies that calls during the cooldown period do not trigger additional async refreshes.
- `test_call_after_async_failure_cooldown_refreshes_token_async`: verifies that a call after the cooldown elapses triggers a new async refresh that succeeds.
- `test_late_async_refresh_does_not_overwrite_blocking_refresh`: verifies that a slow async refresh completing after a blocking refresh does not overwrite the newer token.
- `test_stale_after_is_recomputed_after_blocking_refresh`: verifies that `_stale_after` is recomputed from the refreshed token after a blocking refresh.
- `test_stale_after_computation`: verifies that `_stale_after` is computed correctly for both the dynamic and legacy stale-duration paths.

--------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
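
The deterministic testing approach described under "How is this tested?" can be illustrated with a minimal sketch. The class below is a hypothetical stand-in, not the PR's actual `_ManualExecutor` (the test file is not shown here); the name `ManualExecutor`, the `pending()` helper, and the internal queue are assumptions made for illustration. Only `submit()` and `run_all()` are named in the description above.

```python
from typing import Any, Callable, Dict, List, Tuple


class ManualExecutor:
    """Hypothetical executor that queues work and runs it only on demand."""

    def __init__(self) -> None:
        self._queue: List[Tuple[Callable[..., Any], Tuple[Any, ...], Dict[str, Any]]] = []

    def submit(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        # Record the call instead of handing it to a worker thread.
        self._queue.append((fn, args, kwargs))

    def pending(self) -> int:
        # Number of queued tasks that have not run yet (illustrative helper).
        return len(self._queue)

    def run_all(self) -> None:
        # Run queued tasks in submission order on the calling thread.
        # Tasks submitted while running are picked up in the same pass.
        while self._queue:
            fn, args, kwargs = self._queue.pop(0)
            fn(*args, **kwargs)


results: List[str] = []
executor = ManualExecutor()
executor.submit(results.append, "refreshed")
assert results == []  # nothing has run yet
executor.run_all()
assert results == ["refreshed"]
```

Because the test decides when queued work runs, it can assert on the state before and after an "async" refresh without sleeping or synchronizing threads.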
1 parent 61089f5 commit 0d0e807

3 files changed

Lines changed: 305 additions & 190 deletions

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -11,5 +11,6 @@
 ### Documentation
 
 ### Internal Changes
+* Replace the async-disabling mechanism on token refresh failure with a 1-minute retry backoff. Previously, a single failed async refresh would disable proactive token renewal until the token expired. Now, the SDK waits a short cooldown period and retries, improving resilience to transient errors.
 
 ### API Changes

databricks/sdk/oauth.py

Lines changed: 43 additions & 23 deletions
@@ -248,12 +248,11 @@ class Refreshable(TokenSource):
 
     _EXECUTOR = None
     _EXECUTOR_LOCK = threading.Lock()
-    # Legacy default duration for the stale period. This value is chosen to cover the
-    # maximum monthly downtime allowed by a 99.99% uptime SLA (~4.38 minutes).
-    _DEFAULT_STALE_DURATION = timedelta(minutes=5)
     # Default maximum stale duration. Chosen to cover the maximum monthly downtime
     # allowed by a 99.99% uptime SLA (~4.38 minutes) with generous overhead guarantees
     _MAX_STALE_DURATION = timedelta(minutes=20)
+    # Backoff time after an async refresh failure before trying another one.
+    _ASYNC_REFRESH_RETRY_BACKOFF = timedelta(minutes=1)
 
     @classmethod
     def _get_executor(cls):
@@ -272,15 +271,25 @@ def __init__(
         stale_duration: Optional[timedelta] = None,
     ):
         # Config properties
-        self._use_dynamic_stale_duration = stale_duration is None
+        self._use_legacy_stale_duration = stale_duration is not None
+        # Only read on the legacy path (when _use_legacy_stale_duration is True).
         self._stale_duration = stale_duration if stale_duration is not None else timedelta(seconds=0)
         self._disable_async = disable_async
         # Lock
         self._lock = threading.Lock()
         # Non Thread safe properties. They should be accessed only when protected by the lock above.
+        self._stale_after: Optional[datetime] = None
+        self._token_generation: int = 0
         self._update_token(token or Token(""))
         self._is_refreshing = False
-        self._refresh_err = False
+
+    def _now(self) -> datetime:
+        """Return the current time, matching the tz-awareness of the cached token."""
+        if self._token.expiry:
+            return datetime.now(tz=self._token.expiry.tzinfo)
+        if self._stale_after:
+            return datetime.now(tz=self._stale_after.tzinfo)
+        return datetime.now()
 
     def _update_token(self, token: Token) -> None:
         """Stores the new token and pre-computes the stale threshold.
@@ -290,17 +299,28 @@ def _update_token(self, token: Token) -> None:
 
         This ensures short-lived tokens (e.g. FastPath with 10-minute TTL) get a
         proportionally smaller stale window, while standard OAuth tokens (≥1 hour TTL)
-        use the full cap of _DEFAULT_STALE_DURATION.
+        use the full cap of _MAX_STALE_DURATION.
         """
         self._token = token
+        self._token_generation += 1
+        self._stale_after = None
 
-        if self._use_dynamic_stale_duration and self._token.expiry:
-            ttl = self._token.expiry - datetime.now()
-
-            if ttl < timedelta(seconds=0):
-                self._stale_duration = timedelta(seconds=0)
+        if self._token.expiry:
+            if self._use_legacy_stale_duration:
+                self._stale_after = self._token.expiry - self._stale_duration
             else:
-                self._stale_duration = min(ttl // 2, self._MAX_STALE_DURATION)
+                ttl = self._token.expiry - self._now()
+                stale_duration = max(timedelta(seconds=0), min(ttl // 2, self._MAX_STALE_DURATION))
+                self._stale_after = self._token.expiry - stale_duration
+
+    def _handle_failed_async_refresh(self) -> None:
+        """Pushes _stale_after forward by the retry backoff, making the token appear fresh temporarily.
+
+        This may set _stale_after past the token's expiry; that is safe because
+        _token_state() checks expiry before staleness.
+        """
+        if self._stale_after:
+            self._stale_after = self._now() + self._ASYNC_REFRESH_RETRY_BACKOFF
 
     # This is the main entry point for the Token. Do not access the token
     # using any of the internal functions.
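
The stale-threshold arithmetic in `_update_token` can be checked in isolation. The sketch below is a standalone restatement of the two paths in the hunk above, not the SDK's code: the function name `compute_stale_after` and its parameters are assumptions, and `now` is passed in explicitly so the results are reproducible.

```python
from datetime import datetime, timedelta
from typing import Optional

MAX_STALE_DURATION = timedelta(minutes=20)  # mirrors _MAX_STALE_DURATION in the diff


def compute_stale_after(
    expiry: datetime,
    now: datetime,
    legacy_stale_duration: Optional[timedelta] = None,
) -> datetime:
    """Hypothetical helper: absolute time after which a token counts as stale."""
    if legacy_stale_duration is not None:
        # Legacy path: the caller supplied a fixed stale window.
        return expiry - legacy_stale_duration
    # Dynamic path: half of the remaining TTL, capped at the maximum and
    # floored at zero so an already-expired token does not yield a negative window.
    ttl = expiry - now
    stale_duration = max(timedelta(seconds=0), min(ttl // 2, MAX_STALE_DURATION))
    return expiry - stale_duration


now = datetime(2024, 1, 1, 12, 0)

# 10-minute token: stale window is half the TTL (5 minutes).
assert compute_stale_after(now + timedelta(minutes=10), now) == datetime(2024, 1, 1, 12, 5)

# 1-hour token: half the TTL is 30 minutes, capped at 20 minutes.
assert compute_stale_after(now + timedelta(hours=1), now) == datetime(2024, 1, 1, 12, 40)

# Legacy path with an explicit 5-minute window.
assert compute_stale_after(now + timedelta(hours=1), now, timedelta(minutes=5)) == datetime(2024, 1, 1, 12, 55)
```

Storing the result as an absolute timestamp is what lets a failed refresh move the threshold without touching the token itself.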
@@ -334,19 +354,16 @@ def _token_state(self) -> _TokenState:
         if not self._token.expiry:
             return _TokenState.FRESH
 
-        lifespan = self._token.expiry - datetime.now()
-        if lifespan < timedelta(seconds=0):
+        now = self._now()
+        if self._token.expiry < now:
             return _TokenState.EXPIRED
-        if lifespan < self._stale_duration:
+        if self._stale_after and self._stale_after < now:
             return _TokenState.STALE
         return _TokenState.FRESH
 
     def _blocking_token(self) -> Token:
         """Returns a token, blocking if necessary to refresh it."""
         state = self._token_state()
-        # This is important to recover from potential previous failed attempts
-        # to refresh the token asynchronously.
-        self._refresh_err = False
         self._is_refreshing = False
 
         # It's possible that the token got refreshed (either by a _blocking_refresh or
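
How the expiry check, the stale threshold, and the retry backoff interact can be walked through with a small model. This is a sketch under stated assumptions: `State`, `token_state`, and `stale_after_on_failure` are illustrative names that follow the logic of `_token_state()` and `_handle_failed_async_refresh()` in the diff, with the clock passed in as a parameter rather than read from `_now()`.

```python
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional

RETRY_BACKOFF = timedelta(minutes=1)  # mirrors _ASYNC_REFRESH_RETRY_BACKOFF


class State(Enum):
    FRESH = "fresh"
    STALE = "stale"
    EXPIRED = "expired"


def token_state(expiry: Optional[datetime], stale_after: Optional[datetime], now: datetime) -> State:
    if expiry is None:
        return State.FRESH
    # Expiry is checked first, so a backoff that lands past expiry is harmless.
    if expiry < now:
        return State.EXPIRED
    if stale_after is not None and stale_after < now:
        return State.STALE
    return State.FRESH


def stale_after_on_failure(stale_after: Optional[datetime], now: datetime) -> Optional[datetime]:
    # Only tokens that already have a stale threshold get a cooldown.
    if stale_after is None:
        return None
    return now + RETRY_BACKOFF


expiry = datetime(2024, 1, 1, 13, 0)
stale_after = datetime(2024, 1, 1, 12, 40)
failed_at = datetime(2024, 1, 1, 12, 45)

assert token_state(expiry, stale_after, failed_at) is State.STALE
stale_after = stale_after_on_failure(stale_after, failed_at)

# During the cooldown the token looks fresh, so no new async refresh starts.
assert token_state(expiry, stale_after, failed_at + timedelta(seconds=30)) is State.FRESH
# After the cooldown it is stale again and a retry can be triggered.
assert token_state(expiry, stale_after, failed_at + timedelta(seconds=61)) is State.STALE

# A failure shortly before expiry pushes stale_after past expiry, yet the
# token is still reported as expired once the expiry time has passed.
late = stale_after_on_failure(stale_after, datetime(2024, 1, 1, 12, 59, 30))
assert token_state(expiry, late, datetime(2024, 1, 1, 13, 0, 10)) is State.EXPIRED
```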
@@ -360,28 +377,31 @@ def _blocking_token(self) -> Token:
 
     def _trigger_async_refresh(self):
         """Starts an asynchronous refresh if none is in progress."""
+        gen_at_submit = self._token_generation
 
         def _refresh_internal():
             new_token = None
             try:
                 new_token = self.refresh()
             except Exception as e:
                 # This happens on a thread, so we don't want to propagate the error.
-                # Instead, if there is no new_token for any reason, we will disable async refresh below
-                # But we will do it inside the lock.
+                # Instead, if there is no new_token for any reason, we apply a retry
+                # backoff below so the token appears fresh for a short cooldown period.
                 logger.warning(f"Tried to refresh token asynchronously, but failed: {e}")
 
             with self._lock:
-                if new_token is not None:
+                if self._token_generation != gen_at_submit:
+                    logger.debug("Async refresh completed but token was already updated; discarding result.")
+                elif new_token is not None:
                     self._update_token(new_token)
                 else:
-                    self._refresh_err = True
+                    self._handle_failed_async_refresh()
                 self._is_refreshing = False
 
         # The token may have been refreshed by another thread.
         if self._token_state() == _TokenState.FRESH:
             return
-        if not self._is_refreshing and not self._refresh_err:
+        if not self._is_refreshing:
             self._is_refreshing = True
             Refreshable._get_executor().submit(_refresh_internal)
 
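
The late-result guard in this hunk relies on a generation counter that is captured when the async refresh is submitted and compared when it completes. A minimal sketch of that pattern follows; `TokenBox` and its methods are hypothetical names for illustration, and plain strings stand in for `Token` objects.

```python
import threading


class TokenBox:
    """Hypothetical holder that bumps a generation counter on every store."""

    def __init__(self, token: str) -> None:
        self._lock = threading.Lock()
        self._generation = 0
        self._token = token

    def _store_locked(self, token: str) -> None:
        # Caller must hold the lock; every stored token bumps the generation.
        self._token = token
        self._generation += 1

    def store(self, token: str) -> None:
        with self._lock:
            self._store_locked(token)

    def snapshot_generation(self) -> int:
        with self._lock:
            return self._generation

    def store_if_unchanged(self, token: str, generation_at_submit: int) -> bool:
        # Discard the result if another refresh stored a token in the meantime.
        with self._lock:
            if self._generation != generation_at_submit:
                return False
            self._store_locked(token)
            return True

    def get(self) -> str:
        with self._lock:
            return self._token


box = TokenBox("old")
gen = box.snapshot_generation()   # async refresh is submitted here
box.store("from-blocking")        # a blocking refresh wins the race
assert box.store_if_unchanged("from-late-async", gen) is False
assert box.get() == "from-blocking"
```

Comparing generations rather than token values avoids having to decide which of two tokens is "newer" from their contents.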