Skip to content

Commit 22c6031

Browse files
baileympearsontadjik1nbbeeken
authored
feat(NODE-7142): Exponential backoff and jitter in retry loops (#4871)
Co-authored-by: Sergey Zelenov <sergey.zelenov@mongodb.com> Co-authored-by: Neal Beeken <neal.beeken@mongodb.com>
1 parent e5a85d0 commit 22c6031

30 files changed

+12791
-64
lines changed

src/cmap/connect.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ export interface HandshakeDocument extends Document {
224224
compression: string[];
225225
saslSupportedMechs?: string;
226226
loadBalanced?: boolean;
227+
backpressure: true;
227228
}
228229

229230
/**
@@ -241,6 +242,7 @@ export async function prepareHandshakeDocument(
241242

242243
const handshakeDoc: HandshakeDocument = {
243244
[serverApi?.version || options.loadBalanced === true ? 'hello' : LEGACY_HELLO_COMMAND]: 1,
245+
backpressure: true,
244246
helloOk: true,
245247
client: clientMetadata,
246248
compression: compressors

src/cmap/connection.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
586586
this.throwIfAborted();
587587
}
588588
} catch (error) {
589+
if (options.session != null && !(error instanceof MongoServerError)) {
590+
updateSessionFromResponse(options.session, MongoDBResponse.empty);
591+
}
589592
if (this.shouldEmitAndLogCommand) {
590593
this.emitAndLogCommand(
591594
this.monitorCommands,

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,7 @@ export type {
621621
TimeoutContext,
622622
TimeoutContextOptions
623623
} from './timeout';
624+
export type { TokenBucket } from './token_bucket';
624625
export type { Transaction, TransactionOptions, TxnState } from './transactions';
625626
export type {
626627
BufferPool,

src/operations/execute_operation.ts

Lines changed: 130 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { setTimeout } from 'timers/promises';
2+
13
import { MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION } from '../cmap/wire_protocol/constants';
24
import {
35
isRetryableReadError,
@@ -26,9 +28,17 @@ import {
2628
import type { Topology } from '../sdam/topology';
2729
import type { ClientSession } from '../sessions';
2830
import { TimeoutContext } from '../timeout';
31+
import {
32+
BASE_BACKOFF_MS,
33+
MAX_BACKOFF_MS,
34+
MAX_RETRIES,
35+
RETRY_COST,
36+
RETRY_TOKEN_RETURN_RATE
37+
} from '../token_bucket';
2938
import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils';
3039
import { AggregateOperation } from './aggregate';
3140
import { AbstractOperation, Aspect } from './operation';
41+
import { RunCommandOperation } from './run_command';
3242

3343
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
3444
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
@@ -50,7 +60,7 @@ type ResultTypeFromOperation<TOperation extends AbstractOperation> = ReturnType<
5060
* The expectation is that this function:
5161
* - Connects the MongoClient if it has not already been connected, see {@link autoConnect}
5262
* - Creates a session if none is provided and cleans up the session it creates
53-
* - Tries an operation and retries under certain conditions, see {@link tryOperation}
63+
* - Tries an operation and retries under certain conditions, see {@link executeOperationWithRetries}
5464
*
5565
* @typeParam T - The operation's type
5666
* @typeParam TResult - The type of the operation's result, calculated from T
@@ -120,7 +130,7 @@ export async function executeOperation<
120130
});
121131

122132
try {
123-
return await tryOperation(operation, {
133+
return await executeOperationWithRetries(operation, {
124134
topology,
125135
timeoutContext,
126136
session,
@@ -183,8 +193,11 @@ type RetryOptions = {
183193
* @typeParam TResult - The type of the operation's result, calculated from T
184194
*
185195
* @param operation - The operation to execute
186-
* */
187-
async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFromOperation<T>>(
196+
*/
197+
async function executeOperationWithRetries<
198+
T extends AbstractOperation,
199+
TResult = ResultTypeFromOperation<T>
200+
>(
188201
operation: T,
189202
{ topology, timeoutContext, session, readPreference }: RetryOptions
190203
): Promise<TResult> {
@@ -233,33 +246,94 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
233246
session.incrementTransactionNumber();
234247
}
235248

236-
const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
237-
let previousOperationError: MongoError | undefined;
238249
const deprioritizedServers = new DeprioritizedServers();
239250

240-
for (let tries = 0; tries < maxTries; tries++) {
241-
if (previousOperationError) {
242-
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
251+
let maxAttempts =
252+
typeof operation.maxAttempts === 'number'
253+
? operation.maxAttempts
254+
: willRetry
255+
? timeoutContext.csotEnabled()
256+
? Infinity
257+
: 2
258+
: 1;
259+
260+
let error: MongoError | null = null;
261+
262+
for (let attempt = 0; attempt < maxAttempts; attempt++) {
263+
operation.attemptsMade = attempt + 1;
264+
operation.server = server;
265+
266+
try {
267+
try {
268+
const result = await server.command(operation, timeoutContext);
269+
topology.tokenBucket.deposit(
270+
attempt > 0
271+
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
272+
: RETRY_TOKEN_RETURN_RATE // otherwise
273+
);
274+
return operation.handleOk(result);
275+
} catch (error) {
276+
return operation.handleError(error);
277+
}
278+
} catch (operationError) {
279+
// Should never happen but if it does - propagate the error.
280+
if (!(operationError instanceof MongoError)) throw operationError;
281+
282+
if (attempt > 0 && !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
283+
// if a retry attempt fails with a non-overload error, deposit 1 token.
284+
topology.tokenBucket.deposit(RETRY_COST);
285+
}
286+
287+
// Preserve the original error once a write has been performed.
288+
// Only update to the latest error if no writes were performed.
289+
if (error == null) {
290+
error = operationError;
291+
} else {
292+
if (!operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)) {
293+
error = operationError;
294+
}
295+
}
296+
297+
// Reset timeouts
298+
timeoutContext.clear();
299+
300+
if (hasWriteAspect && operationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
243301
throw new MongoServerError({
244302
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
245303
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
246-
originalError: previousOperationError
304+
originalError: operationError
247305
});
248306
}
249307

250-
if (operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) {
251-
throw previousOperationError;
308+
if (!canRetry(operation, operationError)) {
309+
throw error;
310+
}
311+
312+
if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
313+
maxAttempts = Math.min(MAX_RETRIES + 1, operation.maxAttempts ?? MAX_RETRIES + 1);
252314
}
253315

254-
if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
255-
throw previousOperationError;
316+
if (attempt + 1 >= maxAttempts) {
317+
throw error;
318+
}
319+
320+
if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
321+
if (!topology.tokenBucket.consume(RETRY_COST)) {
322+
throw error;
323+
}
324+
325+
const backoffMS = Math.random() * Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2 ** attempt);
326+
327+
// if the backoff would exhaust the CSOT timeout, short-circuit.
328+
if (timeoutContext.csotEnabled() && backoffMS > timeoutContext.remainingTimeMS) {
329+
throw error;
330+
}
256331

257-
if (hasReadAspect && !isRetryableReadError(previousOperationError)) {
258-
throw previousOperationError;
332+
await setTimeout(backoffMS);
259333
}
260334

261335
if (
262-
previousOperationError instanceof MongoNetworkError &&
336+
operationError instanceof MongoNetworkError &&
263337
operation.hasAspect(Aspect.CURSOR_CREATING) &&
264338
session != null &&
265339
session.isPinned &&
@@ -268,52 +342,62 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
268342
session.unpin({ force: true, forceClear: true });
269343
}
270344

345+
deprioritizedServers.add(server.description);
346+
271347
server = await topology.selectServer(selector, {
272348
session,
273349
operationName: operation.commandName,
274350
deprioritizedServers,
275351
signal: operation.options.signal
276352
});
277353

278-
if (hasWriteAspect && !supportsRetryableWrites(server)) {
354+
if (
355+
hasWriteAspect &&
356+
!supportsRetryableWrites(server) &&
357+
!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
358+
) {
279359
throw new MongoUnexpectedServerResponseError(
280360
'Selected server does not support retryable writes'
281361
);
282362
}
283-
}
284363

285-
operation.server = server;
286-
287-
try {
288-
// If tries > 0 and we are command batching we need to reset the batch.
289-
if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
364+
// Batched operations must reset the batch before retry,
365+
// otherwise building a command will build the _next_ batch, not the current batch.
366+
if (operation.hasAspect(Aspect.COMMAND_BATCHING)) {
290367
operation.resetBatch();
291368
}
292-
293-
try {
294-
const result = await server.command(operation, timeoutContext);
295-
return operation.handleOk(result);
296-
} catch (error) {
297-
return operation.handleError(error);
298-
}
299-
} catch (operationError) {
300-
if (!(operationError instanceof MongoError)) throw operationError;
301-
if (
302-
previousOperationError != null &&
303-
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
304-
) {
305-
throw previousOperationError;
306-
}
307-
deprioritizedServers.add(server.description);
308-
previousOperationError = operationError;
309-
310-
// Reset timeouts
311-
timeoutContext.clear();
312369
}
313370
}
314371

315372
throw (
316-
previousOperationError ??
317-
new MongoRuntimeError('Tried to propagate retryability error, but no error was found.')
373+
error ??
374+
new MongoRuntimeError(
375+
'Should never happen: operation execution loop terminated but no error was recorded.'
376+
)
318377
);
378+
379+
function canRetry(operation: AbstractOperation, error: MongoError) {
380+
// always retryable
381+
if (
382+
error.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
383+
error.hasErrorLabel(MongoErrorLabel.RetryableError)
384+
) {
385+
return true;
386+
}
387+
388+
// run command is only retryable if we get retryable overload errors
389+
if (operation instanceof RunCommandOperation) {
390+
return false;
391+
}
392+
393+
// batch operations are only retryable if the batch is retryable
394+
if (operation.hasAspect(Aspect.COMMAND_BATCHING)) {
395+
return operation.canRetryWrite && isRetryableWriteError(error);
396+
}
397+
398+
return (
399+
(hasWriteAspect && willRetryWrite && isRetryableWriteError(error)) ||
400+
(hasReadAspect && willRetryRead && isRetryableReadError(error))
401+
);
402+
}
319403
}

src/operations/operation.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ export abstract class AbstractOperation<TResult = any> {
6666
/** Specifies the time an operation will run until it throws a timeout error. */
6767
timeoutMS?: number;
6868

69+
/** Used by commitTransaction to share the retry budget across two executeOperation calls. */
70+
maxAttempts?: number;
71+
72+
/** Tracks how many attempts were made in the last executeOperation call. */
73+
attemptsMade: number;
74+
6975
private _session: ClientSession | undefined;
7076

7177
static aspects?: Set<symbol>;
@@ -82,6 +88,8 @@ export abstract class AbstractOperation<TResult = any> {
8288

8389
this.options = options;
8490
this.bypassPinningCheck = !!options.bypassPinningCheck;
91+
92+
this.attemptsMade = 0;
8593
}
8694

8795
/** Must match the first key of the command object sent to the server.

src/sdam/topology.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import { type Abortable, TypedEventEmitter } from '../mongo_types';
3535
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3636
import type { ClientSession } from '../sessions';
3737
import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
38+
import { INITIAL_TOKEN_BUCKET_SIZE, TokenBucket } from '../token_bucket';
3839
import type { Transaction } from '../transactions';
3940
import {
4041
addAbortListener,
@@ -207,18 +208,15 @@ export type TopologyEvents = {
207208
* @internal
208209
*/
209210
export class Topology extends TypedEventEmitter<TopologyEvents> {
210-
/** @internal */
211211
s: TopologyPrivate;
212-
/** @internal */
213212
waitQueue: List<ServerSelectionRequest>;
214-
/** @internal */
215213
hello?: Document;
216-
/** @internal */
217214
_type?: string;
218215

216+
tokenBucket = new TokenBucket(INITIAL_TOKEN_BUCKET_SIZE);
217+
219218
client!: MongoClient;
220219

221-
/** @internal */
222220
private connectionLock?: Promise<Topology>;
223221

224222
/** @event */
@@ -595,7 +593,11 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
595593
)
596594
);
597595
}
598-
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
596+
597+
if (!options.timeoutContext || options.timeoutContext.clearServerSelectionTimeout) {
598+
timeout?.clear();
599+
}
600+
599601
return transaction.server;
600602
}
601603

@@ -666,7 +668,9 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
666668
throw error;
667669
} finally {
668670
abortListener?.[kDispose]();
669-
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
671+
if (!options.timeoutContext || options.timeoutContext.clearServerSelectionTimeout) {
672+
timeout?.clear();
673+
}
670674
}
671675
}
672676
/**

0 commit comments

Comments
 (0)