Skip to content

Commit adb3d8b

Browse files
authored
CSHARP-6004: Fix flaky Aggregate_with__out_includes_read_preference_for_5.0 test (#2025)
1 parent e811550 commit adb3d8b

4 files changed

Lines changed: 41 additions & 18 deletions

File tree

src/MongoDB.Driver/Core/Bindings/SingleServerReadBinding.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
using System;
1717
using System.Collections.Generic;
18+
using System.Net;
1819
using System.Threading.Tasks;
1920
using MongoDB.Driver.Core.Clusters;
2021
using MongoDB.Driver.Core.Clusters.ServerSelectors;
@@ -33,11 +34,10 @@ internal sealed class SingleServerReadBinding : IReadBinding
3334
private readonly IServerSelector _serverSelector;
3435
private readonly ICoreSessionHandle _session;
3536

36-
public SingleServerReadBinding(IClusterInternal cluster, IServer server, ReadPreference readPreference, ICoreSessionHandle session)
37+
public SingleServerReadBinding(IClusterInternal cluster, EndPoint serverEndpoint, ReadPreference readPreference, ICoreSessionHandle session)
3738
{
3839
_cluster = Ensure.IsNotNull(cluster, nameof(cluster));
39-
Ensure.IsNotNull(server, nameof(server));
40-
_serverSelector = new EndPointServerSelector(server.EndPoint);
40+
_serverSelector = new EndPointServerSelector(Ensure.IsNotNull(serverEndpoint, nameof(serverEndpoint)));
4141
_readPreference = Ensure.IsNotNull(readPreference, nameof(readPreference));
4242
_session = Ensure.IsNotNull(session, nameof(session));
4343
}

src/MongoDB.Driver/GridFS/GridFSBucket.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,7 +1041,7 @@ private IReadBindingHandle GetSingleServerReadBinding(OperationContext operation
10411041
var readPreference = _options.ReadPreference ?? _database.Settings.ReadPreference;
10421042
var selector = new ReadPreferenceServerSelector(readPreference);
10431043
var server = _cluster.SelectServer(operationContext, selector);
1044-
var binding = new SingleServerReadBinding(_cluster, server, readPreference, NoCoreSession.NewHandle());
1044+
var binding = new SingleServerReadBinding(_cluster, server.EndPoint, readPreference, NoCoreSession.NewHandle());
10451045
return new ReadBindingHandle(binding);
10461046
}
10471047

@@ -1050,7 +1050,7 @@ private async Task<IReadBindingHandle> GetSingleServerReadBindingAsync(Operation
10501050
var readPreference = _options.ReadPreference ?? _database.Settings.ReadPreference;
10511051
var selector = new ReadPreferenceServerSelector(readPreference);
10521052
var server = await _cluster.SelectServerAsync(operationContext, selector).ConfigureAwait(false);
1053-
var binding = new SingleServerReadBinding(_cluster, server, readPreference, NoCoreSession.NewHandle());
1053+
var binding = new SingleServerReadBinding(_cluster, server.EndPoint, readPreference, NoCoreSession.NewHandle());
10541054
return new ReadBindingHandle(binding);
10551055
}
10561056

tests/MongoDB.Driver.Tests/Core/Bindings/SingleServerReadBindingTests.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public void constructor_should_initialize_instance()
3737
var readPreference = ReadPreference.Primary;
3838
var session = new Mock<ICoreSessionHandle>().Object;
3939

40-
var result = new SingleServerReadBinding(cluster, server, readPreference, session);
40+
var result = new SingleServerReadBinding(cluster, server.EndPoint, readPreference, session);
4141

4242
result._cluster().Should().BeSameAs(cluster);
4343
result._disposed().Should().BeFalse();
@@ -52,14 +52,14 @@ public void constructor_should_throw_when_cluster_is_null()
5252
var readPreference = ReadPreference.Primary;
5353
var session = new Mock<ICoreSessionHandle>().Object;
5454

55-
var exception = Record.Exception(() => new SingleServerReadBinding(null, server, readPreference, session));
55+
var exception = Record.Exception(() => new SingleServerReadBinding(null, server.EndPoint, readPreference, session));
5656

5757
var e = exception.Should().BeOfType<ArgumentNullException>().Subject;
5858
e.ParamName.Should().Be("cluster");
5959
}
6060

