Skip to content

Commit 7faf466

Browse files
committed
CSHARP-3556: Reconsider server pinning in GridFS
1 parent 5e6ca7a commit 7faf466

5 files changed

Lines changed: 220 additions & 270 deletions

File tree

src/MongoDB.Driver/Core/Bindings/SingleServerReadBinding.cs

Lines changed: 20 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright 2013-present MongoDB Inc.
1+
/* Copyright 2010-present MongoDB Inc.
22
*
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
@@ -15,27 +15,31 @@
1515

1616
using System;
1717
using System.Collections.Generic;
18-
using System.Threading;
1918
using System.Threading.Tasks;
19+
using MongoDB.Driver.Core.Clusters;
20+
using MongoDB.Driver.Core.Clusters.ServerSelectors;
2021
using MongoDB.Driver.Core.Misc;
2122
using MongoDB.Driver.Core.Servers;
2223

2324
namespace MongoDB.Driver.Core.Bindings
2425
{
2526
internal sealed class SingleServerReadBinding : IReadBinding
2627
{
28+
#pragma warning disable CA2213 // Disposable fields should be disposed
29+
private readonly IClusterInternal _cluster;
30+
#pragma warning restore CA2213 // Disposable fields should be disposed
2731
private bool _disposed;
2832
private readonly ReadPreference _readPreference;
29-
private readonly IServer _server;
30-
private readonly TimeSpan _serverSelectionTimeout;
33+
private readonly IServerSelector _serverSelector;
3134
private readonly ICoreSessionHandle _session;
3235

33-
public SingleServerReadBinding(IServer server, ReadPreference readPreference, ICoreSessionHandle session, TimeSpan serverSelectionTimeout)
36+
public SingleServerReadBinding(IClusterInternal cluster, IServer server, ReadPreference readPreference, ICoreSessionHandle session)
3437
{
35-
_server = Ensure.IsNotNull(server, nameof(server));
38+
_cluster = Ensure.IsNotNull(cluster, nameof(cluster));
39+
Ensure.IsNotNull(server, nameof(server));
40+
_serverSelector = new EndPointServerSelector(server.EndPoint);
3641
_readPreference = Ensure.IsNotNull(readPreference, nameof(readPreference));
3742
_session = Ensure.IsNotNull(session, nameof(session));
38-
_serverSelectionTimeout = Ensure.IsGreaterThanOrEqualToZero(serverSelectionTimeout, nameof(serverSelectionTimeout));
3943
}
4044

4145
public ReadPreference ReadPreference
@@ -51,24 +55,22 @@ public ICoreSessionHandle Session
5155
public IChannelSourceHandle GetReadChannelSource(OperationContext operationContext)
5256
{
5357
ThrowIfDisposed();
54-
return GetChannelSourceHelper(operationContext);
58+
var server = _cluster.SelectServer(operationContext, _serverSelector);
59+
return new ChannelSourceHandle(new ServerChannelSource(server, _session.Fork()));
5560
}
5661

57-
public Task<IChannelSourceHandle> GetReadChannelSourceAsync(OperationContext operationContext)
62+
public async Task<IChannelSourceHandle> GetReadChannelSourceAsync(OperationContext operationContext)
5863
{
5964
ThrowIfDisposed();
60-
return Task.FromResult(GetChannelSourceHelper(operationContext));
65+
var server = await _cluster.SelectServerAsync(operationContext, _serverSelector).ConfigureAwait(false);
66+
return new ChannelSourceHandle(new ServerChannelSource(server, _session.Fork()));
6167
}
6268

63-
public IChannelSourceHandle GetReadChannelSource(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers)
64-
{
65-
return GetReadChannelSource(operationContext);
66-
}
69+
public IChannelSourceHandle GetReadChannelSource(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers) =>
70+
GetReadChannelSource(operationContext);
6771

68-
public Task<IChannelSourceHandle> GetReadChannelSourceAsync(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers)
69-
{
70-
return GetReadChannelSourceAsync(operationContext);
71-
}
72+
public Task<IChannelSourceHandle> GetReadChannelSourceAsync(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers) =>
73+
GetReadChannelSourceAsync(operationContext);
7274

7375
public void Dispose()
7476
{
@@ -79,31 +81,6 @@ public void Dispose()
7981
}
8082
}
8183

82-
private IChannelSourceHandle GetChannelSourceHelper(OperationContext operationContext)
83-
{
84-
// server might be in unknown state due to previous failed operation, allow description to be updated
85-
// this is done instead of server selection
86-
// TODO parameterize timeout and avoid busy wait, or offload waiting to server level
87-
// Should be addressed by CSHARP-3556
88-
if (_server.Description.State == ServerState.Disconnected)
89-
{
90-
using var serverSelectionContext = operationContext.WithTimeout(_serverSelectionTimeout);
91-
92-
if (SpinWait.SpinUntil(() =>
93-
_server.Description.State == ServerState.Connected || serverSelectionContext.CancellationToken.IsCancellationRequested,
94-
serverSelectionContext.RemainingTimeout))
95-
{
96-
serverSelectionContext.CancellationToken.ThrowIfCancellationRequested();
97-
}
98-
else
99-
{
100-
throw new TimeoutException($"A timeout occurred after {serverSelectionContext.Elapsed.TotalMilliseconds}ms waiting for the server to become available.");
101-
}
102-
}
103-
104-
return new ChannelSourceHandle(new ServerChannelSource(_server, _session.Fork()));
105-
}
106-
10784
private void ThrowIfDisposed()
10885
{
10986
if (_disposed)

src/MongoDB.Driver/Core/Clusters/ServerSelectors/EndPointServerSelector.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright 2013-present MongoDB Inc.
1+
/* Copyright 2010-present MongoDB Inc.
22
*
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
@@ -39,6 +39,11 @@ public EndPointServerSelector(EndPoint endPoint)
3939
_endPoint = Ensure.IsNotNull(endPoint, nameof(endPoint));
4040
}
4141

42+
/// <summary>
43+
/// Gets the endpoint associated with the server selector.
44+
/// </summary>
45+
public EndPoint EndPoint => _endPoint;
46+
4247
// methods
4348
/// <inheritdoc/>
4449
public IEnumerable<ServerDescription> SelectServers(ClusterDescription cluster, IEnumerable<ServerDescription> servers)

src/MongoDB.Driver/GridFS/GridFSBucket.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright 2016-present MongoDB Inc.
1+
/* Copyright 2010-present MongoDB Inc.
22
*
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
@@ -1041,7 +1041,7 @@ private IReadBindingHandle GetSingleServerReadBinding(OperationContext operation
10411041
var readPreference = _options.ReadPreference ?? _database.Settings.ReadPreference;
10421042
var selector = new ReadPreferenceServerSelector(readPreference);
10431043
var server = _cluster.SelectServer(operationContext, selector);
1044-
var binding = new SingleServerReadBinding(server, readPreference, NoCoreSession.NewHandle(), _cluster.Settings.ServerSelectionTimeout);
1044+
var binding = new SingleServerReadBinding(_cluster, server, readPreference, NoCoreSession.NewHandle());
10451045
return new ReadBindingHandle(binding);
10461046
}
10471047

@@ -1050,7 +1050,7 @@ private async Task<IReadBindingHandle> GetSingleServerReadBindingAsync(Operation
10501050
var readPreference = _options.ReadPreference ?? _database.Settings.ReadPreference;
10511051
var selector = new ReadPreferenceServerSelector(readPreference);
10521052
var server = await _cluster.SelectServerAsync(operationContext, selector).ConfigureAwait(false);
1053-
var binding = new SingleServerReadBinding(server, readPreference, NoCoreSession.NewHandle(), _cluster.Settings.ServerSelectionTimeout);
1053+
var binding = new SingleServerReadBinding(_cluster, server, readPreference, NoCoreSession.NewHandle());
10541054
return new ReadBindingHandle(binding);
10551055
}
10561056

0 commit comments

Comments
 (0)