Skip to content

Commit 4fb0a0a

Browse files
tadjik1 and nbbeeken authored
feat(NODE-7467): make token bucket optional in client backpressure (#4878)
Co-authored-by: Neal Beeken <neal.beeken@mongodb.com>
1 parent 87a3465 commit 4fb0a0a

File tree

9 files changed

+261
-29
lines changed

9 files changed

+261
-29
lines changed

src/connection_string.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,10 @@ interface OptionDescriptor {
639639
}
640640

641641
export const OPTIONS = {
642+
adaptiveRetries: {
643+
default: false,
644+
type: 'boolean'
645+
},
642646
appName: {
643647
type: 'string'
644648
},

src/mongo_client.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
229229
retryReads?: boolean;
230230
/** Enable retryable writes. */
231231
retryWrites?: boolean;
232+
/** Whether to enable adaptive retry rate limiting using a token bucket. Defaults to false. */
233+
adaptiveRetries?: boolean;
232234
/** Allow a driver to force a Single topology type with a connection string containing one host */
233235
directConnection?: boolean;
234236
/** Instruct the driver it is connecting to a load balancer fronting a mongos like service */
@@ -1041,6 +1043,7 @@ export interface MongoOptions
10411043
extends Required<
10421044
Pick<
10431045
MongoClientOptions,
1046+
| 'adaptiveRetries'
10441047
| 'autoEncryption'
10451048
| 'connectTimeoutMS'
10461049
| 'directConnection'

src/operations/execute_operation.ts

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -267,11 +267,13 @@ async function executeOperationWithRetries<
267267
try {
268268
try {
269269
const result = await server.command(operation, timeoutContext);
270-
topology.tokenBucket.deposit(
271-
attempt > 0
272-
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
273-
: RETRY_TOKEN_RETURN_RATE // otherwise
274-
);
270+
if (topology.s.options.adaptiveRetries) {
271+
topology.tokenBucket.deposit(
272+
attempt > 0
273+
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
274+
: RETRY_TOKEN_RETURN_RATE // otherwise
275+
);
276+
}
275277
return operation.handleOk(result);
276278
} catch (error) {
277279
return operation.handleError(error);
@@ -280,7 +282,11 @@ async function executeOperationWithRetries<
280282
// Should never happen but if it does - propagate the error.
281283
if (!(operationError instanceof MongoError)) throw operationError;
282284

283-
if (attempt > 0 && !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
285+
if (
286+
topology.s.options.adaptiveRetries &&
287+
attempt > 0 &&
288+
!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
289+
) {
284290
// if a retry attempt fails with a non-overload error, deposit 1 token.
285291
topology.tokenBucket.deposit(RETRY_COST);
286292
}
@@ -319,17 +325,17 @@ async function executeOperationWithRetries<
319325
}
320326

321327
if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
322-
if (!topology.tokenBucket.consume(RETRY_COST)) {
323-
throw error;
324-
}
325-
326328
const backoffMS = Math.random() * Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2 ** attempt);
327329

328330
// if the backoff would exhaust the CSOT timeout, short-circuit.
329331
if (timeoutContext.csotEnabled() && backoffMS > timeoutContext.remainingTimeMS) {
330332
throw error;
331333
}
332334

335+
if (topology.s.options.adaptiveRetries && !topology.tokenBucket.consume(RETRY_COST)) {
336+
throw error;
337+
}
338+
333339
await setTimeout(backoffMS);
334340
}
335341

src/sdam/topology.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions {
146146
hosts: HostAddress[];
147147
retryWrites: boolean;
148148
retryReads: boolean;
149+
adaptiveRetries: boolean;
149150
/** How long to block for server selection before throwing an error */
150151
serverSelectionTimeoutMS: number;
151152
/** The name of the replica set to connect to */

test/integration/client-backpressure/client-backpressure.prose.test.ts

Lines changed: 155 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,21 @@ import { expect } from 'chai';
22
import * as sinon from 'sinon';
33

44
import {
5-
type Collection,
65
INITIAL_TOKEN_BUCKET_SIZE,
6+
MAX_RETRIES,
77
type MongoClient,
88
MongoServerError
99
} from '../../mongodb';
1010
import { clearFailPoint, configureFailPoint, measureDuration } from '../../tools/utils';
11+
import { filterForCommands } from '../shared';
1112

1213
describe('Client Backpressure (Prose)', function () {
1314
let client: MongoClient;
14-
let collection: Collection;
15-
16-
beforeEach(async function () {
17-
client = this.configuration.newClient();
18-
await client.connect();
19-
20-
collection = client.db('foo').collection('bar');
21-
});
2215

2316
afterEach(async function () {
2417
sinon.restore();
2518
await client.close();
19+
client = undefined;
2620
await clearFailPoint(this.configuration);
2721
});
2822

@@ -34,6 +28,30 @@ describe('Client Backpressure (Prose)', function () {
3428
}
3529
},
3630
async function () {
31+
// 1. Let `client` be a `MongoClient`
32+
client = this.configuration.newClient();
33+
await client.connect();
34+
35+
// 2. Let `collection` be a collection
36+
const collection = client.db('foo').collection('bar');
37+
38+
// 3. Now, run transactions without backoff:
39+
// i. Configure the random number generator used for jitter to always return `0` -- this effectively disables backoff.
40+
const stub = sinon.stub(Math, 'random');
41+
stub.returns(0);
42+
43+
// ii. Configure the following failPoint:
44+
// ```javascript
45+
// {
46+
// configureFailPoint: 'failCommand',
47+
// mode: 'alwaysOn',
48+
// data: {
49+
// failCommands: ['insert'],
50+
// errorCode: 2,
51+
// errorLabels: ['SystemOverloadedError', 'RetryableError']
52+
// }
53+
// }
54+
// ```
3755
await configureFailPoint(this.configuration, {
3856
configureFailPoint: 'failCommand',
3957
mode: 'alwaysOn',
@@ -44,38 +62,156 @@ describe('Client Backpressure (Prose)', function () {
4462
}
4563
});
4664

47-
const stub = sinon.stub(Math, 'random');
48-
49-
stub.returns(0);
50-
65+
// iii. Insert the document `{ a: 1 }`. Expect that the command errors. Measure the duration of the command execution.
5166
const { duration: durationNoBackoff } = await measureDuration(async () => {
5267
const error = await collection.insertOne({ a: 1 }).catch(e => e);
5368
expect(error).to.be.instanceof(MongoServerError);
5469
});
5570

71+
// iv. Configure the random number generator used for jitter to always return a number as close as possible to `1`.
5672
stub.returns(0.99);
5773

74+
// v. Execute step iii again.
5875
const { duration: durationBackoff } = await measureDuration(async () => {
5976
const error = await collection.insertOne({ a: 1 }).catch(e => e);
6077
expect(error).to.be.instanceof(MongoServerError);
6178
});
6279

80+
// vi. Compare the time between the two runs.
81+
// The sum of 5 backoffs is 3.1 seconds. There is a 1-second window to account for potential variance between the two runs.
6382
expect(durationBackoff - durationNoBackoff).to.be.within(3100 - 1000, 3100 + 1000);
6483
}
6584
);
6685

67-
it('Test 2: Token Bucket capacity is Enforced', async () => {
68-
// 1-2. Assert that the client's retry token bucket is at full capacity and that the capacity
69-
// is DEFAULT_RETRY_TOKEN_CAPACITY.
86+
it('Test 2: Token Bucket capacity is Enforced', async function () {
87+
// 1. Let client be a MongoClient with adaptiveRetries=True.
88+
client = this.configuration.newClient({
89+
adaptiveRetries: true
90+
});
91+
await client.connect();
92+
93+
// 2. Assert that the client's retry token bucket is at full capacity and that the capacity is DEFAULT_RETRY_TOKEN_CAPACITY.
7094
const tokenBucket = client.topology.tokenBucket;
7195
expect(tokenBucket).to.have.property('budget', INITIAL_TOKEN_BUCKET_SIZE);
7296
expect(tokenBucket).to.have.property('capacity', INITIAL_TOKEN_BUCKET_SIZE);
7397

74-
// 3. Execute a successful ping command.
98+
// 3. Using client, execute a successful ping command.
7599
await client.db('admin').command({ ping: 1 });
76100

77-
// 4. Assert that the successful command did not increase the number of tokens in the bucket
78-
// above DEFAULT_RETRY_TOKEN_CAPACITY.
101+
// 4. Assert that the successful command did not increase the number of tokens in the bucket above DEFAULT_RETRY_TOKEN_CAPACITY.
79102
expect(tokenBucket).to.have.property('budget').that.is.at.most(INITIAL_TOKEN_BUCKET_SIZE);
80103
});
104+
105+
it(
106+
'Test 3: Overload Errors are Retried a Maximum of MAX_RETRIES times',
107+
{
108+
requires: {
109+
mongodb: '>=4.4'
110+
}
111+
},
112+
async function () {
113+
// 1. Let `client` be a `MongoClient` with command event monitoring enabled.
114+
client = this.configuration.newClient({
115+
monitorCommands: true
116+
});
117+
await client.connect();
118+
119+
// 2. Let `coll` be a collection.
120+
const collection = client.db('foo').collection('bar');
121+
const commandsStarted = [];
122+
client.on('commandStarted', filterForCommands(['find'], commandsStarted));
123+
124+
/*
125+
* 3. Configure the following failpoint:
126+
{
127+
configureFailPoint: 'failCommand',
128+
mode: 'alwaysOn',
129+
data: {
130+
failCommands: ['find'],
131+
errorCode: 462, // IngressRequestRateLimitExceeded
132+
errorLabels: ['SystemOverloadedError', 'RetryableError']
133+
}
134+
}
135+
* */
136+
await configureFailPoint(this.configuration, {
137+
configureFailPoint: 'failCommand',
138+
mode: 'alwaysOn',
139+
data: {
140+
failCommands: ['find'],
141+
errorCode: 462,
142+
errorLabels: ['RetryableError', 'SystemOverloadedError']
143+
}
144+
});
145+
146+
// 4. Perform a find operation with `coll` that fails.
147+
const error = await collection.findOne({}).catch(e => e);
148+
149+
// 5. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
150+
expect(error).to.be.instanceof(MongoServerError);
151+
expect(error.hasErrorLabel('RetryableError')).to.be.true;
152+
expect(error.hasErrorLabel('SystemOverloadedError')).to.be.true;
153+
154+
// 6. Assert that the total number of started commands is MAX_RETRIES + 1 (6).
155+
expect(commandsStarted).to.have.length(MAX_RETRIES + 1);
156+
}
157+
);
158+
159+
it(
160+
'Test 4: Adaptive Retries are Limited by Token Bucket Tokens',
161+
{
162+
requires: {
163+
mongodb: '>=4.4'
164+
}
165+
},
166+
async function () {
167+
// 1. Let `client` be a `MongoClient` with `adaptiveRetries=True` and command event monitoring enabled.
168+
client = this.configuration.newClient({
169+
adaptiveRetries: true,
170+
monitorCommands: true
171+
});
172+
await client.connect();
173+
174+
// 2. Set `client`'s retry token bucket to have 2 tokens.
175+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
176+
client.topology!.tokenBucket['budget'] = 2;
177+
178+
// 3. Let `coll` be a collection.
179+
const collection = client.db('foo').collection('bar');
180+
const commandsStarted = [];
181+
client.on('commandStarted', filterForCommands(['find'], commandsStarted));
182+
183+
/*
184+
* 4. Configure the following failpoint:
185+
{
186+
configureFailPoint: 'failCommand',
187+
mode: {times: 3},
188+
data: {
189+
failCommands: ['find'],
190+
errorCode: 462, // IngressRequestRateLimitExceeded
191+
errorLabels: ['SystemOverloadedError', 'RetryableError']
192+
}
193+
}
194+
* */
195+
await configureFailPoint(this.configuration, {
196+
configureFailPoint: 'failCommand',
197+
mode: { times: 3 },
198+
data: {
199+
failCommands: ['find'],
200+
errorCode: 462,
201+
errorLabels: ['RetryableError', 'SystemOverloadedError']
202+
}
203+
});
204+
205+
// 5. Perform a find operation with `coll` that fails.
206+
const error = await collection.findOne({}).catch(e => e);
207+
208+
// 6. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
209+
expect(error).to.be.instanceof(MongoServerError);
210+
expect(error.hasErrorLabel('RetryableError')).to.be.true;
211+
expect(error.hasErrorLabel('SystemOverloadedError')).to.be.true;
212+
213+
// 7. Assert that the total number of started commands is 3: one for the initial attempt and two for the retries.
214+
expect(commandsStarted).to.have.length(3);
215+
}
216+
);
81217
});
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
{
2+
"tests": [
3+
{
4+
"description": "adaptiveRetries=true is parsed correctly",
5+
"uri": "mongodb://example.com/?adaptiveRetries=true",
6+
"valid": true,
7+
"warning": false,
8+
"hosts": null,
9+
"auth": null,
10+
"options": {
11+
"adaptiveRetries": true
12+
}
13+
},
14+
{
15+
"description": "adaptiveRetries=false is parsed correctly",
16+
"uri": "mongodb://example.com/?adaptiveRetries=false",
17+
"valid": true,
18+
"warning": false,
19+
"hosts": null,
20+
"auth": null,
21+
"options": {
22+
"adaptiveRetries": false
23+
}
24+
},
25+
{
26+
"description": "adaptiveRetries with invalid value causes a warning",
27+
"uri": "mongodb://example.com/?adaptiveRetries=invalid",
28+
"valid": true,
29+
"warning": true,
30+
"hosts": null,
31+
"auth": null,
32+
"options": null
33+
}
34+
]
35+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
tests:
2+
-
3+
description: "adaptiveRetries=true is parsed correctly"
4+
uri: "mongodb://example.com/?adaptiveRetries=true"
5+
valid: true
6+
warning: false
7+
hosts: ~
8+
auth: ~
9+
options:
10+
adaptiveRetries: true
11+
-
12+
description: "adaptiveRetries=false is parsed correctly"
13+
uri: "mongodb://example.com/?adaptiveRetries=false"
14+
valid: true
15+
warning: false
16+
hosts: ~
17+
auth: ~
18+
options:
19+
adaptiveRetries: false
20+
-
21+
description: "adaptiveRetries with invalid value causes a warning"
22+
uri: "mongodb://example.com/?adaptiveRetries=invalid"
23+
valid: true
24+
warning: true
25+
hosts: ~
26+
auth: ~
27+
options: ~

test/tools/uri_spec_runner.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ export function executeUriValidationTest(
358358
case 'serverSelectionTimeoutMS':
359359
case 'serverMonitoringMode':
360360
case 'socketTimeoutMS':
361+
case 'adaptiveRetries':
361362
case 'retryWrites':
362363
case 'directConnection':
363364
case 'loadBalanced':

0 commit comments

Comments
 (0)