6161
[Fact]
62-
public void constructor_should_throw_when_server_is_null()
62+
public void constructor_should_throw_when_serverEndpoint_is_null()
6363
{
6464
var cluster = new Mock<IClusterInternal>().Object;
6565
var readPreference = ReadPreference.Primary;
@@ -68,7 +68,7 @@ public void constructor_should_throw_when_server_is_null()
6868
var exception = Record.Exception(() => new SingleServerReadBinding(cluster, null, readPreference, session));
6969

7070
var e = exception.Should().BeOfType<ArgumentNullException>().Subject;
71-
e.ParamName.Should().Be("server");
71+
e.ParamName.Should().Be("serverEndpoint");
7272
}
7373

7474
[Fact]
@@ -78,7 +78,7 @@ public void constructor_should_throw_when_readPreference_is_null()
7878
var server = CreateMockServer().Object;
7979
var session = new Mock<ICoreSessionHandle>().Object;
8080

81-
var exception = Record.Exception(() => new SingleServerReadBinding(cluster, server, null, session));
81+
var exception = Record.Exception(() => new SingleServerReadBinding(cluster, server.EndPoint, null, session));
8282

8383
var e = exception.Should().BeOfType<ArgumentNullException>().Subject;
8484
e.ParamName.Should().Be("readPreference");
@@ -91,7 +91,7 @@ public void constructor_should_throw_when_session_is_null()
9191
var server = CreateMockServer().Object;
9292
var readPreference = ReadPreference.Primary;
9393

94-
var exception = Record.Exception(() => new SingleServerReadBinding(cluster, server, readPreference, null));
94+
var exception = Record.Exception(() => new SingleServerReadBinding(cluster, server.EndPoint, readPreference, null));
9595

9696
var e = exception.Should().BeOfType<ArgumentNullException>().Subject;
9797
e.ParamName.Should().Be("session");
@@ -203,7 +203,7 @@ private SingleServerReadBinding CreateSubject(IClusterInternal cluster = null, I
203203

204204
return new SingleServerReadBinding(
205205
cluster ?? new Mock<IClusterInternal>().Object,
206-
server ?? CreateMockServer().Object,
206+
(server ?? CreateMockServer().Object).EndPoint,
207207
readPreference ?? ReadPreference.Primary,
208208
session);
209209
}

tests/MongoDB.Driver.Tests/UnifiedTestOperations/UnifiedTestRunner.cs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,14 @@
2222
using MongoDB.Bson;
2323
using MongoDB.Bson.TestHelpers.JsonDrivenTests;
2424
using MongoDB.Driver.Core;
25+
using MongoDB.Driver.Core.Bindings;
2526
using MongoDB.Driver.Core.Logging;
2627
using MongoDB.Driver.Core.Misc;
28+
using MongoDB.Driver.Core.Operations;
29+
using MongoDB.Driver.Core.Servers;
2730
using MongoDB.Driver.Core.TestHelpers.Logging;
2831
using MongoDB.Driver.Core.TestHelpers.XunitExtensions;
32+
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
2933
using MongoDB.Driver.Tests.UnifiedTestOperations.Matchers;
3034
using MongoDB.TestHelpers.XunitExtensions;
3135
using Xunit.Sdk;
@@ -85,7 +89,9 @@ public void Run(JsonDrivenTestCase testCase)
8589
var outcome = testCase.Test.GetValue("outcome", null)?.AsBsonArray;
8690
var async = testCase.Test["async"].AsBoolean; // cannot be null
8791

