Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 109 additions & 41 deletions packages/gotrue/lib/src/gotrue_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,26 @@ class GoTrueClient {

Timer? _autoRefreshTicker;

/// Deduplicates concurrent token refresh requests.
///
/// When multiple callers detect an expired token simultaneously (e.g. several
/// in-flight API requests all fail with 401), only the first creates a new
/// refresh network request. Subsequent callers receive the same [Future] and
/// wait for the single in-flight request to complete. The completer is cleared
/// after resolution so the next expiry cycle starts a fresh request.
Completer<AuthResponse>? _refreshTokenCompleter;
/// Monotonically increasing counter, incremented on every session write.
/// Used inside [_executeRefresh] to detect that the session changed while
/// a network request was in-flight, so the stale refresh result can be
/// discarded instead of overwriting a newer session.
int _sessionVersion = 0;

/// Serial future chain for refresh operations. Each call to
/// [_callRefreshToken] appends via `.then()` so that refreshes with
/// *different* tokens execute sequentially rather than overlapping.
/// Same-token calls still de-duplicate via [_pendingRefreshes].
Future<void> _pendingRefreshOperation = Future.value();

/// Tracks all pending (queued or in-flight) refreshes keyed by token.
/// Concurrent calls with the same token return the existing
/// [Completer.future] instead of enqueuing a duplicate refresh.
final Map<String, Completer<AuthResponse>> _pendingRefreshes = {};

/// Set by [dispose] to prevent [_executeRefresh] from mutating state
/// or emitting events on closed stream controllers.
bool _isDisposed = false;

JWKSet? _jwks;
DateTime? _jwksCachedAt;
Expand Down Expand Up @@ -779,6 +791,7 @@ class GoTrueClient {
);
final userResponse = UserResponse.fromJson(response);

_sessionVersion++;
_currentSession = currentSession?.copyWith(user: userResponse.user);
notifyAllSubscribers(AuthChangeEvent.userUpdated);

Expand Down Expand Up @@ -1095,6 +1108,7 @@ class GoTrueClient {
throw notifyException(AuthException('Initial session is missing data.'));
}

_sessionVersion++;
_currentSession = session;
notifyAllSubscribers(AuthChangeEvent.initialSession);
}
Expand Down Expand Up @@ -1162,12 +1176,16 @@ class GoTrueClient {
Future<void> _autoRefreshTokenTick() async {
try {
final now = DateTime.now();
final refreshToken = _currentSession?.refreshToken;

// Read the session once to avoid TOCTOU: both refreshToken and
// expiresAt must come from the same snapshot.
final session = _currentSession;
final refreshToken = session?.refreshToken;
if (refreshToken == null) {
return;
}

final expiresAt = _currentSession?.expiresAt;
final expiresAt = session?.expiresAt;
if (expiresAt == null) {
return;
}
Expand Down Expand Up @@ -1281,12 +1299,14 @@ class GoTrueClient {

/// set currentSession and currentUser
void _saveSession(Session session) {
_sessionVersion++;
_log.finest('Saving session: $session');
_log.fine('Saving session');
_currentSession = session;
}

void _removeSession() {
_sessionVersion++;
_log.fine('Removing session');
_currentSession = null;
Comment thread
brunovsiqueira marked this conversation as resolved.
}
Expand Down Expand Up @@ -1347,68 +1367,116 @@ class GoTrueClient {

@mustCallSuper
void dispose() {
_isDisposed = true;
_onAuthStateChangeController.close();
_onAuthStateChangeControllerSync.close();
_broadcastChannel?.close();
_broadcastChannelSubscription?.cancel();
final completer = _refreshTokenCompleter;
if (completer != null && !completer.isCompleted) {
completer.completeError(AuthException('Disposed'));
for (final completer in _pendingRefreshes.values) {
if (!completer.isCompleted) {
completer.completeError(AuthException('Disposed'));
}
}
_pendingRefreshes.clear();
_autoRefreshTicker?.cancel();
}

/// Generates a new JWT.
///
/// To prevent multiple simultaneous requests it catches an already ongoing request by using the global [_refreshTokenCompleter].
/// If that's not null and not completed it returns the future of the ongoing request.
Future<AuthResponse> _callRefreshToken(String refreshToken) async {
// Refreshing is already in progress
if (_refreshTokenCompleter != null) {
_log.finer("Don't call refresh token, already in progress");
return _refreshTokenCompleter!.future;
/// Concurrent calls with the **same** [refreshToken] are de-duplicated:
/// only the first enqueues a network request; subsequent callers receive
/// the same [Future]. Calls with **different** tokens are serialised via
/// [_pendingRefreshOperation] so they never overlap.
///
/// After the network round-trip, the result is only applied (session
/// saved, [AuthChangeEvent.tokenRefreshed] emitted) when
/// [_sessionVersion] has not changed — meaning no sign-in, sign-out,
/// or other session mutation occurred while the request was in-flight.
Future<AuthResponse> _callRefreshToken(String refreshToken) {
// De-duplicate: return existing future if this token is already
// pending (queued or in-flight).
final existing = _pendingRefreshes[refreshToken];
if (existing != null) {
_log.finer('Refresh already pending for this token');
return existing.future;
}

try {
_refreshTokenCompleter = Completer<AuthResponse>();
final completer = Completer<AuthResponse>();
completer.future.ignore();
_pendingRefreshes[refreshToken] = completer;

// Catch any error in case nobody awaits the future
_refreshTokenCompleter!.future.then(
(_) => null,
onError: (_, __) => null,
);
// Chain onto the serial queue so different-token refreshes
// never overlap.
_pendingRefreshOperation = _pendingRefreshOperation
.then((_) => _executeRefresh(refreshToken, completer))
.catchError((_) {});

return completer.future;
}

/// Executes a single token refresh. Called from the
/// [_pendingRefreshOperation] chain — never directly.
Future<void> _executeRefresh(
String refreshToken,
Completer<AuthResponse> completer,
) async {
if (_isDisposed) return;

final versionBeforeRefresh = _sessionVersion;

try {
_log.fine('Refresh access token');

final data = await _refreshAccessToken(refreshToken);

final session = data.session;
if (_isDisposed) return;

final session = data.session;
if (session == null) {
throw AuthSessionMissingException();
}

// Guard: if the session was mutated while we were awaiting
// the network request (e.g. a concurrent signIn or signOut),
// discard this result to avoid overwriting the newer session.
if (_sessionVersion != versionBeforeRefresh) {
_log.fine(
'Session changed during refresh '
'(version $versionBeforeRefresh → $_sessionVersion). '
'Discarding stale refresh result.',
);
if (!completer.isCompleted) completer.complete(data);
return;
}

_saveSession(session);
notifyAllSubscribers(AuthChangeEvent.tokenRefreshed);

_refreshTokenCompleter?.complete(data);
return data;
if (!completer.isCompleted) completer.complete(data);
} on AuthException catch (error, stack) {
if (error is! AuthRetryableFetchException) {
_removeSession();
notifyAllSubscribers(AuthChangeEvent.signedOut);
} else {
// Only remove the session if it hasn't been replaced
// while we were refreshing — otherwise we'd sign out a
// user who just signed in.
if (!_isDisposed && _sessionVersion == versionBeforeRefresh) {
_removeSession();
notifyAllSubscribers(AuthChangeEvent.signedOut);
}
} else if (!_isDisposed) {
notifyException(error, stack);
}

_refreshTokenCompleter?.completeError(error);

rethrow;
if (!completer.isCompleted) {
completer.completeError(error, stack);
}
} catch (error, stack) {
_refreshTokenCompleter?.completeError(error);
notifyException(error, stack);
rethrow;
if (!completer.isCompleted) {
completer.completeError(error, stack);
}
if (!_isDisposed) {
notifyException(error, stack);
}
} finally {
_refreshTokenCompleter = null;
_pendingRefreshes.remove(refreshToken);
}
}

Expand Down
Loading
Loading