Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
### Documentation

### Internal Changes
* Add retry with backoff to `CachedTokenSource` async refresh so that a failed background refresh no longer disables async until a blocking call succeeds.

### API Changes
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,23 @@ private enum TokenState {
// monthly downtime allowed by a 99.99% uptime SLA (~4.38 minutes) while increasing the likelihood
// that the token is refreshed asynchronously if the auth server is down.
private static final Duration MAX_STALE_DURATION = Duration.ofMinutes(20);
// Delay before another async refresh may be attempted after an async refresh failure.
private static final Duration ASYNC_REFRESH_RETRY_BACKOFF = Duration.ofMinutes(1);
// Default additional buffer before expiry to consider a token as expired.
// This is 40 seconds by default since Azure Databricks rejects tokens that are within 30 seconds
// of expiry.
private static final Duration DEFAULT_EXPIRY_BUFFER = Duration.ofSeconds(40);

// The token source to use for refreshing the token.
// Underlying token source used to fetch replacement tokens.
private final TokenSource tokenSource;
// Whether asynchronous refresh is enabled.
private boolean asyncDisabled = false;
// The legacy duration before expiry to consider a token as 'stale'.
private final Duration staticStaleDuration;
// Whether to use the dynamic stale duration computation or defer to the legacy duration.
private final boolean useDynamicStaleDuration;
// The dynamically computed duration before expiry to consider a token as 'stale'.
private volatile Duration dynamicStaleDuration;
private final boolean useLegacyStaleDuration;
// The earliest time at which the cached token should be considered stale.
private volatile Instant staleAfter;
// Additional buffer before expiry to consider a token as expired.
private final Duration expiryBuffer;
// Clock supplier for current time.
Expand All @@ -62,23 +64,16 @@ private enum TokenState {
protected volatile Token token;
// Whether a refresh is currently in progress (for async refresh).
private boolean refreshInProgress = false;
// Whether the last refresh attempt succeeded.
private boolean lastRefreshSucceeded = true;

private CachedTokenSource(Builder builder) {
this.tokenSource = builder.tokenSource;
this.asyncDisabled = builder.asyncDisabled;
this.staticStaleDuration = builder.staleDuration;
this.useDynamicStaleDuration = builder.useDynamicStaleDuration;
this.useLegacyStaleDuration = builder.useLegacyStaleDuration;
this.expiryBuffer = builder.expiryBuffer;
this.clockSupplier = builder.clockSupplier;
this.token = builder.token;

if (this.useDynamicStaleDuration && this.token != null) {
this.dynamicStaleDuration = computeStaleDuration(this.token);
} else {
this.dynamicStaleDuration = Duration.ofMinutes(0);
}
this.updateToken(builder.token);
}

/**
Expand All @@ -91,7 +86,7 @@ public static class Builder {
private final TokenSource tokenSource;
private boolean asyncDisabled = false;
private Duration staleDuration = DEFAULT_STALE_DURATION;
private boolean useDynamicStaleDuration = true;
private boolean useLegacyStaleDuration = false;
private Duration expiryBuffer = DEFAULT_EXPIRY_BUFFER;
private ClockSupplier clockSupplier = new UtcClockSupplier();
private Token token;
Expand Down Expand Up @@ -139,15 +134,18 @@ public Builder setAsyncDisabled(boolean asyncDisabled) {
* Sets the duration before token expiry at which the token is considered stale.
*
* <p>When asynchronous refresh is enabled, tokens that are stale but not yet expired will
* trigger a background refresh while continuing to serve the current token.
* trigger a background refresh while continuing to serve the current token. Calling this method
* opts into the legacy fixed stale-duration behavior instead of the default dynamic stale
* computation, preserving backward compatibility for callers that already provide a custom
* stale duration.
*
* @param staleDuration The duration before expiry to consider a token stale. Must be greater
* than the expiry buffer duration.
* @return This builder instance for method chaining.
*/
public Builder setStaleDuration(Duration staleDuration) {
this.staleDuration = staleDuration;
this.useDynamicStaleDuration = false;
this.useLegacyStaleDuration = true;
return this;
}

Expand Down Expand Up @@ -190,6 +188,69 @@ public CachedTokenSource build() {
}
}

/**
* Replaces the cached token and recomputes the time after which it should be treated as stale.
*
* <p>Legacy mode uses the configured fixed stale duration. Dynamic mode derives the stale window
* from the token's remaining TTL and caps it at {@link #MAX_STALE_DURATION}. The stale threshold
* is written before the volatile token write so readers that observe the new token also observe
* the matching {@code staleAfter} value.
*
* @param t The token to cache. May be null.
*/
private void updateToken(Token t) {
if (t == null || t.getExpiry() == null) {
this.staleAfter = null;
this.token = t;
return;
}

if (this.useLegacyStaleDuration) {
this.staleAfter = t.getExpiry().minus(staticStaleDuration);
} else {
Duration ttl = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry());
Duration staleDuration = ttl.dividedBy(2);
if (staleDuration.compareTo(MAX_STALE_DURATION) > 0) {
staleDuration = MAX_STALE_DURATION;
}
if (staleDuration.compareTo(Duration.ZERO) <= 0) {
staleDuration = Duration.ZERO;
}

this.staleAfter = t.getExpiry().minus(staleDuration);
}

// Publish the token after staleAfter so readers that observe the new token also observe the
// stale threshold computed for that token.
this.token = t;
}

