Skip to content

Commit 475b6d4

Browse files
committed
Added fix for endTransaction operations (commit/abort)
1 parent edac3df commit 475b6d4

4 files changed

Lines changed: 135 additions & 147 deletions

File tree

src/MongoDB.Driver/Core/Bindings/CoreSession.cs

Lines changed: 31 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -160,36 +160,18 @@ void ICoreSessionInternal.AbortTransaction(AbortTransactionOptions options, Canc
160160

161161
try
162162
{
163-
var firstAttempt = CreateAbortTransactionOperation(operationContext);
164-
ExecuteEndTransactionOnPrimary(operationContext, firstAttempt);
165-
return;
166-
}
167-
catch (Exception exception) when (ShouldRetryEndTransactionException(operationContext, exception))
168-
{
169-
// unpin if retryable error
170-
_currentTransaction.UnpinAll();
171-
172-
// ignore exception and retry
173-
}
174-
catch
175-
{
176-
return; // ignore exception and return
177-
}
178-
179-
try
180-
{
181-
var secondAttempt = CreateAbortTransactionOperation(operationContext);
182-
ExecuteEndTransactionOnPrimary(operationContext, secondAttempt);
163+
var operation = CreateAbortTransactionOperation(operationContext);
164+
ExecuteEndTransactionOnPrimary(operationContext, operation);
183165
}
184166
catch
185167
{
186-
return; // ignore exception and return
168+
// ignore all errors per spec
187169
}
188170
}
189171
finally
190172
{
191173
_currentTransaction.SetState(CoreTransactionState.Aborted);
192-
// The transaction is aborted.The session MUST be unpinned regardless
174+
// The transaction is aborted. The session MUST be unpinned regardless
193175
// of whether the abortTransaction command succeeds or fails
194176
_currentTransaction.UnpinAll();
195177

@@ -216,36 +198,18 @@ async Task ICoreSessionInternal.AbortTransactionAsync(AbortTransactionOptions op
216198

217199
try
218200
{
219-
var firstAttempt = CreateAbortTransactionOperation(operationContext);
220-
await ExecuteEndTransactionOnPrimaryAsync(operationContext, firstAttempt).ConfigureAwait(false);
221-
return;
222-
}
223-
catch (Exception exception) when (ShouldRetryEndTransactionException(operationContext, exception))
224-
{
225-
// unpin if retryable error
226-
_currentTransaction.UnpinAll();
227-
228-
// ignore exception and retry
201+
var operation = CreateAbortTransactionOperation(operationContext);
202+
await ExecuteEndTransactionOnPrimaryAsync(operationContext, operation).ConfigureAwait(false);
229203
}
230204
catch
231205
{
232-
return; // ignore exception and return
233-
}
234-
235-
try
236-
{
237-
var secondAttempt = CreateAbortTransactionOperation(operationContext);
238-
await ExecuteEndTransactionOnPrimaryAsync(operationContext, secondAttempt).ConfigureAwait(false);
239-
}
240-
catch
241-
{
242-
return; // ignore exception and return
206+
// ignore all errors per spec
243207
}
244208
}
245209
finally
246210
{
247211
_currentTransaction.SetState(CoreTransactionState.Aborted);
248-
// The transaction is aborted.The session MUST be unpinned regardless
212+
// The transaction is aborted. The session MUST be unpinned regardless
249213
// of whether the abortTransaction command succeeds or fails
250214
_currentTransaction.UnpinAll();
251215

@@ -320,20 +284,8 @@ void ICoreSessionInternal.CommitTransaction(CommitTransactionOptions options, Ca
320284
return;
321285
}
322286

323-
try
324-
{
325-
var firstAttempt = CreateCommitTransactionOperation(operationContext, IsFirstCommitAttemptRetry());
326-
ExecuteEndTransactionOnPrimary(operationContext, firstAttempt);
327-
return;
328-
}
329-
catch (Exception exception) when (ShouldRetryEndTransactionException(operationContext, exception))
330-
{
331-
// unpin server if needed, then ignore exception and retry
332-
TransactionHelper.UnpinServerIfNeededOnRetryableCommitException(_currentTransaction, exception);
333-
}
334-
335-
var secondAttempt = CreateCommitTransactionOperation(operationContext, isCommitRetry: true);
336-
ExecuteEndTransactionOnPrimary(operationContext, secondAttempt);
287+
var operation = CreateCommitTransactionOperation(operationContext);
288+
ExecuteEndTransactionOnPrimary(operationContext, operation);
337289
}
338290
finally
339291
{
@@ -343,7 +295,6 @@ void ICoreSessionInternal.CommitTransaction(CommitTransactionOptions options, Ca
343295
}
344296
}
345297

346-
//TODO Both commit transaction and abort transaction need to be changed according to spec
347298
/// <inheritdoc />
348299
public Task CommitTransactionAsync(CancellationToken cancellationToken = default)
349300
=> ((ICoreSessionInternal)this).CommitTransactionAsync(null, cancellationToken);
@@ -362,20 +313,8 @@ async Task ICoreSessionInternal.CommitTransactionAsync(CommitTransactionOptions
362313
return;
363314
}
364315

365-
try
366-
{
367-
var firstAttempt = CreateCommitTransactionOperation(operationContext, IsFirstCommitAttemptRetry());
368-
await ExecuteEndTransactionOnPrimaryAsync(operationContext, firstAttempt).ConfigureAwait(false);
369-
return;
370-
}
371-
catch (Exception exception) when (ShouldRetryEndTransactionException(operationContext, exception))
372-
{
373-
// unpin server if needed, then ignore exception and retry
374-
TransactionHelper.UnpinServerIfNeededOnRetryableCommitException(_currentTransaction, exception);
375-
}
376-
377-
var secondAttempt = CreateCommitTransactionOperation(operationContext, isCommitRetry: true);
378-
await ExecuteEndTransactionOnPrimaryAsync(operationContext, secondAttempt).ConfigureAwait(false);
316+
var operation = CreateCommitTransactionOperation(operationContext);
317+
await ExecuteEndTransactionOnPrimaryAsync(operationContext, operation).ConfigureAwait(false);
379318
}
380319
finally
381320
{
@@ -466,16 +405,25 @@ public void WasUsed()
466405
}
467406

