Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,56 @@ public void MongoSourceSettings_ShouldAllowEmptyQuery()
// Assert
Assert.AreEqual(string.Empty, settings.Query);
}

[TestMethod]
public void MongoSourceSettings_ShouldHaveBatchSizeProperty()
{
// Arrange & Act
var settings = new MongoSourceSettings();

// Assert
Assert.IsNull(settings.BatchSize);
}

[TestMethod]
public void MongoSourceSettings_ShouldAllowBatchSizeToBeSet()
{
// Arrange
var settings = new MongoSourceSettings();
var testBatchSize = 1000;

// Act
settings.BatchSize = testBatchSize;

// Assert
Assert.AreEqual(testBatchSize, settings.BatchSize);
}

[TestMethod]
public void MongoSourceSettings_ShouldAllowNullBatchSize()
{
// Arrange
var settings = new MongoSourceSettings();

// Act
settings.BatchSize = null;

// Assert
Assert.IsNull(settings.BatchSize);
}

[TestMethod]
public void MongoSourceSettings_ShouldAcceptPositiveBatchSize()
{
// Arrange
var settings = new MongoSourceSettings();
var positiveBatchSizes = new[] { 1, 100, 1000, 10000 };

// Act & Assert
foreach (var batchSize in positiveBatchSizes)
{
settings.BatchSize = batchSize;
Assert.AreEqual(batchSize, settings.BatchSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ public interface IRepository<TDocument>
ValueTask Remove(Expression<Func<TDocument, bool>> filter);
ValueTask RemoveRange(Expression<Func<TDocument, bool>> filter);
IQueryable<TDocument> AsQueryable();
IAsyncEnumerable<TDocument> FindAsync(FilterDefinition<TDocument> filter);
IAsyncEnumerable<TDocument> FindAsync(FilterDefinition<TDocument> filter, int? batchSize = null);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge

foreach (var collection in collectionNames)
{
await foreach (var item in EnumerateCollectionAsync(context, collection, settings.Query, logger).WithCancellation(cancellationToken))
await foreach (var item in EnumerateCollectionAsync(context, collection, settings.Query, settings.BatchSize, logger).WithCancellation(cancellationToken))
{
yield return item;
}
}
}
}

public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(Context context, string collectionName, string? query, ILogger logger)
public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(Context context, string collectionName, string? query, int? batchSize, ILogger logger)
{
logger.LogInformation("Reading collection '{Collection}'", collectionName);
var collection = context.GetRepository<BsonDocument>(collectionName);
Expand All @@ -47,13 +47,12 @@ public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(Context contex
if (!string.IsNullOrWhiteSpace(query))
{
logger.LogInformation("Applying query filter to collection '{Collection}': {Query}", collectionName, query);
documents = GetQueryDocumentsAsync(collection, query, collectionName, logger);
documents = GetQueryDocumentsAsync(collection, query, collectionName, batchSize, logger);
}
else
{
logger.LogInformation("No query filter specified for collection '{Collection}', reading all documents", collectionName);
// Use existing queryable approach when no filter is specified
documents = GetAllDocumentsAsync(collection);
documents = GetAllDocumentsAsync(collection, batchSize, logger, collectionName);
}

await foreach (var record in documents)
Expand All @@ -68,15 +67,19 @@ public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(Context contex
logger.LogWarning("No items read from collection '{Collection}'", collectionName);
}

private async IAsyncEnumerable<BsonDocument> GetAllDocumentsAsync(IRepository<BsonDocument> collection)
private async IAsyncEnumerable<BsonDocument> GetAllDocumentsAsync(IRepository<BsonDocument> collection, int? batchSize, ILogger logger, string collectionName)
{
foreach (var record in await Task.Run(() => collection.AsQueryable()))
LogBatchSizeIfSpecified(batchSize, collectionName, logger);

// Use FindAsync with empty filter to support BatchSize
var emptyFilter = Builders<BsonDocument>.Filter.Empty;
await foreach (var record in collection.FindAsync(emptyFilter, batchSize))
{
yield return record;
}
}

private async IAsyncEnumerable<BsonDocument> GetQueryDocumentsAsync(IRepository<BsonDocument> collection, string query, string collectionName, ILogger logger)
private async IAsyncEnumerable<BsonDocument> GetQueryDocumentsAsync(IRepository<BsonDocument> collection, string query, string collectionName, int? batchSize, ILogger logger)
{
// Handle query as either a file path or direct JSON
string queryJson;
Expand Down Expand Up @@ -113,12 +116,22 @@ private async IAsyncEnumerable<BsonDocument> GetQueryDocumentsAsync(IRepository<

var filter = new BsonDocumentFilterDefinition<BsonDocument>(filterDocument);

await foreach (var record in collection.FindAsync(filter))
LogBatchSizeIfSpecified(batchSize, collectionName, logger);

await foreach (var record in collection.FindAsync(filter, batchSize))
{
yield return record;
}
}

private void LogBatchSizeIfSpecified(int? batchSize, string collectionName, ILogger logger)
{
if (batchSize.HasValue)
{
logger.LogInformation("Using batch size of {BatchSize} for collection '{Collection}'", batchSize.Value, collectionName);
}
}

public IEnumerable<IDataExtensionSettings> GetSettings()
{
yield return new MongoSourceSettings();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,17 @@ public IQueryable<TDocument> AsQueryable()
return collection.AsQueryable();
}

public async IAsyncEnumerable<TDocument> FindAsync(FilterDefinition<TDocument> filter)
public async IAsyncEnumerable<TDocument> FindAsync(FilterDefinition<TDocument> filter, int? batchSize = null)
{
using var cursor = await collection.FindAsync(filter);
var findOptions = new FindOptions<TDocument, TDocument>();
// Only apply batch size if it's provided and positive; invalid values are silently ignored
// to maintain backward compatibility and prevent exceptions during data migration
if (batchSize.HasValue && batchSize.Value > 0)
{
findOptions.BatchSize = batchSize.Value;
}

using var cursor = await collection.FindAsync(filter, findOptions);
while (await cursor.MoveNextAsync())
{
foreach (var document in cursor.Current)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ public class MongoSourceSettings : MongoBaseSettings
/// </summary>
public string? Query { get; set; }

/// <summary>
/// The number of documents to return per batch when reading from MongoDB.
/// This can help prevent cursor timeout errors when reading large collections.
/// If not specified, MongoDB's default batch size will be used.
/// </summary>
public int? BatchSize { get; set; }

[SensitiveValue]
public Dictionary<string, IReadOnlyDictionary<string, object>>? KMSProviders { get; set; }

Expand Down
25 changes: 24 additions & 1 deletion Extensions/Mongo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,30 @@ Source and sink settings require both `ConnectionString` and `DatabaseName` para
"ConnectionString": "",
"DatabaseName": "",
"Collection": "",
"Query": ""
"Query": "",
"BatchSize": 1000
}
```

#### BatchSize Parameter

The `BatchSize` parameter controls the number of documents returned per batch when reading from MongoDB. This is particularly useful for:
- Preventing cursor timeout errors when reading large collections (e.g., collections with 250k+ documents)
- Managing memory usage during data migration
- Improving performance in high-latency network environments

**Default Behavior:**
- If `BatchSize` is not specified, MongoDB's default batch size will be used
- Recommended value: 1000 for large collections to prevent cursor timeouts
- Can be adjusted based on document size and network conditions

**Example with BatchSize:**
```json
{
"ConnectionString": "mongodb://localhost:27017",
"DatabaseName": "sales",
"Collection": "person",
"BatchSize": 1000
}
```

Expand Down
Loading