/**
* Delays the next async refresh attempt after an async refresh failure.
*
* <p>The cached token remains usable until it becomes expired. Moving {@code staleAfter} into the
* future prevents callers from immediately retrying async refresh on every stale read while the
* auth service is unhealthy.
*/
private void handleFailedAsyncRefresh() {
if (this.staleAfter != null) {
Instant now = Instant.now(clockSupplier.getClock());
this.staleAfter = now.plus(ASYNC_REFRESH_RETRY_BACKOFF);
}
}

/**
* Returns {@code true} when the currently cached token has a later expiry than {@code candidate},
* meaning the candidate should be discarded. This prevents an async refresh that was started
* before a blocking refresh from overwriting the newer token obtained by the blocking path.
*/
private boolean cachedTokenIsNewer(Token candidate) {
return token != null
&& token.getExpiry() != null
&& candidate.getExpiry() != null
&& token.getExpiry().isAfter(candidate.getExpiry());
}

/**
* Gets the current token, refreshing if necessary. If async refresh is enabled, may return a
* stale token while a refresh is in progress.
Expand All @@ -206,21 +267,6 @@ public Token getToken() {
return getTokenAsync();
}

private Duration computeStaleDuration(Token t) {
if (t.getExpiry() == null) {
return Duration.ZERO; // Tokens with no expiry are considered permanent.
}

Duration ttl = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry());

if (ttl.compareTo(Duration.ZERO) <= 0) {
return Duration.ZERO;
}

Duration halfTtl = ttl.dividedBy(2);
return halfTtl.compareTo(MAX_STALE_DURATION) > 0 ? MAX_STALE_DURATION : halfTtl;
}

/**
* Determine the state of the current token (fresh, stale, or expired).
*
Expand All @@ -234,12 +280,11 @@ protected TokenState getTokenState(Token t) {
return TokenState.FRESH; // Tokens with no expiry are considered permanent.
}

Duration lifeTime = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry());
if (lifeTime.compareTo(expiryBuffer) <= 0) {
Instant now = Instant.now(clockSupplier.getClock());
if (now.isAfter(t.getExpiry().minus(expiryBuffer))) {
return TokenState.EXPIRED;
}
Duration staleDuration = useDynamicStaleDuration ? dynamicStaleDuration : staticStaleDuration;
if (lifeTime.compareTo(staleDuration) <= 0) {
if (now.isAfter(staleAfter)) {
return TokenState.STALE;
}
return TokenState.FRESH;
Expand All @@ -265,23 +310,15 @@ protected Token getTokenBlocking() {
if (getTokenState(token) != TokenState.EXPIRED) {
return token;
}
lastRefreshSucceeded = false;
Token newToken;
try {
newToken = tokenSource.getToken();
} catch (Exception e) {
logger.error("Failed to refresh token synchronously", e);
throw e;
}
lastRefreshSucceeded = true;

// Write dynamicStaleDuration before publishing the new token via the volatile write,
// so unsynchronized readers that see the new token are guaranteed to also see the
// updated dynamicStaleDuration.
if (useDynamicStaleDuration && newToken != null) {
dynamicStaleDuration = computeStaleDuration(newToken);
}
token = newToken;
updateToken(newToken);
return token;
}
}
Expand Down Expand Up @@ -318,31 +355,28 @@ protected Token getTokenAsync() {
private synchronized void triggerAsyncRefresh() {
// Check token state again inside the synchronized block to avoid triggering a refresh if
// another thread updated the token in the meantime.
if (!refreshInProgress && lastRefreshSucceeded && getTokenState(token) != TokenState.FRESH) {
refreshInProgress = true;
CompletableFuture.runAsync(
() -> {
try {
// Attempt to refresh the token in the background.
Token newToken = tokenSource.getToken();
synchronized (this) {
// Write dynamicStaleDuration before publishing the new token via the volatile
// write, so unsynchronized readers that see the new token are guaranteed to also
// see the updated dynamicStaleDuration.
if (useDynamicStaleDuration && newToken != null) {
dynamicStaleDuration = computeStaleDuration(newToken);
}
token = newToken;
refreshInProgress = false;
}
} catch (Exception e) {
synchronized (this) {
lastRefreshSucceeded = false;
refreshInProgress = false;
logger.error("Asynchronous token refresh failed", e);
if (refreshInProgress || getTokenState(token) != TokenState.STALE) {
return;
}

refreshInProgress = true;
CompletableFuture.runAsync(
() -> {
try {
Token newToken = tokenSource.getToken();
synchronized (this) {
if (newToken != null && !cachedTokenIsNewer(newToken)) {
updateToken(newToken);
}
refreshInProgress = false;
}
});
}
} catch (Exception e) {
synchronized (this) {
handleFailedAsyncRefresh();
refreshInProgress = false;
logger.error("Asynchronous token refresh failed", e);
}
}
});
}
}
Loading
Loading