468407
// private methods
469-
private IReadOperation<BsonDocument> CreateAbortTransactionOperation(OperationContext operationContext)
408+
private AbortTransactionOperation CreateAbortTransactionOperation(OperationContext operationContext)
470409
{
471410
return new AbortTransactionOperation(_currentTransaction.RecoveryToken, GetTransactionWriteConcern(operationContext));
472411
}
473412

474-
private IReadOperation<BsonDocument> CreateCommitTransactionOperation(OperationContext operationContext, bool isCommitRetry)
413+
private CommitTransactionOperation CreateCommitTransactionOperation(OperationContext operationContext)
475414
{
476-
var writeConcern = GetCommitTransactionWriteConcern(operationContext, isCommitRetry);
415+
var writeConcern = GetTransactionWriteConcern(operationContext);
477416
var maxCommitTime = _currentTransaction.TransactionOptions.MaxCommitTime;
478-
return new CommitTransactionOperation(_currentTransaction.RecoveryToken, writeConcern) { MaxCommitTime = maxCommitTime };
417+
var operation = new CommitTransactionOperation(_currentTransaction.RecoveryToken, writeConcern) { MaxCommitTime = maxCommitTime };
418+
419+
// When the user re-calls commitTransaction after the state is already "Committed",
420+
// the spec considers this a retry and write concern should be upgraded to w:majority.
421+
if (_currentTransaction.State == CoreTransactionState.Committed)
422+
{
423+
operation.RequiresMajorityWriteConcern = true;
424+
}
425+
426+
return operation;
479427
}
480428

481429
private void EnsureAbortTransactionCanBeCalled(string methodName)
@@ -579,27 +527,29 @@ private void EnsureTransactionsAreSupported()
579527
}
580528
}
581529

582-
private TResult ExecuteEndTransactionOnPrimary<TResult>(OperationContext operationContext, IReadOperation<TResult> operation)
530+
private void ExecuteEndTransactionOnPrimary(OperationContext operationContext, EndTransactionOperation operation)
583531
{
584532
using var transactionActivityScope = TransactionActivityScope.CreateIfNeeded(_currentTransaction);
585533
using var operationActivity = MongoTelemetry.StartOperationActivity(operationContext);
586534

587535
using (var sessionHandle = new NonDisposingCoreSessionHandle(this))
588536
using (var binding = ChannelPinningHelper.CreateReadWriteBinding(_cluster, sessionHandle))
537+
using (var context = new RetryableWriteContext(binding, retryRequested: true, canBeRetried: true))
589538
{
590-
return operation.Execute(operationContext, binding);
539+
RetryableWriteOperationExecutor.Execute(operationContext, operation, context);
591540
}
592541
}
593542

594-
private async Task<TResult> ExecuteEndTransactionOnPrimaryAsync<TResult>(OperationContext operationContext, IReadOperation<TResult> operation)
543+
private async Task ExecuteEndTransactionOnPrimaryAsync(OperationContext operationContext, EndTransactionOperation operation)
595544
{
596545
using var transactionActivityScope = TransactionActivityScope.CreateIfNeeded(_currentTransaction);
597546
using var operationActivity = MongoTelemetry.StartOperationActivity(operationContext);
598547

599548
using (var sessionHandle = new NonDisposingCoreSessionHandle(this))
600549
using (var binding = ChannelPinningHelper.CreateReadWriteBinding(_cluster, sessionHandle))
550+
using (var context = new RetryableWriteContext(binding, retryRequested: true, canBeRetried: true))
601551
{
602-
return await operation.ExecuteAsync(operationContext, binding).ConfigureAwait(false);
552+
await RetryableWriteOperationExecutor.ExecuteAsync(operationContext, operation, context).ConfigureAwait(false);
603553
}
604554
}
605555

@@ -628,36 +578,5 @@ private WriteConcern GetTransactionWriteConcern(OperationContext operationContex
628578

629579
return writeConcern;
630580
}
631-
632-
private WriteConcern GetCommitTransactionWriteConcern(OperationContext operationContext, bool isCommitRetry)
633-
{
634-
var writeConcern = GetTransactionWriteConcern(operationContext);
635-
if (isCommitRetry)
636-
{
637-
writeConcern = writeConcern.With(mode: "majority");
638-
if (writeConcern.WTimeout == null && !operationContext.IsRootContextTimeoutConfigured())
639-
{
640-
writeConcern = writeConcern.With(wTimeout: TimeSpan.FromMilliseconds(10000));
641-
}
642-
}
643-
644-
return writeConcern;
645-
}
646-
647-
private bool IsFirstCommitAttemptRetry()
648-
{
649-
// According to the spec, trying to commit again while the state is "committed" is considered a retry.
650-
return _currentTransaction.State == CoreTransactionState.Committed;
651-
}
652-
653-
private bool ShouldRetryEndTransactionException(OperationContext operationContext, Exception exception)
654-
{
655-
if (!RetryabilityHelper.IsRetryableWriteException(exception))
656-
{
657-
return false;
658-
}
659-
660-
return operationContext.IsRootContextTimeoutConfigured() ? !operationContext.IsTimedOut() : true;
661-
}
662581
}
663582
}

0 commit comments

Comments
 (0)