Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
using MongoDB.Bson;
using MongoDB.Driver;
using Moq;

namespace Cosmos.DataTransfer.MongoExtension.UnitTests;

[TestClass]
public class MongoRepositoryTests
{
[TestMethod]
public async Task FindAsync_WithPositiveBatchSize_PassesBatchSizeToDriver()
{
// Arrange
var mockCollection = new Mock<IMongoCollection<BsonDocument>>();
var mockCursor = new Mock<IAsyncCursor<BsonDocument>>();

FindOptions<BsonDocument, BsonDocument>? capturedOptions = null;

mockCollection
.Setup(c => c.FindAsync(
It.IsAny<FilterDefinition<BsonDocument>>(),
It.IsAny<FindOptions<BsonDocument, BsonDocument>>(),
It.IsAny<CancellationToken>()))
.Callback<FilterDefinition<BsonDocument>, FindOptions<BsonDocument, BsonDocument>, CancellationToken>(
(filter, options, ct) => capturedOptions = options)
.ReturnsAsync(mockCursor.Object);

mockCursor.SetupSequence(c => c.MoveNextAsync(It.IsAny<CancellationToken>()))
.ReturnsAsync(true)
.ReturnsAsync(false);

mockCursor.Setup(c => c.Current).Returns(new List<BsonDocument> { new BsonDocument() });
mockCursor.Setup(c => c.Dispose());

var repository = new MongoRepository<BsonDocument>(mockCollection.Object);
var filter = Builders<BsonDocument>.Filter.Empty;
var batchSize = 1000;

// Act
var results = new List<BsonDocument>();
await foreach (var doc in repository.FindAsync(filter, batchSize))
{
results.Add(doc);
}

// Assert
Assert.IsNotNull(capturedOptions);
Assert.AreEqual(batchSize, capturedOptions.BatchSize);
Assert.AreEqual(1, results.Count);
}

[TestMethod]
public async Task FindAsync_WithNullBatchSize_DoesNotSetBatchSizeOption()
{
// Arrange
var mockCollection = new Mock<IMongoCollection<BsonDocument>>();
var mockCursor = new Mock<IAsyncCursor<BsonDocument>>();

FindOptions<BsonDocument, BsonDocument>? capturedOptions = null;

mockCollection
.Setup(c => c.FindAsync(
It.IsAny<FilterDefinition<BsonDocument>>(),
It.IsAny<FindOptions<BsonDocument, BsonDocument>>(),
It.IsAny<CancellationToken>()))
.Callback<FilterDefinition<BsonDocument>, FindOptions<BsonDocument, BsonDocument>, CancellationToken>(
(filter, options, ct) => capturedOptions = options)
.ReturnsAsync(mockCursor.Object);

mockCursor.SetupSequence(c => c.MoveNextAsync(It.IsAny<CancellationToken>()))
.ReturnsAsync(true)
.ReturnsAsync(false);

mockCursor.Setup(c => c.Current).Returns(new List<BsonDocument> { new BsonDocument() });
mockCursor.Setup(c => c.Dispose());

var repository = new MongoRepository<BsonDocument>(mockCollection.Object);
var filter = Builders<BsonDocument>.Filter.Empty;

// Act
var results = new List<BsonDocument>();
await foreach (var doc in repository.FindAsync(filter, null))
{
results.Add(doc);
}

// Assert
Assert.IsNotNull(capturedOptions);
Assert.IsNull(capturedOptions.BatchSize);
Assert.AreEqual(1, results.Count);
}

[TestMethod]
public async Task FindAsync_WithZeroBatchSize_DoesNotSetBatchSizeOption()
{
// Arrange
var mockCollection = new Mock<IMongoCollection<BsonDocument>>();
var mockCursor = new Mock<IAsyncCursor<BsonDocument>>();

FindOptions<BsonDocument, BsonDocument>? capturedOptions = null;

mockCollection
.Setup(c => c.FindAsync(
It.IsAny<FilterDefinition<BsonDocument>>(),
It.IsAny<FindOptions<BsonDocument, BsonDocument>>(),
It.IsAny<CancellationToken>()))
.Callback<FilterDefinition<BsonDocument>, FindOptions<BsonDocument, BsonDocument>, CancellationToken>(
(filter, options, ct) => capturedOptions = options)
.ReturnsAsync(mockCursor.Object);

mockCursor.SetupSequence(c => c.MoveNextAsync(It.IsAny<CancellationToken>()))
.ReturnsAsync(true)
.ReturnsAsync(false);

mockCursor.Setup(c => c.Current).Returns(new List<BsonDocument> { new BsonDocument() });
mockCursor.Setup(c => c.Dispose());

var repository = new MongoRepository<BsonDocument>(mockCollection.Object);
var filter = Builders<BsonDocument>.Filter.Empty;

// Act
var results = new List<BsonDocument>();
await foreach (var doc in repository.FindAsync(filter, 0))
{
results.Add(doc);
}

// Assert
Assert.IsNotNull(capturedOptions);
Assert.IsNull(capturedOptions.BatchSize);
Assert.AreEqual(1, results.Count);
}

[TestMethod]
public async Task FindAsync_WithNegativeBatchSize_DoesNotSetBatchSizeOption()
{
// Arrange
var mockCollection = new Mock<IMongoCollection<BsonDocument>>();
var mockCursor = new Mock<IAsyncCursor<BsonDocument>>();

FindOptions<BsonDocument, BsonDocument>? capturedOptions = null;

mockCollection
.Setup(c => c.FindAsync(
It.IsAny<FilterDefinition<BsonDocument>>(),
It.IsAny<FindOptions<BsonDocument, BsonDocument>>(),
It.IsAny<CancellationToken>()))
.Callback<FilterDefinition<BsonDocument>, FindOptions<BsonDocument, BsonDocument>, CancellationToken>(
(filter, options, ct) => capturedOptions = options)
.ReturnsAsync(mockCursor.Object);

mockCursor.SetupSequence(c => c.MoveNextAsync(It.IsAny<CancellationToken>()))
.ReturnsAsync(true)
.ReturnsAsync(false);

mockCursor.Setup(c => c.Current).Returns(new List<BsonDocument> { new BsonDocument() });
mockCursor.Setup(c => c.Dispose());

var repository = new MongoRepository<BsonDocument>(mockCollection.Object);
var filter = Builders<BsonDocument>.Filter.Empty;

// Act
var results = new List<BsonDocument>();
await foreach (var doc in repository.FindAsync(filter, -1))
{
results.Add(doc);
}

// Assert
Assert.IsNotNull(capturedOptions);
Assert.IsNull(capturedOptions.BatchSize);
Assert.AreEqual(1, results.Count);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,56 @@ public void MongoSourceSettings_ShouldAllowEmptyQuery()
// Assert
Assert.AreEqual(string.Empty, settings.Query);
}

[TestMethod]
public void MongoSourceSettings_ShouldHaveBatchSizeProperty()
Comment thread
philnach marked this conversation as resolved.
{
// Arrange & Act
var settings = new MongoSourceSettings();

// Assert
Assert.IsNull(settings.BatchSize);
}

[TestMethod]
public void MongoSourceSettings_ShouldAllowBatchSizeToBeSet()
{
// Arrange
var settings = new MongoSourceSettings();
var testBatchSize = 1000;

// Act
settings.BatchSize = testBatchSize;

// Assert
Assert.AreEqual(testBatchSize, settings.BatchSize);
}

[TestMethod]
public void MongoSourceSettings_ShouldAllowNullBatchSize()
{
// Arrange
var settings = new MongoSourceSettings();

// Act
settings.BatchSize = null;

// Assert
Assert.IsNull(settings.BatchSize);
}

[TestMethod]
public void MongoSourceSettings_ShouldAcceptPositiveBatchSize()
{
// Arrange
var settings = new MongoSourceSettings();
var positiveBatchSizes = new[] { 1, 100, 1000, 10000 };

// Act & Assert
foreach (var batchSize in positiveBatchSizes)
{
settings.BatchSize = batchSize;
Assert.AreEqual(batchSize, settings.BatchSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ public interface IRepository<TDocument>
ValueTask Remove(Expression<Func<TDocument, bool>> filter);
ValueTask RemoveRange(Expression<Func<TDocument, bool>> filter);
IQueryable<TDocument> AsQueryable();
IAsyncEnumerable<TDocument> FindAsync(FilterDefinition<TDocument> filter);
IAsyncEnumerable<TDocument> FindAsync(FilterDefinition<TDocument> filter, int? batchSize = null);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge

foreach (var collection in collectionNames)
{
await foreach (var item in EnumerateCollectionAsync(context, collection, settings.Query, logger).WithCancellation(cancellationToken))
await foreach (var item in EnumerateCollectionAsync(context, collection, settings.Query, settings.BatchSize, logger).WithCancellation(cancellationToken))
{
yield return item;
}
}
}
}

public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(Context context, string collectionName, string? query, ILogger logger)
public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(Context context, string collectionName, string? query, int? batchSize, ILogger logger)
{
logger.LogInformation("Reading collection '{Collection}'", collectionName);
var collection = context.GetRepository<BsonDocument>(collectionName);
Expand All @@ -47,13 +47,12 @@ public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(Context contex
if (!string.IsNullOrWhiteSpace(query))
{
logger.LogInformation("Applying query filter to collection '{Collection}': {Query}", collectionName, query);
documents = GetQueryDocumentsAsync(collection, query, collectionName, logger);
documents = GetQueryDocumentsAsync(collection, query, collectionName, batchSize, logger);
}
else
{
logger.LogInformation("No query filter specified for collection '{Collection}', reading all documents", collectionName);
// Use existing queryable approach when no filter is specified
documents = GetAllDocumentsAsync(collection);
documents = GetAllDocumentsAsync(collection, batchSize, logger, collectionName);
}
Comment thread
philnach marked this conversation as resolved.

await foreach (var record in documents)
Expand All @@ -68,15 +67,19 @@ public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(Context contex
logger.LogWarning("No items read from collection '{Collection}'", collectionName);
}

private async IAsyncEnumerable<BsonDocument> GetAllDocumentsAsync(IRepository<BsonDocument> collection)
private async IAsyncEnumerable<BsonDocument> GetAllDocumentsAsync(IRepository<BsonDocument> collection, int? batchSize, ILogger logger, string collectionName)
{
foreach (var record in await Task.Run(() => collection.AsQueryable()))
LogBatchSizeIfSpecified(batchSize, collectionName, logger);

// Use FindAsync with empty filter to support BatchSize
var emptyFilter = Builders<BsonDocument>.Filter.Empty;
await foreach (var record in collection.FindAsync(emptyFilter, batchSize))
{
yield return record;
}
}

private async IAsyncEnumerable<BsonDocument> GetQueryDocumentsAsync(IRepository<BsonDocument> collection, string query, string collectionName, ILogger logger)
private async IAsyncEnumerable<BsonDocument> GetQueryDocumentsAsync(IRepository<BsonDocument> collection, string query, string collectionName, int? batchSize, ILogger logger)
{
// Handle query as either a file path or direct JSON
string queryJson;
Expand Down Expand Up @@ -113,12 +116,29 @@ private async IAsyncEnumerable<BsonDocument> GetQueryDocumentsAsync(IRepository<

var filter = new BsonDocumentFilterDefinition<BsonDocument>(filterDocument);

await foreach (var record in collection.FindAsync(filter))
LogBatchSizeIfSpecified(batchSize, collectionName, logger);

await foreach (var record in collection.FindAsync(filter, batchSize))
{
yield return record;
}
}

private void LogBatchSizeIfSpecified(int? batchSize, string collectionName, ILogger logger)
{
if (batchSize.HasValue)
{
if (batchSize.Value <= 0)
{
logger.LogWarning("Ignoring invalid BatchSize {BatchSize} for collection '{Collection}'; must be > 0. Cursor timeout issues may occur on large collections.", batchSize.Value, collectionName);
}
else
{
logger.LogInformation("Using batch size of {BatchSize} for collection '{Collection}'", batchSize.Value, collectionName);
}
}
}

public IEnumerable<IDataExtensionSettings> GetSettings()
{
yield return new MongoSourceSettings();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,15 @@ public IQueryable<TDocument> AsQueryable()
return collection.AsQueryable();
}

public async IAsyncEnumerable<TDocument> FindAsync(FilterDefinition<TDocument> filter)
public async IAsyncEnumerable<TDocument> FindAsync(FilterDefinition<TDocument> filter, int? batchSize = null)
{
using var cursor = await collection.FindAsync(filter);
var findOptions = new FindOptions<TDocument, TDocument>();
if (batchSize.HasValue && batchSize.Value > 0)
Comment thread
philnach marked this conversation as resolved.
{
findOptions.BatchSize = batchSize.Value;
}

using var cursor = await collection.FindAsync(filter, findOptions);
while (await cursor.MoveNextAsync())
{
foreach (var document in cursor.Current)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ public class MongoSourceSettings : MongoBaseSettings
/// </summary>
public string? Query { get; set; }

/// <summary>
/// The number of documents to return per batch when reading from MongoDB.
/// This can help prevent cursor timeout errors when reading large collections.
/// If not specified, MongoDB's default batch size will be used.
/// </summary>
public int? BatchSize { get; set; }

[SensitiveValue]
public Dictionary<string, IReadOnlyDictionary<string, object>>? KMSProviders { get; set; }

Expand Down
25 changes: 25 additions & 0 deletions Extensions/Mongo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,31 @@ Source and sink settings require both `ConnectionString` and `DatabaseName` para
}
Comment thread
philnach marked this conversation as resolved.
```

#### BatchSize Parameter

The `BatchSize` parameter controls the number of documents returned per batch when reading from MongoDB. This is particularly useful for:
- Preventing cursor timeout errors when reading large collections (e.g., collections with 250k+ documents)
- Managing memory usage during data migration
- Improving performance in high-latency network environments

**How it works:**
`BatchSize` *mitigates* cursor timeouts by keeping the cursor active between fetches (smaller batches reset the cursor's idle timer), but does not completely disable timeouts. The MongoDB `NoCursorTimeout` option would be the actual disable switch, but it is often not honored on Azure Cosmos DB for MongoDB, which is why `BatchSize` is the recommended workaround.

**Default Behavior:**
- If `BatchSize` is not specified, MongoDB's default batch size will be used
- Recommended value: 1000 for large collections to prevent cursor timeouts
- Can be adjusted based on document size and network conditions

**Example with BatchSize:**
```json
{
"ConnectionString": "mongodb://localhost:27017",
"DatabaseName": "sales",
Comment thread
philnach marked this conversation as resolved.
"Collection": "person",
"BatchSize": 1000
}
```

#### Query Parameter

The `Query` parameter allows you to filter documents during data migration using MongoDB query syntax in JSON format. This parameter supports two input methods:
Expand Down
Loading