Skip to content

Commit daaefd8

Browse files
committed
feat(NODE-7142): make token bucket optional in client backpressure
1 parent 22c6031 commit daaefd8

File tree

6 files changed

+171
-13
lines changed

6 files changed

+171
-13
lines changed

src/connection_string.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,10 @@ interface OptionDescriptor {
639639
}
640640

641641
export const OPTIONS = {
642+
adaptiveRetries: {
643+
default: false,
644+
type: 'boolean'
645+
},
642646
appName: {
643647
type: 'string'
644648
},

src/mongo_client.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
229229
retryReads?: boolean;
230230
/** Enable retryable writes. */
231231
retryWrites?: boolean;
232+
/** Whether to enable adaptive retry rate limiting using a token bucket. Defaults to false. */
233+
adaptiveRetries?: boolean;
232234
/** Allow a driver to force a Single topology type with a connection string containing one host */
233235
directConnection?: boolean;
234236
/** Instruct the driver it is connecting to a load balancer fronting a mongos like service */
@@ -1041,6 +1043,7 @@ export interface MongoOptions
10411043
extends Required<
10421044
Pick<
10431045
MongoClientOptions,
1046+
| 'adaptiveRetries'
10441047
| 'autoEncryption'
10451048
| 'connectTimeoutMS'
10461049
| 'directConnection'

src/operations/execute_operation.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -266,11 +266,13 @@ async function executeOperationWithRetries<
266266
try {
267267
try {
268268
const result = await server.command(operation, timeoutContext);
269-
topology.tokenBucket.deposit(
270-
attempt > 0
271-
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
272-
: RETRY_TOKEN_RETURN_RATE // otherwise
273-
);
269+
if (topology.s.options.adaptiveRetries) {
270+
topology.tokenBucket.deposit(
271+
attempt > 0
272+
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
273+
: RETRY_TOKEN_RETURN_RATE // otherwise
274+
);
275+
}
274276
return operation.handleOk(result);
275277
} catch (error) {
276278
return operation.handleError(error);
@@ -279,7 +281,11 @@ async function executeOperationWithRetries<
279281
// Should never happen but if it does - propagate the error.
280282
if (!(operationError instanceof MongoError)) throw operationError;
281283

282-
if (attempt > 0 && !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
284+
if (
285+
topology.s.options.adaptiveRetries &&
286+
attempt > 0 &&
287+
!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
288+
) {
283289
// if a retry attempt fails with a non-overload error, deposit 1 token.
284290
topology.tokenBucket.deposit(RETRY_COST);
285291
}
@@ -318,7 +324,7 @@ async function executeOperationWithRetries<
318324
}
319325

320326
if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
321-
if (!topology.tokenBucket.consume(RETRY_COST)) {
327+
if (topology.s.options.adaptiveRetries && !topology.tokenBucket.consume(RETRY_COST)) {
322328
throw error;
323329
}
324330

src/sdam/topology.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions {
146146
hosts: HostAddress[];
147147
retryWrites: boolean;
148148
retryReads: boolean;
149+
adaptiveRetries: boolean;
149150
/** How long to block for server selection before throwing an error */
150151
serverSelectionTimeoutMS: number;
151152
/** The name of the replica set to connect to */

test/integration/client-backpressure/client-backpressure.prose.test.ts

Lines changed: 131 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import * as sinon from 'sinon';
44
import {
55
type Collection,
66
INITIAL_TOKEN_BUCKET_SIZE,
7+
MAX_RETRIES,
78
type MongoClient,
89
MongoServerError
910
} from '../../mongodb';
1011
import { clearFailPoint, configureFailPoint, measureDuration } from '../../tools/utils';
12+
import { filterForCommands } from '../shared';
1113

1214
describe('Client Backpressure (Prose)', function () {
1315
let client: MongoClient;
@@ -64,18 +66,141 @@ describe('Client Backpressure (Prose)', function () {
6466
}
6567
);
6668

67-
it('Test 2: Token Bucket capacity is Enforced', async () => {
68-
// 1-2. Assert that the client's retry token bucket is at full capacity and that the capacity
69-
// is DEFAULT_RETRY_TOKEN_CAPACITY.
69+
it('Test 2: Token Bucket capacity is Enforced', async function () {
70+
// 1. Let client be a MongoClient with adaptiveRetries=True.
71+
const client = this.configuration.newClient({
72+
adaptiveRetries: true
73+
});
74+
await client.connect();
75+
76+
// 2. Assert that the client's retry token bucket is at full capacity and that the capacity is DEFAULT_RETRY_TOKEN_CAPACITY.
7077
const tokenBucket = client.topology.tokenBucket;
7178
expect(tokenBucket).to.have.property('budget', INITIAL_TOKEN_BUCKET_SIZE);
7279
expect(tokenBucket).to.have.property('capacity', INITIAL_TOKEN_BUCKET_SIZE);
7380

74-
// 3. Execute a successful ping command.
81+
// 3. Using client, execute a successful ping command.
7582
await client.db('admin').command({ ping: 1 });
7683

77-
// 4. Assert that the successful command did not increase the number of tokens in the bucket
78-
// above DEFAULT_RETRY_TOKEN_CAPACITY.
84+
// 4. Assert that the successful command did not increase the number of tokens in the bucket above DEFAULT_RETRY_TOKEN_CAPACITY.
7985
expect(tokenBucket).to.have.property('budget').that.is.at.most(INITIAL_TOKEN_BUCKET_SIZE);
86+
87+
await client.close();
8088
});
89+
90+
it(
91+
'Test 3: Overload Errors are Retried a Maximum of MAX_RETRIES times',
92+
{
93+
requires: {
94+
mongodb: '>=4.4'
95+
}
96+
},
97+
async function () {
98+
// 1. Let `client` be a `MongoClient` with command event monitoring enabled.
99+
const client = this.configuration.newClient({
100+
monitorCommands: true
101+
});
102+
await client.connect();
103+
104+
// 2. Let `coll` be a collection.
105+
const collection = client.db('foo').collection('bar');
106+
const commandsStarted = [];
107+
client.on('commandStarted', filterForCommands(['find'], commandsStarted));
108+
109+
/*
110+
* 3. Configure the following failpoint:
111+
{
112+
configureFailPoint: 'failCommand',
113+
mode: 'alwaysOn',
114+
data: {
115+
failCommands: ['find'],
116+
errorCode: 462, // IngressRequestRateLimitExceeded
117+
errorLabels: ['SystemOverloadedError', 'RetryableError']
118+
}
119+
}
120+
* */
121+
await configureFailPoint(this.configuration, {
122+
configureFailPoint: 'failCommand',
123+
mode: 'alwaysOn',
124+
data: {
125+
failCommands: ['find'],
126+
errorCode: 462,
127+
errorLabels: ['RetryableError', 'SystemOverloadedError']
128+
}
129+
});
130+
131+
// 4. Perform a find operation with `coll` that fails.
132+
const error = await collection.findOne({}).catch(e => e);
133+
134+
// 5. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
135+
expect(error).to.be.instanceof(MongoServerError);
136+
expect(error.hasErrorLabel('RetryableError')).to.be.true;
137+
expect(error.hasErrorLabel('SystemOverloadedError')).to.be.true;
138+
139+
// 6. Assert that the total number of started commands is MAX_RETRIES + 1 (6).
140+
expect(commandsStarted).to.have.length(MAX_RETRIES + 1);
141+
142+
await client.close();
143+
}
144+
);
145+
146+
it(
147+
'Test 4: Adaptive Retries are Limited by Token Bucket Tokens',
148+
{
149+
requires: {
150+
mongodb: '>=4.4'
151+
}
152+
},
153+
async function () {
154+
// 1. Let `client` be a `MongoClient` with `adaptiveRetries=True` and command event monitoring enabled.
155+
const client = this.configuration.newClient({
156+
adaptiveRetries: true,
157+
monitorCommands: true
158+
});
159+
await client.connect();
160+
161+
// 2. Set `client`'s retry token bucket to have 2 tokens.
162+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
163+
client.topology!.tokenBucket['budget'] = 2;
164+
165+
// 3. Let `coll` be a collection.
166+
const collection = client.db('foo').collection('bar');
167+
const commandsStarted = [];
168+
client.on('commandStarted', filterForCommands(['find'], commandsStarted));
169+
170+
/*
171+
* 4. Configure the following failpoint:
172+
{
173+
configureFailPoint: 'failCommand',
174+
mode: {times: 3},
175+
data: {
176+
failCommands: ['find'],
177+
errorCode: 462, // IngressRequestRateLimitExceeded
178+
errorLabels: ['SystemOverloadedError', 'RetryableError']
179+
}
180+
}
181+
* */
182+
await configureFailPoint(this.configuration, {
183+
configureFailPoint: 'failCommand',
184+
mode: { times: 3 },
185+
data: {
186+
failCommands: ['find'],
187+
errorCode: 462,
188+
errorLabels: ['RetryableError', 'SystemOverloadedError']
189+
}
190+
});
191+
192+
// 5. Perform a find operation with `coll` that fails.
193+
const error = await collection.findOne({}).catch(e => e);
194+
195+
// 6. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
196+
expect(error).to.be.instanceof(MongoServerError);
197+
expect(error.hasErrorLabel('RetryableError')).to.be.true;
198+
expect(error.hasErrorLabel('SystemOverloadedError')).to.be.true;
199+
200+
// 7. Assert that the total number of started commands is 3: one for the initial attempt and two for the retries.
201+
expect(commandsStarted).to.have.length(3);
202+
203+
await client.close();
204+
}
205+
);
81206
});

test/unit/connection_string.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -889,4 +889,23 @@ describe('Connection String', function () {
889889
}
890890
});
891891
});
892+
893+
context('when adaptiveRetries is set', function () {
894+
it('defaults to false', function () {
895+
const options = parseOptions('mongodb://localhost:27017');
896+
expect(options.adaptiveRetries).to.equal(false);
897+
});
898+
899+
it('can be enabled via connection string', function () {
900+
const options = parseOptions('mongodb://localhost:27017?adaptiveRetries=true');
901+
expect(options.adaptiveRetries).to.equal(true);
902+
});
903+
904+
it('can be enabled via client options', function () {
905+
const options = parseOptions('mongodb://localhost:27017', {
906+
adaptiveRetries: true
907+
});
908+
expect(options.adaptiveRetries).to.equal(true);
909+
});
910+
});
892911
});

0 commit comments

Comments
 (0)