Skip to content

Commit 6f58002

Browse files
authored
CSHARP-5887: Simplify retryable read and writes (#1882)
1 parent 490be9c commit 6f58002

24 files changed

Lines changed: 420 additions & 376 deletions

src/MongoDB.Driver/Core/Operations/AggregateOperation.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
namespace MongoDB.Driver.Core.Operations
3131
{
32-
internal sealed class AggregateOperation<TResult> : IReadOperation<IAsyncCursor<TResult>>, IExecutableInRetryableReadContext<IAsyncCursor<TResult>>
32+
internal sealed class AggregateOperation<TResult> : IReadOperation<IAsyncCursor<TResult>>, IExecutableInRetryableReadContext<IAsyncCursor<TResult>>, ICommandCreator
3333
{
3434
// fields
3535
private bool? _allowDiskUse;
@@ -279,7 +279,7 @@ public IAsyncCursor<TResult> Execute(OperationContext operationContext, IReadBin
279279
Ensure.IsNotNull(binding, nameof(binding));
280280

281281
using (BeginOperation())
282-
using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested))
282+
using (var context = new RetryableReadContext(binding, _retryRequested))
283283
{
284284
return Execute(operationContext, context);
285285
}
@@ -308,7 +308,7 @@ public async Task<IAsyncCursor<TResult>> ExecuteAsync(OperationContext operation
308308
Ensure.IsNotNull(binding, nameof(binding));
309309

310310
using (BeginOperation())
311-
using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false))
311+
using (var context = new RetryableReadContext(binding, _retryRequested))
312312
{
313313
return await ExecuteAsync(operationContext, context).ConfigureAwait(false);
314314
}
@@ -331,7 +331,7 @@ public async Task<IAsyncCursor<TResult>> ExecuteAsync(OperationContext operation
331331
}
332332
}
333333

