-
Notifications
You must be signed in to change notification settings - Fork 1.3k
CSHARP-6029: Make ICoreServerSessionPool disposable and send endSessions on dispose #1995
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
a42ca19
b1b8b07
f2bb7f7
88900f0
3eeced0
dcadd7d
d23b3f6
51d4499
5ddcc9a
dc009ab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,78 +14,142 @@ | |
| */ | ||
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Collections.Concurrent; | ||
| using System.Diagnostics; | ||
| using System.Linq; | ||
| using System.Threading; | ||
| using Microsoft.Extensions.Logging; | ||
| using MongoDB.Bson; | ||
| using MongoDB.Bson.IO; | ||
| using MongoDB.Bson.Serialization.Serializers; | ||
| using MongoDB.Driver.Core.Bindings; | ||
| using MongoDB.Driver.Core.Clusters; | ||
| using MongoDB.Driver.Core.Logging; | ||
| using MongoDB.Driver.Core.Misc; | ||
| using MongoDB.Driver.Core.Servers; | ||
| using MongoDB.Driver.Core.WireProtocol; | ||
|
|
||
| namespace MongoDB.Driver | ||
| { | ||
| internal sealed class CoreServerSessionPool : ICoreServerSessionPool | ||
| { | ||
| // private fields | ||
| private readonly ICluster _cluster; | ||
| private readonly object _lock = new object(); | ||
| private readonly List<ICoreServerSession> _pool = new List<ICoreServerSession>(); | ||
| private readonly ILogger<LogCategories.Client> _logger; | ||
| private readonly ConcurrentStack<ICoreServerSession> _pool = new(); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Accordingly to spec pool should be LIFO. |
||
| private volatile bool _isDisposed = false; | ||
| private long _sessionsCreated; | ||
| private long _sessionsDisposed; | ||
| private long _sessionsAcquired; | ||
| private long _sessionsReturned; | ||
|
|
||
| // constructors | ||
| public CoreServerSessionPool(ICluster cluster) | ||
| public CoreServerSessionPool(ICluster cluster, ILogger<LogCategories.Client> logger) | ||
| { | ||
| _logger = logger; | ||
| _cluster = Ensure.IsNotNull(cluster, nameof(cluster)); | ||
| } | ||
|
|
||
| public ICoreServerSession AcquireSession() | ||
| { | ||
| lock (_lock) | ||
| ThrowIfDisposed(); | ||
| ICoreServerSession session = null; | ||
| while (session == null && _pool.TryPop(out session)) | ||
| { | ||
| for (var i = _pool.Count - 1; i >= 0; i--) | ||
| if (IsAboutToExpireOrDirty(session)) | ||
| { | ||
| var pooledSession = _pool[i]; | ||
| if (IsAboutToExpireOrDirty(pooledSession)) | ||
| { | ||
| pooledSession.Dispose(); | ||
| } | ||
| else | ||
| { | ||
| var removeCount = _pool.Count - i; // the one we're about to return and any about to expire ones we skipped over | ||
| _pool.RemoveRange(i, removeCount); | ||
| return new ReleaseOnDisposeCoreServerSession(pooledSession, this); | ||
| } | ||
| session.Dispose(); | ||
| session = null; | ||
| } | ||
| } | ||
|
|
||
| _pool.Clear(); // they're all about to expire | ||
| if (session == null) | ||
| { | ||
| Interlocked.Increment(ref _sessionsCreated); | ||
| session = new CoreServerSession(); | ||
| } | ||
|
|
||
| return new ReleaseOnDisposeCoreServerSession(new CoreServerSession(), this); | ||
| Interlocked.Increment(ref _sessionsAcquired); | ||
| return new ReleaseOnDisposeCoreServerSession(session, this); | ||
| } | ||
|
|
||
| public void ReleaseSession(ICoreServerSession session) | ||
| { | ||
| lock (_lock) | ||
| ThrowIfDisposed(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this be a behavioral breaking change? In the case server is disposed but the session/operation in flight is not, the exception might happen even for a successful operation?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is, but if MongoClient/Cluster is being disposed - all resources associated with it should be cleaned up. Otherwise we might let cluster "resurrect" and have leaked resources.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes the resources should be clean up.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If somebody disposed client/cluster and then suddenly started returning sessions to the pool: it means a) they disposed cluster too early b) the returned sessions might not be closed properly (if CloseAndDispose is already done).
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As it was discussed offline we will log an error for now to prevent any possible breaking change in the minor release. |
||
| Interlocked.Increment(ref _sessionsReturned); | ||
|
|
||
| if (IsAboutToExpireOrDirty(session)) | ||
|
BorisDog marked this conversation as resolved.
|
||
| { | ||
| Interlocked.Increment(ref _sessionsDisposed); | ||
| session.Dispose(); | ||
| } | ||
| else | ||
| { | ||
| _pool.Push(session); | ||
| } | ||
| } | ||
|
|
||
| public void CloseAndDispose(IServer server) | ||
| { | ||
| if (_isDisposed) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| _isDisposed = true; | ||
| var timestamp = Stopwatch.GetTimestamp(); | ||
| _logger?.LogDebug( | ||
|
BorisDog marked this conversation as resolved.
|
||
| "Closing server session pool for {clusterId}: total sessions created {sessionsCreated}, total sessions acquired {sessionsAcquired}, sessions returned {sessionsReturned}, sessions disposed {sessionsDisposed}, pooled sessions {pooledSessions}.", | ||
| _cluster.ClusterId, _sessionsCreated, _sessionsAcquired, _sessionsReturned, _sessionsDisposed, _pool.Count); | ||
|
|
||
| var sessionsEnded = 0; | ||
| try | ||
| { | ||
| var removeCount = 0; | ||
| for (var i = 0; i < _pool.Count; i++) | ||
| while (true) | ||
| { | ||
| var pooledSession = _pool[i]; | ||
| if (IsAboutToExpireOrDirty(pooledSession)) | ||
| var batchSize = Math.Min(10000, _pool.Count); | ||
| var batch = new ICoreServerSession[batchSize]; | ||
|
|
||
| batchSize = _pool.TryPopRange(batch); | ||
|
BorisDog marked this conversation as resolved.
|
||
| if (batchSize == 0) | ||
| { | ||
| pooledSession.Dispose(); | ||
| removeCount++; | ||
| return; | ||
| } | ||
| else | ||
|
|
||
| var endSessionCommand = new BsonDocument("endSessions", new BsonArray(batch.Take(batchSize).Select(s => s.Id))); | ||
| var operationContext = OperationContext.NoTimeout; | ||
| using var channel = server.GetChannel(operationContext); | ||
| channel.Command( | ||
| operationContext, | ||
| NoCoreSession.Instance, | ||
| ReadPreference.PrimaryPreferred, | ||
| DatabaseNamespace.Admin, | ||
| endSessionCommand, | ||
| null, | ||
| NoOpElementNameValidator.Instance, | ||
| null, | ||
| null, | ||
| CommandResponseHandling.Return, | ||
| BsonDocumentSerializer.Instance, | ||
| null); | ||
|
BorisDog marked this conversation as resolved.
|
||
|
|
||
| sessionsEnded += batchSize; | ||
|
|
||
| for (var i = 0; i < batchSize; i++) | ||
| { | ||
| break; | ||
| batch[i].Dispose(); | ||
| } | ||
| } | ||
| _pool.RemoveRange(0, removeCount); | ||
|
|
||
| if (IsAboutToExpireOrDirty(session)) | ||
| { | ||
| session.Dispose(); | ||
| } | ||
| else | ||
| { | ||
| _pool.Add(session); | ||
| } | ||
| } | ||
| catch(Exception ex) | ||
| { | ||
| _logger?.LogError(ex, "Error closing server session pool for {clusterId}: {exception}", _cluster.ClusterId, ex); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to pass
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's there, the last parameter.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but do we need it twice?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I see. Will remove.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I still think passing
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. Will remove. |
||
| } | ||
| finally | ||
| { | ||
| _logger?.LogDebug( | ||
| "Closed server session pool for {clusterId} in {milliseconds}ms, total sessions ended {sessionsEnded}.", | ||
| _cluster.ClusterId, (Stopwatch.GetTimestamp() - timestamp) / (double)Stopwatch.Frequency * 1000, sessionsEnded); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -116,6 +180,14 @@ private bool IsAboutToExpireOrDirty(ICoreServerSession session) | |
| return IsAboutToExpire(session) || session.IsDirty; | ||
| } | ||
|
|
||
| private void ThrowIfDisposed() | ||
| { | ||
| if (_isDisposed) | ||
| { | ||
| throw new ObjectDisposedException(nameof(CoreServerSessionPool)); | ||
| } | ||
| } | ||
|
|
||
| // nested types | ||
| internal sealed class ReleaseOnDisposeCoreServerSession : WrappingCoreServerSession | ||
| { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.