88-
Run(schemaVersion, testSetRunOnRequirements, entities, initialData, runOnRequirements, skipReason, operations, expectEvents, expectedLogs, expectTracingMessages, outcome, async);
92+
var validateInitialDataPropagation = testCase.Name.Contains("Aggregate with $out includes read preference for 5.0+ server");
93+
94+
Run(schemaVersion, testSetRunOnRequirements, entities, initialData, runOnRequirements, skipReason, operations, expectEvents, expectedLogs, expectTracingMessages, outcome, async, validateInitialDataPropagation);
8995
}
9096

9197
public void Run(
@@ -100,7 +106,8 @@ public void Run(
100106
BsonArray expectedLogs,
101107
BsonArray expectTracingMessages,
102108
BsonArray outcome,
103-
bool async)
109+
bool async,
110+
bool validateInitialDataPropagation)
104111
{
105112
if (_runHasBeenCalled)
106113
{
@@ -127,7 +134,7 @@ public void Run(
127134
throw new SkipException($"Test skipped because '{skipReason}'.");
128135
}
129136

130-
var lastKnownClusterTime = AddInitialData(DriverTestConfiguration.Client, initialData);
137+
var lastKnownClusterTime = AddInitialData(DriverTestConfiguration.Client, initialData, validateInitialDataPropagation);
131138
_entityMap = UnifiedEntityMap.Create(_eventFormatters, _loggingService.LoggingSettings, async, lastKnownClusterTime);
132139
_entityMap.AddRange(entities);
133140

@@ -176,7 +183,7 @@ public void Dispose()
176183
}
177184

178185
// private methods
179-
private BsonDocument AddInitialData(IMongoClient client, BsonArray initialData)
186+
private BsonDocument AddInitialData(IMongoClient client, BsonArray initialData, bool validateInitialDataPropagation)
180187
{
181188
if (initialData == null)
182189
{
@@ -210,7 +217,7 @@ private BsonDocument AddInitialData(IMongoClient client, BsonArray initialData)
210217
}
211218

212219
_logger.LogDebug("Dropping {0}", collectionName);
213-
using var session = client.StartSession();
220+
using var session = client.StartSession(new ClientSessionOptions { CausalConsistency = true });
214221

215222
// For some QE spec tests we need to drop QE state collections (enxcol_.*.esc, enxcol_.*.ecoc).
216223
// DropCollection with EncryptedFields automatically handles cleanup of those QE state collections
@@ -224,6 +231,22 @@ private BsonDocument AddInitialData(IMongoClient client, BsonArray initialData)
224231
collection.InsertMany(session, documents);
225232
}
226233

234+
// do read from all secondaries in case of replica set to make sure the data is available there already
235+
if (validateInitialDataPropagation)
236+
{
237+
var cluster = client.GetClusterInternal();
238+
if (DriverTestConfiguration.IsReplicaSet(cluster))
239+
{
240+
var collection = database.GetCollection<BsonDocument>(collectionName);
241+
var runCommandOperation = new CountOperation(collection.CollectionNamespace, new MessageEncoderSettings()) { ReadConcern = ReadConcern.Local, };
242+
foreach (var server in cluster.Description.Servers.Where(s => s.Type == ServerType.ReplicaSetSecondary))
243+
{
244+
using var singleServerBinding = new SingleServerReadBinding(cluster, server.EndPoint, ReadPreference.Secondary, session.WrappedCoreSession.Fork());
245+
runCommandOperation.Execute(OperationContext.NoTimeout, singleServerBinding);
246+
}
247+
}
248+
}
249+
227250
lastKnownClusterTime = session.ClusterTime;
228251
}
229252

@@ -422,4 +445,4 @@ private void KillOpenTransactions(IMongoClient client)
422445
}
423446

424447
}
425-
}
448+
}

0 commit comments

Comments
 (0)