334-
internal BsonDocument CreateCommand(OperationContext operationContext, ICoreSession session, ConnectionDescription connectionDescription)
334+
public BsonDocument CreateCommand(OperationContext operationContext, ICoreSession session, ConnectionDescription connectionDescription)
335335
{
336336
var readConcern = ReadConcernHelper.GetReadConcernForCommand(session, connectionDescription, _readConcern);
337337
var command = new BsonDocument
@@ -362,9 +362,13 @@ internal BsonDocument CreateCommand(OperationContext operationContext, ICoreSess
362362
private ReadCommandOperation<AggregateResult> CreateOperation(OperationContext operationContext, RetryableReadContext context)
363363
{
364364
var databaseNamespace = _collectionNamespace == null ? _databaseNamespace : _collectionNamespace.DatabaseNamespace;
365-
var command = CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription);
366365
var serializer = new AggregateResultDeserializer(_resultSerializer);
367-
return new ReadCommandOperation<AggregateResult>(databaseNamespace, command, serializer, MessageEncoderSettings, OperationName)
366+
return new ReadCommandOperation<AggregateResult>(
367+
databaseNamespace,
368+
this,
369+
serializer,
370+
MessageEncoderSettings,
371+
OperationName)
368372
{
369373
RetryRequested = _retryRequested // might be overridden by retryable read context
370374
};

src/MongoDB.Driver/Core/Operations/BulkMixedWriteOperation.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public WriteConcern WriteConcern
145145
public BulkWriteOperationResult Execute(OperationContext operationContext, IWriteBinding binding)
146146
{
147147
using (BeginOperation())
148-
using (var context = RetryableWriteContext.Create(operationContext, binding, IsOperationRetryable()))
148+
using (var context = new RetryableWriteContext(binding, IsOperationRetryable()))
149149
{
150150
EnsureHintIsSupportedIfAnyRequestHasHint();
151151
var helper = new BatchHelper(_requests, _isOrdered, _writeConcern);
@@ -160,7 +160,7 @@ public BulkWriteOperationResult Execute(OperationContext operationContext, IWrit
160160
public async Task<BulkWriteOperationResult> ExecuteAsync(OperationContext operationContext, IWriteBinding binding)
161161
{
162162
using (BeginOperation())
163-
using (var context = await RetryableWriteContext.CreateAsync(operationContext, binding, IsOperationRetryable()).ConfigureAwait(false))
163+
using (var context = new RetryableWriteContext(binding, IsOperationRetryable()))
164164
{
165165
EnsureHintIsSupportedIfAnyRequestHasHint();
166166
var helper = new BatchHelper(_requests, _isOrdered, _writeConcern);

src/MongoDB.Driver/Core/Operations/BulkUnmixedWriteOperationBase.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public BulkWriteOperationResult Execute(OperationContext operationContext, Retry
131131
public BulkWriteOperationResult Execute(OperationContext operationContext, IWriteBinding binding)
132132
{
133133
using (BeginOperation())
134-
using (var context = RetryableWriteContext.Create(operationContext, binding, IsOperationRetryable()))
134+
using (var context = new RetryableWriteContext(binding, IsOperationRetryable()))
135135
{
136136
return Execute(operationContext, context);
137137
}
@@ -147,7 +147,7 @@ public Task<BulkWriteOperationResult> ExecuteAsync(OperationContext operationCon
147147
public async Task<BulkWriteOperationResult> ExecuteAsync(OperationContext operationContext, IWriteBinding binding)
148148
{
149149
using (BeginOperation())
150-
using (var context = await RetryableWriteContext.CreateAsync(operationContext, binding, IsOperationRetryable()).ConfigureAwait(false))
150+
using (var context = new RetryableWriteContext(binding, IsOperationRetryable()))
151151
{
152152
return await ExecuteAsync(operationContext, context).ConfigureAwait(false);
153153
}
@@ -286,7 +286,7 @@ public BulkWriteOperationResult CreateFinalResultOrThrow(IChannelHandle channel)
286286
{
287287
var combiner = new BulkWriteBatchResultCombiner(_batchResults, _writeConcern.IsAcknowledged);
288288
var remainingRequests = _requests.GetUnprocessedItems();
289-
return combiner.CreateResultOrThrowIfHasErrors(channel.ConnectionDescription.ConnectionId, remainingRequests);
289+
return combiner.CreateResultOrThrowIfHasErrors(channel?.ConnectionDescription.ConnectionId, remainingRequests);
290290
}
291291

292292
// private methods

src/MongoDB.Driver/Core/Operations/ChangeStreamOperation.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ public IChangeStreamCursor<TResult> Execute(OperationContext operationContext, I
266266
IAsyncCursor<RawBsonDocument> cursor;
267267
ICursorBatchInfo cursorBatchInfo;
268268
BsonTimestamp initialOperationTime;
269-
using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested))
269+
using (var context = new RetryableReadContext(binding, _retryRequested))
270270
{
271271
cursor = ExecuteAggregateOperation(operationContext, context);
272272
cursorBatchInfo = (ICursorBatchInfo)cursor;
@@ -301,7 +301,7 @@ public async Task<IChangeStreamCursor<TResult>> ExecuteAsync(OperationContext op
301301
IAsyncCursor<RawBsonDocument> cursor;
302302
ICursorBatchInfo cursorBatchInfo;
303303
BsonTimestamp initialOperationTime;
304-
using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false))
304+
using (var context = new RetryableReadContext(binding, _retryRequested))
305305
{
306306
cursor = await ExecuteAggregateOperationAsync(operationContext, context).ConfigureAwait(false);
307307
cursorBatchInfo = (ICursorBatchInfo)cursor;
@@ -326,7 +326,7 @@ public async Task<IChangeStreamCursor<TResult>> ExecuteAsync(OperationContext op
326326
/// <inheritdoc />
327327
public IAsyncCursor<RawBsonDocument> Resume(OperationContext operationContext, IReadBinding binding)
328328
{
329-
using (var context = RetryableReadContext.Create(operationContext, binding, retryRequested: false))
329+
using (var context = new RetryableReadContext(binding, retryRequested: false))
330330
{
331331
return ExecuteAggregateOperation(operationContext, context);
332332
}
@@ -335,7 +335,7 @@ public IAsyncCursor<RawBsonDocument> Resume(OperationContext operationContext, I
335335
/// <inheritdoc />
336336
public async Task<IAsyncCursor<RawBsonDocument>> ResumeAsync(OperationContext operationContext, IReadBinding binding)
337337
{
338-
using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, retryRequested: false).ConfigureAwait(false))
338+
using (var context = new RetryableReadContext(binding, retryRequested: false))
339339
{
340340
return await ExecuteAggregateOperationAsync(operationContext, context).ConfigureAwait(false);
341341
}

src/MongoDB.Driver/Core/Operations/ClientBulkWriteOperation.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ protected override IEnumerable<BatchableCommandMessageSection> CreateCommandPayl
9696
var bulkWriteResults = new BulkWriteRawResult();
9797
while (true)
9898
{
99-
using var context = RetryableWriteContext.Create(operationContext, binding, GetEffectiveRetryRequested());
99+
using var context = new RetryableWriteContext(binding, GetEffectiveRetryRequested());
100100
BsonDocument serverResponse = null;
101101
try
102102
{
@@ -112,7 +112,7 @@ protected override IEnumerable<BatchableCommandMessageSection> CreateCommandPayl
112112
bulkWriteResults.TopLevelException = commandException;
113113
serverResponse = commandException.Result;
114114
}
115-
catch (Exception exception)
115+
catch (Exception exception) when (context.Channel is not null)
116116
{
117117
bulkWriteResults.TopLevelException = exception;
118118
}
@@ -154,7 +154,7 @@ protected override IEnumerable<BatchableCommandMessageSection> CreateCommandPayl
154154
var bulkWriteResults = new BulkWriteRawResult();
155155
while (true)
156156
{
157-
using var context = RetryableWriteContext.Create(operationContext, binding, GetEffectiveRetryRequested());
157+
using var context = new RetryableWriteContext(binding, GetEffectiveRetryRequested());
158158
BsonDocument serverResponse = null;
159159
try
160160
{
@@ -170,7 +170,7 @@ protected override IEnumerable<BatchableCommandMessageSection> CreateCommandPayl
170170
bulkWriteResults.TopLevelException = commandException;
171171
serverResponse = commandException.Result;
172172
}
173-
catch (Exception exception)
173+
catch (Exception exception) when (context.Channel is not null)
174174
{
175175
bulkWriteResults.TopLevelException = exception;
176176
}

src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,17 @@ namespace MongoDB.Driver.Core.Operations
2727
internal abstract class CommandOperationBase<TCommandResult>
2828
{
2929
private BsonDocument _additionalOptions;
30-
private BsonDocument _command;
3130
private IElementNameValidator _commandValidator = NoOpElementNameValidator.Instance;
3231
private string _comment;
3332
private DatabaseNamespace _databaseNamespace;
3433
private MessageEncoderSettings _messageEncoderSettings;
3534
private IBsonSerializer<TCommandResult> _resultSerializer;
3635

37-
protected CommandOperationBase(
38-
DatabaseNamespace databaseNamespace,
39-
BsonDocument command,
36+
protected CommandOperationBase(DatabaseNamespace databaseNamespace,
4037
IBsonSerializer<TCommandResult> resultSerializer,
4138
MessageEncoderSettings messageEncoderSettings)
4239
{
4340
_databaseNamespace = Ensure.IsNotNull(databaseNamespace, nameof(databaseNamespace));
44-
_command = Ensure.IsNotNull(command, nameof(command));
4541
_resultSerializer = Ensure.IsNotNull(resultSerializer, nameof(resultSerializer));
4642
_messageEncoderSettings = messageEncoderSettings;
4743
}
@@ -52,11 +48,6 @@ public BsonDocument AdditionalOptions
5248
set { _additionalOptions = value; }
5349
}
5450

55-
public BsonDocument Command
56-
{
57-
get { return _command; }
58-
}
59-
6051
public IElementNameValidator CommandValidator
6152
{
6253
get { return _commandValidator; }
@@ -84,7 +75,7 @@ public IBsonSerializer<TCommandResult> ResultSerializer
8475
get { return _resultSerializer; }
8576
}
8677

87-
protected TCommandResult ExecuteProtocol(OperationContext operationContext, IChannelHandle channel, ICoreSessionHandle session, ReadPreference readPreference)
78+
protected TCommandResult ExecuteProtocol(OperationContext operationContext, IChannelHandle channel, ICoreSessionHandle session, ReadPreference readPreference, BsonDocument command)
8879
{
8980
var additionalOptions = GetEffectiveAdditionalOptions();
9081

@@ -93,7 +84,7 @@ protected TCommandResult ExecuteProtocol(OperationContext operationContext, ICha
9384
session,
9485
readPreference,
9586
_databaseNamespace,
96-
_command,
87+
command,
9788
null, // commandPayloads
9889
_commandValidator,
9990
additionalOptions,
@@ -107,15 +98,16 @@ protected TCommandResult ExecuteProtocol(
10798
OperationContext operationContext,
10899
IChannelSource channelSource,
109100
ICoreSessionHandle session,
110-
ReadPreference readPreference)
101+
ReadPreference readPreference,
102+
BsonDocument command)
111103
{
112104
using (var channel = channelSource.GetChannel(operationContext))
113105
{
114-
return ExecuteProtocol(operationContext, channel, session, readPreference);
106+
return ExecuteProtocol(operationContext, channel, session, readPreference, command);
115107
}
116108
}
117109

118-
protected Task<TCommandResult> ExecuteProtocolAsync(OperationContext operationContext, IChannelHandle channel, ICoreSessionHandle session, ReadPreference readPreference)
110+
protected Task<TCommandResult> ExecuteProtocolAsync(OperationContext operationContext, IChannelHandle channel, ICoreSessionHandle session, ReadPreference readPreference, BsonDocument command)
119111
{
120112
var additionalOptions = GetEffectiveAdditionalOptions();
121113

@@ -124,7 +116,7 @@ protected Task<TCommandResult> ExecuteProtocolAsync(OperationContext operationCo
124116
session,
125117
readPreference,
126118
_databaseNamespace,
127-
_command,
119+
command,
128120
null, // TODO: support commandPayloads
129121
_commandValidator,
130122
additionalOptions,
@@ -138,11 +130,12 @@ protected async Task<TCommandResult> ExecuteProtocolAsync(
138130
OperationContext operationContext,
139131
IChannelSource channelSource,
140132
ICoreSessionHandle session,
141-
ReadPreference readPreference)
133+
ReadPreference readPreference,
134+
BsonDocument command)
142135
{
143136
using (var channel = await channelSource.GetChannelAsync(operationContext).ConfigureAwait(false))
144137
{
145-
return await ExecuteProtocolAsync(operationContext, channel, session, readPreference).ConfigureAwait(false);
138+
return await ExecuteProtocolAsync(operationContext, channel, session, readPreference, command).ConfigureAwait(false);
146139
}
147140
}
148141

src/MongoDB.Driver/Core/Operations/CountOperation.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
namespace MongoDB.Driver.Core.Operations
2727
{
28-
internal sealed class CountOperation : IReadOperation<long>, IExecutableInRetryableReadContext<long>
28+
internal sealed class CountOperation : IReadOperation<long>, IExecutableInRetryableReadContext<long>, ICommandCreator
2929
{
3030
private Collation _collation;
3131
private readonly CollectionNamespace _collectionNamespace;
@@ -133,7 +133,7 @@ public long Execute(OperationContext operationContext, IReadBinding binding)
133133
Ensure.IsNotNull(binding, nameof(binding));
134134

135135
using (BeginOperation())
136-
using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested))
136+
using (var context = new RetryableReadContext(binding, _retryRequested))
137137
{
138138
return Execute(operationContext, context);
139139
}
@@ -151,7 +151,7 @@ public async Task<long> ExecuteAsync(OperationContext operationContext, IReadBin
151151
Ensure.IsNotNull(binding, nameof(binding));
152152

153153
using (BeginOperation())
154-
using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false))
154+
using (var context = new RetryableReadContext(binding, _retryRequested))
155155
{
156156
return await ExecuteAsync(operationContext, context).ConfigureAwait(false);
157157
}
@@ -168,8 +168,12 @@ public async Task<long> ExecuteAsync(OperationContext operationContext, Retryabl
168168

169169
private ReadCommandOperation<BsonDocument> CreateOperation(OperationContext operationContext, RetryableReadContext context)
170170
{
171-
var command = CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription);
172-
return new ReadCommandOperation<BsonDocument>(_collectionNamespace.DatabaseNamespace, command, BsonDocumentSerializer.Instance, _messageEncoderSettings, OperationName)
171+
return new ReadCommandOperation<BsonDocument>(
172+
_collectionNamespace.DatabaseNamespace,
173+
this,
174+
BsonDocumentSerializer.Instance,
175+
_messageEncoderSettings,
176+
OperationName)
173177
{
174178
RetryRequested = _retryRequested // might be overridden by retryable read context
175179
};

0 commit comments

Comments
 (0)