From c585253e30c0f8d4f61596e10a07fa069a7d1af4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 8 Jan 2026 20:07:00 +0000 Subject: [PATCH 1/4] Initial plan From 8b57e6912f344ca3e09060ce5cf22c344abda559 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 8 Jan 2026 20:13:15 +0000 Subject: [PATCH 2/4] Add BatchSize parameter to MongoDB source to prevent cursor timeout errors Co-authored-by: philnach <19275540+philnach@users.noreply.github.com> --- .../MongoSourceSettingsTests.cs | 52 +++++++++++++++++++ .../IRepository.cs | 2 +- .../MongoDataSourceExtension.cs | 15 ++++-- .../MongoRepository.cs | 10 +++- .../Settings/MongoSourceSettings.cs | 7 +++ Extensions/Mongo/README.md | 25 ++++++++- 6 files changed, 102 insertions(+), 9 deletions(-) diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension.UnitTests/MongoSourceSettingsTests.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension.UnitTests/MongoSourceSettingsTests.cs index 621ff3a..96dcf24 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension.UnitTests/MongoSourceSettingsTests.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension.UnitTests/MongoSourceSettingsTests.cs @@ -54,4 +54,56 @@ public void MongoSourceSettings_ShouldAllowEmptyQuery() // Assert Assert.AreEqual(string.Empty, settings.Query); } + + [TestMethod] + public void MongoSourceSettings_ShouldHaveBatchSizeProperty() + { + // Arrange & Act + var settings = new MongoSourceSettings(); + + // Assert + Assert.IsNull(settings.BatchSize); + } + + [TestMethod] + public void MongoSourceSettings_ShouldAllowBatchSizeToBeSet() + { + // Arrange + var settings = new MongoSourceSettings(); + var testBatchSize = 1000; + + // Act + settings.BatchSize = testBatchSize; + + // Assert + Assert.AreEqual(testBatchSize, settings.BatchSize); + } + + [TestMethod] + public void MongoSourceSettings_ShouldAllowNullBatchSize() + { + // Arrange + var settings = new MongoSourceSettings(); + + // Act + settings.BatchSize = null; + + // Assert + Assert.IsNull(settings.BatchSize); + } + + [TestMethod] + public void MongoSourceSettings_ShouldAcceptPositiveBatchSize() + { + // Arrange + var settings = new MongoSourceSettings(); + var positiveBatchSizes = new[] { 1, 100, 1000, 10000 }; + + // Act & Assert + foreach (var batchSize in positiveBatchSizes) + { + settings.BatchSize = batchSize; + Assert.AreEqual(batchSize, settings.BatchSize); + } + } } \ No newline at end of file diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/IRepository.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/IRepository.cs index a5db89b..3fde698 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/IRepository.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/IRepository.cs @@ -12,5 +12,5 @@ public interface IRepository ValueTask Remove(Expression> filter); ValueTask RemoveRange(Expression> filter); IQueryable AsQueryable(); - IAsyncEnumerable FindAsync(FilterDefinition filter); + IAsyncEnumerable FindAsync(FilterDefinition filter, int? batchSize = null); } diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs index ba4959d..f5e3419 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs @@ -28,7 +28,7 @@ public async IAsyncEnumerable ReadAsync(IConfiguration config, ILogge foreach (var collection in collectionNames) { - await foreach (var item in EnumerateCollectionAsync(context, collection, settings.Query, logger).WithCancellation(cancellationToken)) + await foreach (var item in EnumerateCollectionAsync(context, collection, settings.Query, settings.BatchSize, logger).WithCancellation(cancellationToken)) { yield return item; } @@ -36,7 +36,7 @@ public async IAsyncEnumerable ReadAsync(IConfiguration config, ILogge } } - public async IAsyncEnumerable EnumerateCollectionAsync(Context context, string collectionName, string? query, ILogger logger) + public async IAsyncEnumerable EnumerateCollectionAsync(Context context, string collectionName, string? query, int? batchSize, ILogger logger) { logger.LogInformation("Reading collection '{Collection}'", collectionName); var collection = context.GetRepository(collectionName); @@ -47,7 +47,7 @@ public async IAsyncEnumerable EnumerateCollectionAsync(Context contex if (!string.IsNullOrWhiteSpace(query)) { logger.LogInformation("Applying query filter to collection '{Collection}': {Query}", collectionName, query); - documents = GetQueryDocumentsAsync(collection, query, collectionName, logger); + documents = GetQueryDocumentsAsync(collection, query, collectionName, batchSize, logger); } else { @@ -76,7 +76,7 @@ private async IAsyncEnumerable GetAllDocumentsAsync(IRepository GetQueryDocumentsAsync(IRepository collection, string query, string collectionName, ILogger logger) + private async IAsyncEnumerable GetQueryDocumentsAsync(IRepository collection, string query, string collectionName, int? batchSize, ILogger logger) { // Handle query as either a file path or direct JSON string queryJson; @@ -113,7 +113,12 @@ private async IAsyncEnumerable GetQueryDocumentsAsync(IRepository< var filter = new BsonDocumentFilterDefinition(filterDocument); - await foreach (var record in collection.FindAsync(filter)) + if (batchSize.HasValue) + { + logger.LogInformation("Using batch size of {BatchSize} for collection '{Collection}'", batchSize.Value, collectionName); + } + + await foreach (var record in collection.FindAsync(filter, batchSize)) { yield return record; } diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoRepository.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoRepository.cs index 646bff2..afdffc9 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoRepository.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoRepository.cs @@ -47,9 +47,15 @@ public IQueryable AsQueryable() return collection.AsQueryable(); } - public async IAsyncEnumerable FindAsync(FilterDefinition filter) + public async IAsyncEnumerable FindAsync(FilterDefinition filter, int? batchSize = null) { - using var cursor = await collection.FindAsync(filter); + var findOptions = new FindOptions(); + if (batchSize.HasValue && batchSize.Value > 0) + { + findOptions.BatchSize = batchSize.Value; + } + + using var cursor = await collection.FindAsync(filter, findOptions); while (await cursor.MoveNextAsync()) { foreach (var document in cursor.Current) diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSourceSettings.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSourceSettings.cs index 3cbf15a..d3395d4 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSourceSettings.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSourceSettings.cs @@ -12,6 +12,13 @@ public class MongoSourceSettings : MongoBaseSettings /// public string? Query { get; set; } + /// + /// The number of documents to return per batch when reading from MongoDB. + /// This can help prevent cursor timeout errors when reading large collections. + /// If not specified, MongoDB's default batch size will be used. + /// + public int? BatchSize { get; set; } + [SensitiveValue] public Dictionary>? KMSProviders { get; set; } diff --git a/Extensions/Mongo/README.md b/Extensions/Mongo/README.md index 0da089c..3054181 100644 --- a/Extensions/Mongo/README.md +++ b/Extensions/Mongo/README.md @@ -17,7 +17,30 @@ Source and sink settings require both `ConnectionString` and `DatabaseName` para "ConnectionString": "", "DatabaseName": "", "Collection": "", - "Query": "" + "Query": "", + "BatchSize": 1000 +} +``` + +#### BatchSize Parameter + +The `BatchSize` parameter controls the number of documents returned per batch when reading from MongoDB. This is particularly useful for: +- Preventing cursor timeout errors when reading large collections (e.g., collections with 250k+ documents) +- Managing memory usage during data migration +- Improving performance in high-latency network environments + +**Default Behavior:** +- If `BatchSize` is not specified, MongoDB's default batch size will be used +- Recommended value: 1000 for large collections to prevent cursor timeouts +- Can be adjusted based on document size and network conditions + +**Example with BatchSize:** +```json +{ + "ConnectionString": "mongodb://localhost:27017", + "DatabaseName": "sales", + "Collection": "person", + "BatchSize": 1000 } ``` From e0f4cc2bff9e8ba3a41a61ec2976bed41369b958 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 8 Jan 2026 20:15:29 +0000 Subject: [PATCH 3/4] Fix BatchSize support for collections without query filters Co-authored-by: philnach <19275540+philnach@users.noreply.github.com> --- .../MongoDataSourceExtension.cs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs index f5e3419..db0ee12 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs @@ -52,8 +52,7 @@ public async IAsyncEnumerable EnumerateCollectionAsync(Context contex else { logger.LogInformation("No query filter specified for collection '{Collection}', reading all documents", collectionName); - // Use existing queryable approach when no filter is specified - documents = GetAllDocumentsAsync(collection); + documents = GetAllDocumentsAsync(collection, batchSize, logger, collectionName); } await foreach (var record in documents) @@ -68,9 +67,16 @@ public async IAsyncEnumerable EnumerateCollectionAsync(Context contex logger.LogWarning("No items read from collection '{Collection}'", collectionName); } - private async IAsyncEnumerable GetAllDocumentsAsync(IRepository collection) + private async IAsyncEnumerable GetAllDocumentsAsync(IRepository collection, int? batchSize, ILogger logger, string collectionName) { - foreach (var record in await Task.Run(() => collection.AsQueryable())) + if (batchSize.HasValue) + { + logger.LogInformation("Using batch size of {BatchSize} for collection '{Collection}'", batchSize.Value, collectionName); + } + + // Use FindAsync with empty filter to support BatchSize + var emptyFilter = Builders.Filter.Empty; + await foreach (var record in collection.FindAsync(emptyFilter, batchSize)) { yield return record; } From be43f2e729697a9a50e20e9a288850828238f1c1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 8 Jan 2026 20:16:51 +0000 Subject: [PATCH 4/4] Refactor: Extract duplicate logging and add validation comment Co-authored-by: philnach <19275540+philnach@users.noreply.github.com> --- .../MongoDataSourceExtension.cs | 18 ++++++++++-------- .../MongoRepository.cs | 2 ++ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs index db0ee12..be73458 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs @@ -69,10 +69,7 @@ public async IAsyncEnumerable EnumerateCollectionAsync(Context contex private async IAsyncEnumerable GetAllDocumentsAsync(IRepository collection, int? batchSize, ILogger logger, string collectionName) { - if (batchSize.HasValue) - { - logger.LogInformation("Using batch size of {BatchSize} for collection '{Collection}'", batchSize.Value, collectionName); - } + LogBatchSizeIfSpecified(batchSize, collectionName, logger); // Use FindAsync with empty filter to support BatchSize var emptyFilter = Builders.Filter.Empty; @@ -119,10 +116,7 @@ private async IAsyncEnumerable GetQueryDocumentsAsync(IRepository< var filter = new BsonDocumentFilterDefinition(filterDocument); - if (batchSize.HasValue) - { - logger.LogInformation("Using batch size of {BatchSize} for collection '{Collection}'", batchSize.Value, collectionName); - } + LogBatchSizeIfSpecified(batchSize, collectionName, logger); await foreach (var record in collection.FindAsync(filter, batchSize)) { @@ -130,6 +124,14 @@ private async IAsyncEnumerable GetQueryDocumentsAsync(IRepository< } } + private void LogBatchSizeIfSpecified(int? batchSize, string collectionName, ILogger logger) + { + if (batchSize.HasValue) + { + logger.LogInformation("Using batch size of {BatchSize} for collection '{Collection}'", batchSize.Value, collectionName); + } + } + public IEnumerable GetSettings() { yield return new MongoSourceSettings(); diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoRepository.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoRepository.cs index afdffc9..2d85f61 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoRepository.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoRepository.cs @@ -50,6 +50,8 @@ public IQueryable AsQueryable() public async IAsyncEnumerable FindAsync(FilterDefinition filter, int? batchSize = null) { var findOptions = new FindOptions(); + // Only apply batch size if it's provided and positive; invalid values are silently ignored + // to maintain backward compatibility and prevent exceptions during data migration if (batchSize.HasValue && batchSize.Value > 0) { findOptions.BatchSize = batchSize.Value;