From 766823e5e1e4c744f5071672a705e0d93263f2b2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 19 Dec 2025 15:49:30 +0000 Subject: [PATCH 1/5] Initial plan From cfda4ac60b0cefb8450e3ab8814ce1b0bf3e93e1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 19 Dec 2025 15:57:27 +0000 Subject: [PATCH 2/5] Add documentation and tests confirming multi-account Cosmos DB support Co-authored-by: markjbrown <800166+markjbrown@users.noreply.github.com> --- ExampleConfigs.md | 29 +++ .../CosmosMultiAccountSupportTests.cs | 210 ++++++++++++++++++ Extensions/Cosmos/README.md | 17 ++ README.md | 7 + 4 files changed, 263 insertions(+) create mode 100644 Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosMultiAccountSupportTests.cs diff --git a/ExampleConfigs.md b/ExampleConfigs.md index d67810c..d402fbd 100644 --- a/ExampleConfigs.md +++ b/ExampleConfigs.md @@ -1,5 +1,34 @@ # Example `migrationsettings.json` Files +## Cosmos-NoSQL to Cosmos-NoSQL (Different Accounts) + +The tool supports simultaneous connections to two different Cosmos DB accounts, allowing you to migrate data directly from one account to another. 
+ +```json +{ + "Source": "Cosmos-nosql", + "Sink": "Cosmos-nosql", + "SourceSettings": { + "ConnectionString": "AccountEndpoint=https://source-account.documents.azure.com:443/;AccountKey=;", + "Database": "sourceDatabase", + "Container": "sourceContainer", + "IncludeMetadataFields": false + }, + "SinkSettings": { + "ConnectionString": "AccountEndpoint=https://destination-account.documents.azure.com:443/;AccountKey=;", + "Database": "destinationDatabase", + "Container": "destinationContainer", + "PartitionKeyPath": "/id", + "RecreateContainer": false, + "WriteMode": "UpsertStream", + "CreatedContainerMaxThroughput": 10000, + "UseAutoscaleForCreatedContainer": true + } +} +``` + +> **Note**: The tool creates separate CosmosClient instances for the source and sink, allowing you to connect to different Cosmos DB accounts simultaneously. Each connection can have its own configuration including connection mode, proxy settings, and authentication method (connection string or RBAC). + ## JSON to Cosmos-NoSQL ```json diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosMultiAccountSupportTests.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosMultiAccountSupportTests.cs new file mode 100644 index 0000000..c0bdcf2 --- /dev/null +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosMultiAccountSupportTests.cs @@ -0,0 +1,210 @@ +using Cosmos.DataTransfer.Common.UnitTests; +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Cosmos.DataTransfer.CosmosExtension.UnitTests; + +/// +/// Tests to verify that the tool can handle multiple Cosmos DB accounts simultaneously. +/// This confirms that separate CosmosClient instances are created for source and sink operations. 
+/// +[TestClass] +public class CosmosMultiAccountSupportTests +{ + [TestMethod] + public void CreateClient_WithDifferentSettings_CreatesSeparateInstances() + { + // Arrange - Create two different connection configurations + // Using valid Base64-encoded keys (dummy keys for testing) + var sourceSettings = new CosmosSourceSettings + { + ConnectionString = "AccountEndpoint=https://source-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;", + Database = "sourceDb", + Container = "sourceContainer", + ConnectionMode = ConnectionMode.Gateway + }; + + var sinkSettings = new CosmosSinkSettings + { + ConnectionString = "AccountEndpoint=https://sink-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;", + Database = "sinkDb", + Container = "sinkContainer", + PartitionKeyPath = "/id", + ConnectionMode = ConnectionMode.Direct + }; + + // Act - Create clients using the extension service method + CosmosClient sourceClient = CosmosExtensionServices.CreateClient(sourceSettings, "Cosmos-nosql"); + CosmosClient sinkClient = CosmosExtensionServices.CreateClient(sinkSettings, "Cosmos-nosql", "Cosmos-nosql"); + + // Assert - Verify that two distinct client instances are created + Assert.IsNotNull(sourceClient, "Source client should be created"); + Assert.IsNotNull(sinkClient, "Sink client should be created"); + Assert.AreNotSame(sourceClient, sinkClient, "Source and sink should use separate CosmosClient instances"); + + // Verify client configurations are independent + Assert.AreEqual(ConnectionMode.Gateway, sourceClient.ClientOptions.ConnectionMode); + Assert.AreEqual(ConnectionMode.Direct, sinkClient.ClientOptions.ConnectionMode); + + // Dispose clients + sourceClient.Dispose(); + sinkClient.Dispose(); + } + + [TestMethod] + public void CreateClient_WithRbacAuth_CreatesSeparateInstancesForDifferentAccounts() + { + // 
Arrange - Create two different RBAC-based connection configurations + var sourceSettings = new CosmosSourceSettings + { + UseRbacAuth = true, + AccountEndpoint = "https://source-account.documents.azure.com:443/", + Database = "sourceDb", + Container = "sourceContainer", + EnableInteractiveCredentials = false + }; + + var sinkSettings = new CosmosSinkSettings + { + UseRbacAuth = true, + AccountEndpoint = "https://sink-account.documents.azure.com:443/", + Database = "sinkDb", + Container = "sinkContainer", + PartitionKeyPath = "/id", + EnableInteractiveCredentials = false + }; + + // Act - Create clients + CosmosClient sourceClient = CosmosExtensionServices.CreateClient(sourceSettings, "Cosmos-nosql"); + CosmosClient sinkClient = CosmosExtensionServices.CreateClient(sinkSettings, "Cosmos-nosql", "Cosmos-nosql"); + + // Assert - Verify separate instances + Assert.IsNotNull(sourceClient); + Assert.IsNotNull(sinkClient); + Assert.AreNotSame(sourceClient, sinkClient, "RBAC-based source and sink should use separate CosmosClient instances"); + + // Dispose clients + sourceClient.Dispose(); + sinkClient.Dispose(); + } + + [TestMethod] + public void CreateClient_WithSameAccount_StillCreatesSeparateInstances() + { + // Arrange - Create two configurations pointing to the same account + // This tests that even when using the same account, separate client instances are created + var connectionString = "AccountEndpoint=https://same-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;"; + + var sourceSettings = new CosmosSourceSettings + { + ConnectionString = connectionString, + Database = "db1", + Container = "container1" + }; + + var sinkSettings = new CosmosSinkSettings + { + ConnectionString = connectionString, + Database = "db2", + Container = "container2", + PartitionKeyPath = "/id" + }; + + // Act + CosmosClient sourceClient = CosmosExtensionServices.CreateClient(sourceSettings, 
"Cosmos-nosql"); + CosmosClient sinkClient = CosmosExtensionServices.CreateClient(sinkSettings, "Cosmos-nosql", "Cosmos-nosql"); + + // Assert - Even with the same account, separate client instances should be created + Assert.IsNotNull(sourceClient); + Assert.IsNotNull(sinkClient); + Assert.AreNotSame(sourceClient, sinkClient, "Source and sink should use separate client instances even for the same account"); + + // Dispose clients + sourceClient.Dispose(); + sinkClient.Dispose(); + } + + [TestMethod] + public void CreateClient_WithDifferentProxySettings_CreatesSeparateInstances() + { + // Arrange - Create configurations with different proxy settings + var sourceSettings = new CosmosSourceSettings + { + ConnectionString = "AccountEndpoint=https://source-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;", + Database = "sourceDb", + Container = "sourceContainer", + WebProxy = "http://proxy1.example.com:8080", + UseDefaultProxyCredentials = true + }; + + var sinkSettings = new CosmosSinkSettings + { + ConnectionString = "AccountEndpoint=https://sink-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;", + Database = "sinkDb", + Container = "sinkContainer", + PartitionKeyPath = "/id", + WebProxy = "http://proxy2.example.com:8080", + UseDefaultProxyCredentials = false + }; + + // Act + CosmosClient sourceClient = CosmosExtensionServices.CreateClient(sourceSettings, "Cosmos-nosql"); + CosmosClient sinkClient = CosmosExtensionServices.CreateClient(sinkSettings, "Cosmos-nosql", "Cosmos-nosql"); + + // Assert + Assert.IsNotNull(sourceClient); + Assert.IsNotNull(sinkClient); + Assert.AreNotSame(sourceClient, sinkClient, "Clients with different proxy settings should be separate instances"); + + // Verify proxy settings are properly configured + Assert.IsNotNull(sourceClient.ClientOptions.WebProxy); + 
Assert.IsNotNull(sinkClient.ClientOptions.WebProxy); + + // Dispose clients + sourceClient.Dispose(); + sinkClient.Dispose(); + } + + [TestMethod] + public void ExtensionInitialization_CreatesIndependentClients() + { + // Arrange - This test verifies the actual extension behavior + var sourceConfig = TestHelpers.CreateConfig(new Dictionary() + { + { "ConnectionString", "AccountEndpoint=https://source-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;" }, + { "Database", "sourceDb" }, + { "Container", "sourceContainer" }, + }); + + var sinkConfig = TestHelpers.CreateConfig(new Dictionary() + { + { "ConnectionString", "AccountEndpoint=https://sink-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;" }, + { "Database", "sinkDb" }, + { "Container", "sinkContainer" }, + { "PartitionKeyPath", "/id" } + }); + + // Get settings from configuration + var sourceSettings = sourceConfig.Get(); + var sinkSettings = sinkConfig.Get(); + + // Act - Simulate what the extensions do internally + CosmosClient sourceClient = CosmosExtensionServices.CreateClient(sourceSettings!, "Cosmos-nosql"); + CosmosClient sinkClient = CosmosExtensionServices.CreateClient(sinkSettings!, "Cosmos-nosql", "Cosmos-nosql"); + + // Assert + Assert.IsNotNull(sourceClient, "Source extension should create a client"); + Assert.IsNotNull(sinkClient, "Sink extension should create a client"); + Assert.AreNotSame(sourceClient, sinkClient, + "Source and sink extensions should create and use separate CosmosClient instances"); + + // Verify that both clients can be used independently + Assert.IsTrue(sourceClient.ClientOptions.AllowBulkExecution, "Source client should have bulk execution enabled"); + Assert.IsTrue(sinkClient.ClientOptions.AllowBulkExecution, "Sink client should have bulk execution enabled"); + + // Dispose clients + sourceClient.Dispose(); 
+ sinkClient.Dispose(); + } +} diff --git a/Extensions/Cosmos/README.md b/Extensions/Cosmos/README.md index ba3d519..36eba48 100644 --- a/Extensions/Cosmos/README.md +++ b/Extensions/Cosmos/README.md @@ -4,6 +4,23 @@ The Cosmos data transfer extension provides source and sink capabilities for rea > **Note**: When specifying the JSON extension as the Source or Sink property in configuration, utilize the name **Cosmos-nosql**. +## Multi-Account Support + +The tool supports simultaneous connections to two different Cosmos DB accounts, enabling direct account-to-account data migration. Each source and sink connection creates its own independent CosmosClient instance with separate configurations. + +**Key capabilities:** +- Connect to different Cosmos DB accounts simultaneously for source and sink +- Each connection can use different authentication methods (connection string or RBAC) +- Independent configuration for connection mode, proxy settings, and client options +- Both connections can use the same or different Cosmos DB accounts + +**Example use cases:** +- Migrate data between production and development accounts +- Copy data between regions by connecting to different regional accounts +- Transfer data between accounts with different authentication requirements + +See the [Cosmos-to-Cosmos configuration example](../../ExampleConfigs.md#cosmos-nosql-to-cosmos-nosql-different-accounts) for details. + ## Settings Source and sink require settings used to locate and access the Cosmos DB account. This can be done in one of two ways: diff --git a/README.md b/README.md index 6ea3a9a..5a70d0e 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,13 @@ To access the archived version of the tool, navigate to the [**Archive branch**] The Azure Cosmos DB Desktop Data Migration Tool is an open-source project containing a command-line application that provides import and export functionality for Azure Cosmos DB. 
+**Key Features:** +- Support for multiple data sources and sinks including Cosmos DB, MongoDB, JSON, CSV, SQL Server, and more +- Direct account-to-account migration between different Cosmos DB accounts with simultaneous connections +- Batch processing and bulk execution for optimal performance +- Flexible authentication options including connection strings and RBAC +- Independent client configurations for source and sink connections + ## Quick Installation To use the tool, download the latest archive file for your platform (win-x64, win-arm64, mac-x64, mac-arm64, linux-x64, linux-arm64) from [Releases](https://github.com/AzureCosmosDB/data-migration-desktop-tool/releases) and extract all files to your desired install location. To begin a data transfer operation, first populate the `migrationsettings.json` file with appropriate settings for your data source and sink (see [detailed instructions](#using-the-command-line) below or [review examples](ExampleConfigs.md)), and then run the application from a command line: `dmt.exe` on Windows or `dmt` on other platforms. From 10b6571c8714791fbf2acc90e924213702655f96 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 19 Dec 2025 15:59:07 +0000 Subject: [PATCH 3/5] Add RBAC-based multi-account example configuration Co-authored-by: markjbrown <800166+markjbrown@users.noreply.github.com> --- ExampleConfigs.md | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/ExampleConfigs.md b/ExampleConfigs.md index d402fbd..322ff39 100644 --- a/ExampleConfigs.md +++ b/ExampleConfigs.md @@ -4,6 +4,8 @@ The tool supports simultaneous connections to two different Cosmos DB accounts, allowing you to migrate data directly from one account to another. 
+### Using Connection Strings + ```json { "Source": "Cosmos-nosql", @@ -27,6 +29,32 @@ The tool supports simultaneous connections to two different Cosmos DB accounts, } ``` +### Using RBAC Authentication (Passwordless) + +```json +{ + "Source": "Cosmos-nosql", + "Sink": "Cosmos-nosql", + "SourceSettings": { + "UseRbacAuth": true, + "AccountEndpoint": "https://source-account.documents.azure.com:443/", + "EnableInteractiveCredentials": true, + "Database": "sourceDatabase", + "Container": "sourceContainer", + "IncludeMetadataFields": false + }, + "SinkSettings": { + "UseRbacAuth": true, + "AccountEndpoint": "https://destination-account.documents.azure.com:443/", + "EnableInteractiveCredentials": true, + "Database": "destinationDatabase", + "Container": "destinationContainer", + "PartitionKeyPath": "/id", + "WriteMode": "UpsertStream" + } +} +``` + > **Note**: The tool creates separate CosmosClient instances for the source and sink, allowing you to connect to different Cosmos DB accounts simultaneously. Each connection can have its own configuration including connection mode, proxy settings, and authentication method (connection string or RBAC). 
## JSON to Cosmos-NoSQL From b90f99b004943c432ecd5ecec54d8fc81f1766c5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 20 Dec 2025 00:32:50 +0000 Subject: [PATCH 4/5] Update tests and docs to focus on multiple sink accounts Co-authored-by: markjbrown <800166+markjbrown@users.noreply.github.com> --- ExampleConfigs.md | 36 +++ .../CosmosMultiAccountSupportTests.cs | 211 +++++++++--------- Extensions/Cosmos/README.md | 22 +- README.md | 4 +- 4 files changed, 159 insertions(+), 114 deletions(-) diff --git a/ExampleConfigs.md b/ExampleConfigs.md index 322ff39..cad2c47 100644 --- a/ExampleConfigs.md +++ b/ExampleConfigs.md @@ -1,5 +1,41 @@ # Example `migrationsettings.json` Files +## Multiple Cosmos-NoSQL Sinks (Different Accounts) + +The tool supports writing to multiple different Cosmos DB accounts simultaneously using the `Operations` feature. This allows you to replicate data from one source to multiple destination accounts in a single execution. + +```json +{ + "Source": "JSON", + "Sink": "Cosmos-nosql", + "SourceSettings": { + "FilePath": "C:\\data\\sample-data.json" + }, + "SinkSettings": { + "PartitionKeyPath": "/id", + "WriteMode": "UpsertStream" + }, + "Operations": [ + { + "SinkSettings": { + "ConnectionString": "AccountEndpoint=https://account1.documents.azure.com:443/;AccountKey=;", + "Database": "db1", + "Container": "container1" + } + }, + { + "SinkSettings": { + "ConnectionString": "AccountEndpoint=https://account2.documents.azure.com:443/;AccountKey=;", + "Database": "db2", + "Container": "container2" + } + } + ] +} +``` + +> **Note**: The tool creates separate CosmosClient instances for each operation's sink, allowing you to write to multiple different Cosmos DB accounts simultaneously. Each sink connection can have its own configuration including connection mode, proxy settings, and authentication method (connection string or RBAC). 
+ ## Cosmos-NoSQL to Cosmos-NoSQL (Different Accounts) The tool supports simultaneous connections to two different Cosmos DB accounts, allowing you to migrate data directly from one account to another. diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosMultiAccountSupportTests.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosMultiAccountSupportTests.cs index c0bdcf2..8a6cd20 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosMultiAccountSupportTests.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosMultiAccountSupportTests.cs @@ -6,104 +6,109 @@ namespace Cosmos.DataTransfer.CosmosExtension.UnitTests; /// -/// Tests to verify that the tool can handle multiple Cosmos DB accounts simultaneously. -/// This confirms that separate CosmosClient instances are created for source and sink operations. +/// Tests to verify that the tool can handle multiple Cosmos DB sink accounts simultaneously. +/// This confirms that separate CosmosClient instances are created for multiple sink operations +/// to different Cosmos DB accounts. 
/// [TestClass] public class CosmosMultiAccountSupportTests { [TestMethod] - public void CreateClient_WithDifferentSettings_CreatesSeparateInstances() + public void CreateClient_WithTwoDifferentSinkAccounts_CreatesSeparateInstances() { - // Arrange - Create two different connection configurations + // Arrange - Create two different sink configurations for different Cosmos DB accounts + // This simulates writing to two different accounts simultaneously (e.g., in multiple operations) // Using valid Base64-encoded keys (dummy keys for testing) - var sourceSettings = new CosmosSourceSettings + var sink1Settings = new CosmosSinkSettings { - ConnectionString = "AccountEndpoint=https://source-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;", - Database = "sourceDb", - Container = "sourceContainer", + ConnectionString = "AccountEndpoint=https://sink1-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;", + Database = "sink1Db", + Container = "sink1Container", + PartitionKeyPath = "/id", ConnectionMode = ConnectionMode.Gateway }; - var sinkSettings = new CosmosSinkSettings + var sink2Settings = new CosmosSinkSettings { - ConnectionString = "AccountEndpoint=https://sink-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;", - Database = "sinkDb", - Container = "sinkContainer", + ConnectionString = "AccountEndpoint=https://sink2-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;", + Database = "sink2Db", + Container = "sink2Container", PartitionKeyPath = "/id", ConnectionMode = ConnectionMode.Direct }; - // Act - Create clients using the extension service method - CosmosClient sourceClient = CosmosExtensionServices.CreateClient(sourceSettings, 
"Cosmos-nosql"); - CosmosClient sinkClient = CosmosExtensionServices.CreateClient(sinkSettings, "Cosmos-nosql", "Cosmos-nosql"); + // Act - Create clients for two different sink accounts + CosmosClient sink1Client = CosmosExtensionServices.CreateClient(sink1Settings, "Cosmos-nosql", "JSON"); + CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings, "Cosmos-nosql", "JSON"); - // Assert - Verify that two distinct client instances are created - Assert.IsNotNull(sourceClient, "Source client should be created"); - Assert.IsNotNull(sinkClient, "Sink client should be created"); - Assert.AreNotSame(sourceClient, sinkClient, "Source and sink should use separate CosmosClient instances"); + // Assert - Verify that two distinct client instances are created for different sink accounts + Assert.IsNotNull(sink1Client, "First sink client should be created"); + Assert.IsNotNull(sink2Client, "Second sink client should be created"); + Assert.AreNotSame(sink1Client, sink2Client, "Two different sink accounts should use separate CosmosClient instances"); // Verify client configurations are independent - Assert.AreEqual(ConnectionMode.Gateway, sourceClient.ClientOptions.ConnectionMode); - Assert.AreEqual(ConnectionMode.Direct, sinkClient.ClientOptions.ConnectionMode); + Assert.AreEqual(ConnectionMode.Gateway, sink1Client.ClientOptions.ConnectionMode); + Assert.AreEqual(ConnectionMode.Direct, sink2Client.ClientOptions.ConnectionMode); // Dispose clients - sourceClient.Dispose(); - sinkClient.Dispose(); + sink1Client.Dispose(); + sink2Client.Dispose(); } [TestMethod] - public void CreateClient_WithRbacAuth_CreatesSeparateInstancesForDifferentAccounts() + public void CreateClient_WithRbacAuth_CreatesSeparateInstancesForTwoDifferentSinkAccounts() { - // Arrange - Create two different RBAC-based connection configurations - var sourceSettings = new CosmosSourceSettings + // Arrange - Create two different RBAC-based sink configurations for different accounts + var 
sink1Settings = new CosmosSinkSettings { UseRbacAuth = true, - AccountEndpoint = "https://source-account.documents.azure.com:443/", - Database = "sourceDb", - Container = "sourceContainer", + AccountEndpoint = "https://sink1-account.documents.azure.com:443/", + Database = "sink1Db", + Container = "sink1Container", + PartitionKeyPath = "/id", EnableInteractiveCredentials = false }; - var sinkSettings = new CosmosSinkSettings + var sink2Settings = new CosmosSinkSettings { UseRbacAuth = true, - AccountEndpoint = "https://sink-account.documents.azure.com:443/", - Database = "sinkDb", - Container = "sinkContainer", + AccountEndpoint = "https://sink2-account.documents.azure.com:443/", + Database = "sink2Db", + Container = "sink2Container", PartitionKeyPath = "/id", EnableInteractiveCredentials = false }; - // Act - Create clients - CosmosClient sourceClient = CosmosExtensionServices.CreateClient(sourceSettings, "Cosmos-nosql"); - CosmosClient sinkClient = CosmosExtensionServices.CreateClient(sinkSettings, "Cosmos-nosql", "Cosmos-nosql"); + // Act - Create clients for two different sink accounts + CosmosClient sink1Client = CosmosExtensionServices.CreateClient(sink1Settings, "Cosmos-nosql", "JSON"); + CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings, "Cosmos-nosql", "JSON"); - // Assert - Verify separate instances - Assert.IsNotNull(sourceClient); - Assert.IsNotNull(sinkClient); - Assert.AreNotSame(sourceClient, sinkClient, "RBAC-based source and sink should use separate CosmosClient instances"); + // Assert - Verify separate instances for different sink accounts + Assert.IsNotNull(sink1Client); + Assert.IsNotNull(sink2Client); + Assert.AreNotSame(sink1Client, sink2Client, "RBAC-based sinks to different accounts should use separate CosmosClient instances"); // Dispose clients - sourceClient.Dispose(); - sinkClient.Dispose(); + sink1Client.Dispose(); + sink2Client.Dispose(); } [TestMethod] - public void 
CreateClient_WithSameAccount_StillCreatesSeparateInstances() + public void CreateClient_WithTwoSinksToSameAccount_StillCreatesSeparateInstances() { - // Arrange - Create two configurations pointing to the same account - // This tests that even when using the same account, separate client instances are created + // Arrange - Create two sink configurations pointing to the same account + // This tests that even when using the same account for multiple sinks, separate client instances are created var connectionString = "AccountEndpoint=https://same-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;"; - var sourceSettings = new CosmosSourceSettings + var sink1Settings = new CosmosSinkSettings { ConnectionString = connectionString, Database = "db1", - Container = "container1" + Container = "container1", + PartitionKeyPath = "/id" }; - var sinkSettings = new CosmosSinkSettings + var sink2Settings = new CosmosSinkSettings { ConnectionString = connectionString, Database = "db2", @@ -111,100 +116,102 @@ public void CreateClient_WithSameAccount_StillCreatesSeparateInstances() PartitionKeyPath = "/id" }; - // Act - CosmosClient sourceClient = CosmosExtensionServices.CreateClient(sourceSettings, "Cosmos-nosql"); - CosmosClient sinkClient = CosmosExtensionServices.CreateClient(sinkSettings, "Cosmos-nosql", "Cosmos-nosql"); + // Act - Create clients for two sinks to the same account + CosmosClient sink1Client = CosmosExtensionServices.CreateClient(sink1Settings, "Cosmos-nosql", "JSON"); + CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings, "Cosmos-nosql", "JSON"); - // Assert - Even with the same account, separate client instances should be created - Assert.IsNotNull(sourceClient); - Assert.IsNotNull(sinkClient); - Assert.AreNotSame(sourceClient, sinkClient, "Source and sink should use separate client instances even for the same account"); + // Assert - Even with the same 
account, separate client instances should be created for multiple sinks + Assert.IsNotNull(sink1Client); + Assert.IsNotNull(sink2Client); + Assert.AreNotSame(sink1Client, sink2Client, "Multiple sinks should use separate client instances even for the same account"); // Dispose clients - sourceClient.Dispose(); - sinkClient.Dispose(); + sink1Client.Dispose(); + sink2Client.Dispose(); } [TestMethod] - public void CreateClient_WithDifferentProxySettings_CreatesSeparateInstances() + public void CreateClient_WithTwoSinksWithDifferentProxySettings_CreatesSeparateInstances() { - // Arrange - Create configurations with different proxy settings - var sourceSettings = new CosmosSourceSettings + // Arrange - Create two sink configurations with different proxy settings + var sink1Settings = new CosmosSinkSettings { - ConnectionString = "AccountEndpoint=https://source-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;", - Database = "sourceDb", - Container = "sourceContainer", + ConnectionString = "AccountEndpoint=https://sink1-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;", + Database = "sink1Db", + Container = "sink1Container", + PartitionKeyPath = "/id", WebProxy = "http://proxy1.example.com:8080", UseDefaultProxyCredentials = true }; - var sinkSettings = new CosmosSinkSettings + var sink2Settings = new CosmosSinkSettings { - ConnectionString = "AccountEndpoint=https://sink-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;", - Database = "sinkDb", - Container = "sinkContainer", + ConnectionString = "AccountEndpoint=https://sink2-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;", + Database = "sink2Db", + Container = "sink2Container", 
PartitionKeyPath = "/id", WebProxy = "http://proxy2.example.com:8080", UseDefaultProxyCredentials = false }; - // Act - CosmosClient sourceClient = CosmosExtensionServices.CreateClient(sourceSettings, "Cosmos-nosql"); - CosmosClient sinkClient = CosmosExtensionServices.CreateClient(sinkSettings, "Cosmos-nosql", "Cosmos-nosql"); + // Act - Create clients for two sinks with different proxy settings + CosmosClient sink1Client = CosmosExtensionServices.CreateClient(sink1Settings, "Cosmos-nosql", "JSON"); + CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings, "Cosmos-nosql", "JSON"); - // Assert - Assert.IsNotNull(sourceClient); - Assert.IsNotNull(sinkClient); - Assert.AreNotSame(sourceClient, sinkClient, "Clients with different proxy settings should be separate instances"); + // Assert - Verify separate instances for different proxy configurations + Assert.IsNotNull(sink1Client); + Assert.IsNotNull(sink2Client); + Assert.AreNotSame(sink1Client, sink2Client, "Multiple sinks with different proxy settings should be separate instances"); // Verify proxy settings are properly configured - Assert.IsNotNull(sourceClient.ClientOptions.WebProxy); - Assert.IsNotNull(sinkClient.ClientOptions.WebProxy); + Assert.IsNotNull(sink1Client.ClientOptions.WebProxy); + Assert.IsNotNull(sink2Client.ClientOptions.WebProxy); // Dispose clients - sourceClient.Dispose(); - sinkClient.Dispose(); + sink1Client.Dispose(); + sink2Client.Dispose(); } [TestMethod] - public void ExtensionInitialization_CreatesIndependentClients() + public void SinkExtensionInitialization_CreatesIndependentClientsForMultipleSinks() { - // Arrange - This test verifies the actual extension behavior - var sourceConfig = TestHelpers.CreateConfig(new Dictionary() + // Arrange - This test verifies that multiple sink operations can use independent clients + var sink1Config = TestHelpers.CreateConfig(new Dictionary() { - { "ConnectionString", 
"AccountEndpoint=https://source-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;" }, - { "Database", "sourceDb" }, - { "Container", "sourceContainer" }, + { "ConnectionString", "AccountEndpoint=https://sink1-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;" }, + { "Database", "sink1Db" }, + { "Container", "sink1Container" }, + { "PartitionKeyPath", "/id" } }); - var sinkConfig = TestHelpers.CreateConfig(new Dictionary() + var sink2Config = TestHelpers.CreateConfig(new Dictionary() { - { "ConnectionString", "AccountEndpoint=https://sink-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;" }, - { "Database", "sinkDb" }, - { "Container", "sinkContainer" }, + { "ConnectionString", "AccountEndpoint=https://sink2-account.documents.azure.com:443/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;" }, + { "Database", "sink2Db" }, + { "Container", "sink2Container" }, { "PartitionKeyPath", "/id" } }); // Get settings from configuration - var sourceSettings = sourceConfig.Get(); - var sinkSettings = sinkConfig.Get(); + var sink1Settings = sink1Config.Get(); + var sink2Settings = sink2Config.Get(); - // Act - Simulate what the extensions do internally - CosmosClient sourceClient = CosmosExtensionServices.CreateClient(sourceSettings!, "Cosmos-nosql"); - CosmosClient sinkClient = CosmosExtensionServices.CreateClient(sinkSettings!, "Cosmos-nosql", "Cosmos-nosql"); + // Act - Simulate what multiple sink operations do internally + CosmosClient sink1Client = CosmosExtensionServices.CreateClient(sink1Settings!, "Cosmos-nosql", "JSON"); + CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings!, "Cosmos-nosql", "JSON"); - // Assert - Assert.IsNotNull(sourceClient, 
"Source extension should create a client"); - Assert.IsNotNull(sinkClient, "Sink extension should create a client"); - Assert.AreNotSame(sourceClient, sinkClient, - "Source and sink extensions should create and use separate CosmosClient instances"); + // Assert - Verify that multiple sink operations create independent clients + Assert.IsNotNull(sink1Client, "First sink extension should create a client"); + Assert.IsNotNull(sink2Client, "Second sink extension should create a client"); + Assert.AreNotSame(sink1Client, sink2Client, + "Multiple sink operations should create and use separate CosmosClient instances"); // Verify that both clients can be used independently - Assert.IsTrue(sourceClient.ClientOptions.AllowBulkExecution, "Source client should have bulk execution enabled"); - Assert.IsTrue(sinkClient.ClientOptions.AllowBulkExecution, "Sink client should have bulk execution enabled"); + Assert.IsTrue(sink1Client.ClientOptions.AllowBulkExecution, "First sink client should have bulk execution enabled"); + Assert.IsTrue(sink2Client.ClientOptions.AllowBulkExecution, "Second sink client should have bulk execution enabled"); // Dispose clients - sourceClient.Dispose(); - sinkClient.Dispose(); + sink1Client.Dispose(); + sink2Client.Dispose(); } } diff --git a/Extensions/Cosmos/README.md b/Extensions/Cosmos/README.md index 36eba48..0c8363d 100644 --- a/Extensions/Cosmos/README.md +++ b/Extensions/Cosmos/README.md @@ -4,22 +4,24 @@ The Cosmos data transfer extension provides source and sink capabilities for rea > **Note**: When specifying the JSON extension as the Source or Sink property in configuration, utilize the name **Cosmos-nosql**. -## Multi-Account Support +## Multi-Account Sink Support -The tool supports simultaneous connections to two different Cosmos DB accounts, enabling direct account-to-account data migration. Each source and sink connection creates its own independent CosmosClient instance with separate configurations. 
+The tool supports writing to multiple different Cosmos DB accounts simultaneously, enabling data replication and distribution patterns. Each sink operation creates its own independent CosmosClient instance with separate configurations. **Key capabilities:** -- Connect to different Cosmos DB accounts simultaneously for source and sink -- Each connection can use different authentication methods (connection string or RBAC) -- Independent configuration for connection mode, proxy settings, and client options -- Both connections can use the same or different Cosmos DB accounts +- Write to multiple different Cosmos DB sink accounts in a single execution using the `Operations` feature +- Each sink connection uses an independent CosmosClient instance +- Each sink can use different authentication methods (connection string or RBAC) +- Independent configuration for connection mode, proxy settings, and client options per sink +- Each sink can target the same or different Cosmos DB accounts **Example use cases:** -- Migrate data between production and development accounts -- Copy data between regions by connecting to different regional accounts -- Transfer data between accounts with different authentication requirements +- Replicate data from one source to multiple destination accounts (disaster recovery, geo-distribution) +- Fan-out pattern: distribute data from a single source to multiple regional accounts +- Data synchronization across development, staging, and production environments +- Multi-tenant scenarios where data needs to be written to different customer accounts -See the [Cosmos-to-Cosmos configuration example](../../ExampleConfigs.md#cosmos-nosql-to-cosmos-nosql-different-accounts) for details. +See the [Multiple Cosmos-NoSQL Sinks configuration example](../../ExampleConfigs.md#multiple-cosmos-nosql-sinks-different-accounts) for details. 
## Settings diff --git a/README.md b/README.md index 5a70d0e..6adb9f8 100644 --- a/README.md +++ b/README.md @@ -26,10 +26,10 @@ The Azure Cosmos DB Desktop Data Migration Tool is an open-source project contai **Key Features:** - Support for multiple data sources and sinks including Cosmos DB, MongoDB, JSON, CSV, SQL Server, and more -- Direct account-to-account migration between different Cosmos DB accounts with simultaneous connections +- Write to multiple different Cosmos DB accounts simultaneously using the Operations feature - Batch processing and bulk execution for optimal performance - Flexible authentication options including connection strings and RBAC -- Independent client configurations for source and sink connections +- Independent client configurations for each sink operation ## Quick Installation From 4a8140795f5e26080905c2a75d46583c8a683fcd Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 18 Jan 2026 06:20:33 +0000 Subject: [PATCH 5/5] Resolve merge conflict with main branch Co-authored-by: philnach <19275540+philnach@users.noreply.github.com> --- .../RunCommandTests.cs | 285 ++++++++++++++++ Core/Cosmos.DataTransfer.Core/RunCommand.cs | 91 +++++ ExampleConfigs.md | 22 ++ .../AzureTableAPIDataSourceExtensionTests.cs | 112 +++++++ Extensions/AzureTableAPI/README.md | 121 +++++++ .../CertificateConfigurationTests.cs | 102 ++++++ .../CosmosDataSinkExtensionTests.cs | 115 +++++++ .../CosmosDictionaryDataItemTests.cs | 32 ++ .../CosmosMultiAccountSupportTests.cs | 20 +- .../CosmosSettingsBaseTests.cs | 25 ++ .../Data/TypeHintProperty.json | 9 + .../CosmosDataSinkExtension.cs | 28 +- .../CosmosDataSourceExtension.cs | 14 +- .../CosmosDictionaryDataItem.cs | 41 ++- .../CosmosExtensionServices.cs | 22 +- .../CosmosSettingsBase.cs | 20 ++ .../RawJsonCosmosSerializer.cs | 3 +- Extensions/Cosmos/README.md | 86 ++++- .../JsonFileSinkTests.cs | 64 ++++ ...ansfer.SqlServerExtension.UnitTests.csproj | 1 
+ .../SqlServerSinkSettingsTests.cs | 317 ++++++++++++++++++ .../SqlServerDataSinkExtension.cs | 235 ++++++++++++- .../SqlServerSinkSettings.cs | 59 +++- .../SqlWriteMode.cs | 21 ++ Extensions/SqlServer/README.md | 124 ++++++- .../DataItemJsonConverterTests.cs | 76 +++++ .../DataItemJsonConverter.cs | 12 + 27 files changed, 2011 insertions(+), 46 deletions(-) create mode 100644 Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension.UnitTests/AzureTableAPIDataSourceExtensionTests.cs create mode 100644 Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CertificateConfigurationTests.cs create mode 100644 Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosSettingsBaseTests.cs create mode 100644 Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/Data/TypeHintProperty.json create mode 100644 Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs create mode 100644 Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlWriteMode.cs diff --git a/Core/Cosmos.DataTransfer.Core.UnitTests/RunCommandTests.cs b/Core/Cosmos.DataTransfer.Core.UnitTests/RunCommandTests.cs index fb63241..ea78c90 100644 --- a/Core/Cosmos.DataTransfer.Core.UnitTests/RunCommandTests.cs +++ b/Core/Cosmos.DataTransfer.Core.UnitTests/RunCommandTests.cs @@ -336,5 +336,290 @@ public void Invoke_WithInvalidSinkExtension_ThrowsException() // Should throw exception when sink extension is not found Assert.ThrowsException(() => handler.Invoke(new InvocationContext(parseResult))); } + + [TestMethod] + public void Invoke_WithSameCosmosSourceAndSinkWithRecreateContainer_ThrowsException() + { + const string cosmosExtension = "Cosmos-nosql"; + const string connectionString = "AccountEndpoint=https://test.documents.azure.com:443/;AccountKey=test"; + const string database = "testDb"; + const string container = "testContainer"; + + IConfigurationRoot configuration = new ConfigurationBuilder() + 
.AddInMemoryCollection(new Dictionary + { + { "Source", cosmosExtension }, + { "Sink", cosmosExtension }, + { "SourceSettings:ConnectionString", connectionString }, + { "SourceSettings:Database", database }, + { "SourceSettings:Container", container }, + { "SinkSettings:ConnectionString", connectionString }, + { "SinkSettings:Database", database }, + { "SinkSettings:Container", container }, + { "SinkSettings:RecreateContainer", "true" }, + }) + .Build(); + + var loader = new Mock(); + var sourceExtension = new Mock(); + sourceExtension.SetupGet(ds => ds.DisplayName).Returns(cosmosExtension); + loader + .Setup(l => l.LoadExtensions(It.IsAny())) + .Returns(new List { sourceExtension.Object }); + + var sinkExtension = new Mock(); + sinkExtension.SetupGet(ds => ds.DisplayName).Returns(cosmosExtension); + loader + .Setup(l => l.LoadExtensions(It.IsAny())) + .Returns(new List { sinkExtension.Object }); + + var handler = new RunCommand.CommandHandler(loader.Object, + configuration, + NullLoggerFactory.Instance); + + var parseResult = new RootCommand().Parse(Array.Empty()); + + // Should throw exception when same container is used for source and sink with RecreateContainer + var exception = Assert.ThrowsException(() => handler.Invoke(new InvocationContext(parseResult))); + Assert.IsTrue(exception.Message.Contains("same Cosmos DB container")); + Assert.IsTrue(exception.Message.Contains("RecreateContainer")); + } + + [TestMethod] + public void Invoke_WithSameCosmosSourceAndSinkWithoutRecreateContainer_Succeeds() + { + const string cosmosExtension = "Cosmos-nosql"; + const string connectionString = "AccountEndpoint=https://test.documents.azure.com:443/;AccountKey=test"; + const string database = "testDb"; + const string container = "testContainer"; + + IConfigurationRoot configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + { "Source", cosmosExtension }, + { "Sink", cosmosExtension }, + { "SourceSettings:ConnectionString", connectionString 
}, + { "SourceSettings:Database", database }, + { "SourceSettings:Container", container }, + { "SinkSettings:ConnectionString", connectionString }, + { "SinkSettings:Database", database }, + { "SinkSettings:Container", container }, + { "SinkSettings:RecreateContainer", "false" }, + }) + .Build(); + + var loader = new Mock(); + var sourceExtension = new Mock(); + sourceExtension.SetupGet(ds => ds.DisplayName).Returns(cosmosExtension); + loader + .Setup(l => l.LoadExtensions(It.IsAny())) + .Returns(new List { sourceExtension.Object }); + + var sinkExtension = new Mock(); + sinkExtension.SetupGet(ds => ds.DisplayName).Returns(cosmosExtension); + loader + .Setup(l => l.LoadExtensions(It.IsAny())) + .Returns(new List { sinkExtension.Object }); + + var handler = new RunCommand.CommandHandler(loader.Object, + configuration, + NullLoggerFactory.Instance); + + var parseResult = new RootCommand().Parse(Array.Empty()); + var result = handler.Invoke(new InvocationContext(parseResult)); + + // Should succeed when RecreateContainer is false even with same container + Assert.AreEqual(0, result); + sourceExtension.Verify(se => se.ReadAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + + [TestMethod] + public void Invoke_WithSameCosmosSourceAndSinkDifferentDatabase_Succeeds() + { + const string cosmosExtension = "Cosmos-nosql"; + const string connectionString = "AccountEndpoint=https://test.documents.azure.com:443/;AccountKey=test"; + const string sourceDatabase = "sourceDb"; + const string sinkDatabase = "sinkDb"; + const string container = "testContainer"; + + IConfigurationRoot configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + { "Source", cosmosExtension }, + { "Sink", cosmosExtension }, + { "SourceSettings:ConnectionString", connectionString }, + { "SourceSettings:Database", sourceDatabase }, + { "SourceSettings:Container", container }, + { "SinkSettings:ConnectionString", connectionString }, + { "SinkSettings:Database", 
sinkDatabase }, + { "SinkSettings:Container", container }, + { "SinkSettings:RecreateContainer", "true" }, + }) + .Build(); + + var loader = new Mock(); + var sourceExtension = new Mock(); + sourceExtension.SetupGet(ds => ds.DisplayName).Returns(cosmosExtension); + loader + .Setup(l => l.LoadExtensions(It.IsAny())) + .Returns(new List { sourceExtension.Object }); + + var sinkExtension = new Mock(); + sinkExtension.SetupGet(ds => ds.DisplayName).Returns(cosmosExtension); + loader + .Setup(l => l.LoadExtensions(It.IsAny())) + .Returns(new List { sinkExtension.Object }); + + var handler = new RunCommand.CommandHandler(loader.Object, + configuration, + NullLoggerFactory.Instance); + + var parseResult = new RootCommand().Parse(Array.Empty()); + var result = handler.Invoke(new InvocationContext(parseResult)); + + // Should succeed when database is different + Assert.AreEqual(0, result); + sourceExtension.Verify(se => se.ReadAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + + [TestMethod] + public void Invoke_WithSameCosmosSourceAndSinkDifferentContainer_Succeeds() + { + const string cosmosExtension = "Cosmos-nosql"; + const string connectionString = "AccountEndpoint=https://test.documents.azure.com:443/;AccountKey=test"; + const string database = "testDb"; + const string sourceContainer = "sourceContainer"; + const string sinkContainer = "sinkContainer"; + + IConfigurationRoot configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + { "Source", cosmosExtension }, + { "Sink", cosmosExtension }, + { "SourceSettings:ConnectionString", connectionString }, + { "SourceSettings:Database", database }, + { "SourceSettings:Container", sourceContainer }, + { "SinkSettings:ConnectionString", connectionString }, + { "SinkSettings:Database", database }, + { "SinkSettings:Container", sinkContainer }, + { "SinkSettings:RecreateContainer", "true" }, + }) + .Build(); + + var loader = new Mock(); + var sourceExtension = new Mock(); + 
sourceExtension.SetupGet(ds => ds.DisplayName).Returns(cosmosExtension); + loader + .Setup(l => l.LoadExtensions(It.IsAny())) + .Returns(new List { sourceExtension.Object }); + + var sinkExtension = new Mock(); + sinkExtension.SetupGet(ds => ds.DisplayName).Returns(cosmosExtension); + loader + .Setup(l => l.LoadExtensions(It.IsAny())) + .Returns(new List { sinkExtension.Object }); + + var handler = new RunCommand.CommandHandler(loader.Object, + configuration, + NullLoggerFactory.Instance); + + var parseResult = new RootCommand().Parse(Array.Empty()); + var result = handler.Invoke(new InvocationContext(parseResult)); + + // Should succeed when container is different + Assert.AreEqual(0, result); + sourceExtension.Verify(se => se.ReadAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + + [TestMethod] + public void Invoke_WithDifferentExtensionTypesAndRecreateContainer_Succeeds() + { + const string sourceExtension = "Json"; + const string sinkExtension = "Cosmos-nosql"; + + IConfigurationRoot configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + { "Source", sourceExtension }, + { "Sink", sinkExtension }, + { "SourceSettings:FilePath", "test.json" }, + { "SinkSettings:ConnectionString", "AccountEndpoint=https://test.documents.azure.com:443/;AccountKey=test" }, + { "SinkSettings:Database", "testDb" }, + { "SinkSettings:Container", "testContainer" }, + { "SinkSettings:RecreateContainer", "true" }, + }) + .Build(); + + var loader = new Mock(); + var source = new Mock(); + source.SetupGet(ds => ds.DisplayName).Returns(sourceExtension); + loader + .Setup(l => l.LoadExtensions(It.IsAny())) + .Returns(new List { source.Object }); + + var sink = new Mock(); + sink.SetupGet(ds => ds.DisplayName).Returns(sinkExtension); + loader + .Setup(l => l.LoadExtensions(It.IsAny())) + .Returns(new List { sink.Object }); + + var handler = new RunCommand.CommandHandler(loader.Object, + configuration, + NullLoggerFactory.Instance); + + var 
parseResult = new RootCommand().Parse(Array.Empty()); + var result = handler.Invoke(new InvocationContext(parseResult)); + + // Should succeed when source and sink are different extension types + Assert.AreEqual(0, result); + source.Verify(se => se.ReadAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + + [TestMethod] + public void Invoke_WithSameCosmosSourceAndSinkUsingAccountEndpoint_ThrowsException() + { + const string cosmosExtension = "Cosmos-nosql"; + const string accountEndpoint = "https://test.documents.azure.com:443/"; + const string database = "testDb"; + const string container = "testContainer"; + + IConfigurationRoot configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + { "Source", cosmosExtension }, + { "Sink", cosmosExtension }, + { "SourceSettings:AccountEndpoint", accountEndpoint }, + { "SourceSettings:Database", database }, + { "SourceSettings:Container", container }, + { "SinkSettings:AccountEndpoint", accountEndpoint }, + { "SinkSettings:Database", database }, + { "SinkSettings:Container", container }, + { "SinkSettings:RecreateContainer", "true" }, + }) + .Build(); + + var loader = new Mock(); + var sourceExtension = new Mock(); + sourceExtension.SetupGet(ds => ds.DisplayName).Returns(cosmosExtension); + loader + .Setup(l => l.LoadExtensions(It.IsAny())) + .Returns(new List { sourceExtension.Object }); + + var sinkExtension = new Mock(); + sinkExtension.SetupGet(ds => ds.DisplayName).Returns(cosmosExtension); + loader + .Setup(l => l.LoadExtensions(It.IsAny())) + .Returns(new List { sinkExtension.Object }); + + var handler = new RunCommand.CommandHandler(loader.Object, + configuration, + NullLoggerFactory.Instance); + + var parseResult = new RootCommand().Parse(Array.Empty()); + + // Should throw exception when same container is used with AccountEndpoint + var exception = Assert.ThrowsException(() => handler.Invoke(new InvocationContext(parseResult))); + Assert.IsTrue(exception.Message.Contains("same 
Cosmos DB container")); + } } } \ No newline at end of file diff --git a/Core/Cosmos.DataTransfer.Core/RunCommand.cs b/Core/Cosmos.DataTransfer.Core/RunCommand.cs index a2ffb08..6321765 100644 --- a/Core/Cosmos.DataTransfer.Core/RunCommand.cs +++ b/Core/Cosmos.DataTransfer.Core/RunCommand.cs @@ -205,6 +205,9 @@ private async Task ExecuteDataTransferOperation(IDataSourceExtension sourc cancellationToken.ThrowIfCancellationRequested(); + // Validate that source and sink don't point to the same container when RecreateContainer is enabled + ValidateSourceAndSinkNotSameWithRecreate(source, sourceConfig, sink, sinkConfig); + try { var data = source.ReadAsync(sourceConfig, _loggerFactory.CreateLogger(source.GetType().Name), cancellationToken); @@ -220,6 +223,94 @@ private async Task ExecuteDataTransferOperation(IDataSourceExtension sourc } } + private void ValidateSourceAndSinkNotSameWithRecreate( + IDataSourceExtension source, + IConfiguration sourceConfig, + IDataSinkExtension sink, + IConfiguration sinkConfig) + { + // Only validate if both source and sink are Cosmos-nosql extensions + const string cosmosExtensionName = "Cosmos-nosql"; + if (!source.DisplayName.Equals(cosmosExtensionName, StringComparison.OrdinalIgnoreCase) || + !sink.DisplayName.Equals(cosmosExtensionName, StringComparison.OrdinalIgnoreCase)) + { + return; + } + + // Check if RecreateContainer is enabled + var recreateContainer = sinkConfig.GetValue("RecreateContainer"); + if (!recreateContainer) + { + return; + } + + // Compare connection details + var sourceConnectionString = sourceConfig.GetValue("ConnectionString"); + var sinkConnectionString = sinkConfig.GetValue("ConnectionString"); + var sourceAccountEndpoint = sourceConfig.GetValue("AccountEndpoint"); + var sinkAccountEndpoint = sinkConfig.GetValue("AccountEndpoint"); + var sourceDatabase = sourceConfig.GetValue("Database"); + var sinkDatabase = sinkConfig.GetValue("Database"); + var sourceContainer = sourceConfig.GetValue("Container"); + var 
sinkContainer = sinkConfig.GetValue("Container"); + + // Normalize account endpoints for comparison + string? sourceAccount = GetAccountFromConnectionOrEndpoint(sourceConnectionString, sourceAccountEndpoint); + string? sinkAccount = GetAccountFromConnectionOrEndpoint(sinkConnectionString, sinkAccountEndpoint); + + // Check if they point to the same container + bool sameAccount = !string.IsNullOrEmpty(sourceAccount) && + !string.IsNullOrEmpty(sinkAccount) && + sourceAccount.Equals(sinkAccount, StringComparison.OrdinalIgnoreCase); + bool sameDatabase = !string.IsNullOrEmpty(sourceDatabase) && + !string.IsNullOrEmpty(sinkDatabase) && + sourceDatabase.Equals(sinkDatabase, StringComparison.OrdinalIgnoreCase); + bool sameContainer = !string.IsNullOrEmpty(sourceContainer) && + !string.IsNullOrEmpty(sinkContainer) && + sourceContainer.Equals(sinkContainer, StringComparison.OrdinalIgnoreCase); + + if (sameAccount && sameDatabase && sameContainer) + { + var errorMessage = $"Invalid configuration: Source and Sink are configured to use the same Cosmos DB container " + + $"(Database: '{sourceDatabase}', Container: '{sourceContainer}') with RecreateContainer enabled. " + + $"This would delete the source container before the data transfer begins, resulting in data loss. " + + $"Please use different containers for Source and Sink, or disable RecreateContainer in the Sink settings."; + _logger.LogError(errorMessage); + throw new InvalidOperationException(errorMessage); + } + } + + private static string? GetAccountFromConnectionOrEndpoint(string? connectionString, string? 
accountEndpoint) + { + if (!string.IsNullOrEmpty(accountEndpoint)) + { + // Normalize the endpoint URL + return NormalizeAccountEndpoint(accountEndpoint); + } + + if (!string.IsNullOrEmpty(connectionString)) + { + // Extract AccountEndpoint from connection string + var parts = connectionString.Split(';', StringSplitOptions.RemoveEmptyEntries); + foreach (var part in parts) + { + if (part.Trim().StartsWith("AccountEndpoint=", StringComparison.OrdinalIgnoreCase)) + { + var endpoint = part.Substring(part.IndexOf('=') + 1).Trim(); + return NormalizeAccountEndpoint(endpoint); + } + } + } + + return null; + } + + private static string NormalizeAccountEndpoint(string endpoint) + { + // Remove trailing slashes and convert to lowercase for comparison + return endpoint.TrimEnd('/').ToLowerInvariant(); + } + private static async Task GetExtensionSelection(string? selectionName, List extensions, string inputPrompt, CancellationToken cancellationToken) where T : class, IDataTransferExtension { diff --git a/ExampleConfigs.md b/ExampleConfigs.md index cad2c47..f19ebe6 100644 --- a/ExampleConfigs.md +++ b/ExampleConfigs.md @@ -225,6 +225,28 @@ The tool supports simultaneous connections to two different Cosmos DB accounts, } ``` +## AzureTableAPI to JSON (with DateTime Filter) + +```json +{ + "Source": "AzureTableAPI", + "Sink": "JSON", + "SourceSettings": { + "ConnectionString": "DefaultEndpointsProtocol=https;AccountName=;AccountKey=;EndpointSuffix=core.windows.net", + "Table": "SourceTable1", + "PartitionKeyFieldName": "PartitionKey", + "RowKeyFieldName": "RowKey", + "QueryFilter": "Timestamp ge datetime\u00272023-05-15T03:30:32.663Z\u0027" + }, + "SinkSettings": { + "FilePath": "D:\\output\\filtered-data.json", + "Indented": true + } +} +``` + +> **Note**: When using DateTime filters in the `QueryFilter` property, single quotes around the datetime value must be JSON-escaped as `\u0027`. The datetime must be in ISO 8601 format with the `datetime` prefix. 
+ ## Cosmos-NoSQL to SqlServer ```json diff --git a/Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension.UnitTests/AzureTableAPIDataSourceExtensionTests.cs b/Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension.UnitTests/AzureTableAPIDataSourceExtensionTests.cs new file mode 100644 index 0000000..8f878ff --- /dev/null +++ b/Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension.UnitTests/AzureTableAPIDataSourceExtensionTests.cs @@ -0,0 +1,112 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Cosmos.DataTransfer.AzureTableAPIExtension.Settings; +using Microsoft.Extensions.Configuration; + +namespace Cosmos.DataTransfer.AzureTableAPIExtension.UnitTests +{ + [TestClass] + public class AzureTableAPIDataSourceExtensionTests + { + [TestMethod] + public void AzureTableAPIDataSourceSettings_QueryFilter_CanBeNull() + { + var settings = new AzureTableAPIDataSourceSettings(); + + Assert.IsNull(settings.QueryFilter, "QueryFilter should be null by default"); + } + + [TestMethod] + public void AzureTableAPIDataSourceSettings_QueryFilter_CanBeSet() + { + var settings = new AzureTableAPIDataSourceSettings() + { + QueryFilter = "PartitionKey eq 'test'" + }; + + Assert.AreEqual("PartitionKey eq 'test'", settings.QueryFilter, "QueryFilter should be settable"); + } + + [TestMethod] + public void AzureTableAPIDataSourceSettings_QueryFilter_JsonDeserializationBasic() + { + // Test basic filter deserialization + var json = """{"QueryFilter": "PartitionKey eq 'WI'"}"""; + var config = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json))) + .Build(); + var settings = config.Get(); + + Assert.AreEqual("PartitionKey eq 'WI'", settings?.QueryFilter, "QueryFilter should be deserialized from JSON"); + } + + [TestMethod] + public void AzureTableAPIDataSourceSettings_QueryFilter_JsonDeserializationWithDatetime() + { + // Test datetime filter with JSON-escaped single quotes + var json = 
"""{"QueryFilter": "Timestamp eq datetime\u00272023-01-12T16:53:31.1714422Z\u0027"}"""; + var config = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json))) + .Build(); + var settings = config.Get(); + + Assert.AreEqual("Timestamp eq datetime'2023-01-12T16:53:31.1714422Z'", settings?.QueryFilter, + "QueryFilter with JSON-escaped datetime should be correctly deserialized"); + } + + [TestMethod] + public void AzureTableAPIDataSourceSettings_QueryFilter_JsonDeserializationWithDatetimeGreaterThan() + { + // Test datetime filter with 'ge' (greater than or equal) operator + var json = """{"QueryFilter": "Timestamp ge datetime\u00272023-05-15T03:30:32.663Z\u0027"}"""; + var config = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json))) + .Build(); + var settings = config.Get(); + + Assert.AreEqual("Timestamp ge datetime'2023-05-15T03:30:32.663Z'", settings?.QueryFilter, + "QueryFilter with 'ge' datetime operator should be correctly deserialized"); + } + + [TestMethod] + public void AzureTableAPIDataSourceSettings_QueryFilter_JsonDeserializationWithDatetimeLessThan() + { + // Test datetime filter with 'lt' (less than) operator + var json = """{"QueryFilter": "Timestamp lt datetime\u00272024-12-08T06:06:00.976Z\u0027"}"""; + var config = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json))) + .Build(); + var settings = config.Get(); + + Assert.AreEqual("Timestamp lt datetime'2024-12-08T06:06:00.976Z'", settings?.QueryFilter, + "QueryFilter with 'lt' datetime operator should be correctly deserialized"); + } + + [TestMethod] + public void AzureTableAPIDataSourceSettings_QueryFilter_JsonDeserializationWithDatetimeRange() + { + // Test datetime filter with range (combining 'ge' and 'lt') + var json = """{"QueryFilter": "Timestamp ge datetime\u00272023-01-01T00:00:00Z\u0027 and Timestamp lt 
datetime\u00272024-01-01T00:00:00Z\u0027"}"""; + var config = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json))) + .Build(); + var settings = config.Get(); + + Assert.AreEqual("Timestamp ge datetime'2023-01-01T00:00:00Z' and Timestamp lt datetime'2024-01-01T00:00:00Z'", settings?.QueryFilter, + "QueryFilter with datetime range should be correctly deserialized"); + } + + [TestMethod] + public void AzureTableAPIDataSourceSettings_QueryFilter_JsonDeserializationCombinedFilters() + { + // Test combining partition key filter with datetime filter + var json = """{"QueryFilter": "PartitionKey eq \u0027users\u0027 and Timestamp ge datetime\u00272023-05-15T00:00:00Z\u0027"}"""; + var config = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json))) + .Build(); + var settings = config.Get(); + + Assert.AreEqual("PartitionKey eq 'users' and Timestamp ge datetime'2023-05-15T00:00:00Z'", settings?.QueryFilter, + "QueryFilter combining partition key and datetime should be correctly deserialized"); + } + } +} diff --git a/Extensions/AzureTableAPI/README.md b/Extensions/AzureTableAPI/README.md index 58b423e..9e292d2 100644 --- a/Extensions/AzureTableAPI/README.md +++ b/Extensions/AzureTableAPI/README.md @@ -28,6 +28,89 @@ The following setting is supported for the Source: - `QueryFilter` - This enables you to specify an OData filter to be applied to the data being retrieved by the AzureTableAPI Source. This is used in cases where only a subset of data from the source Table is needed in the migration. Example usage to query a subset of entities from the source table: `PartitionKey eq 'foo'`. +#### Query Filter Examples + +The `QueryFilter` setting supports OData filter syntax for querying Azure Table API entities. 
Below are examples of common filter patterns: + +**Basic Filters:** +```json +"QueryFilter": "PartitionKey eq 'WI'" +``` + +**DateTime Filters:** + +When filtering by `Timestamp` or other datetime properties, you must use the `datetime` prefix with ISO 8601 format timestamps. In JSON configuration files, single quotes around the datetime value must be JSON-escaped as `\u0027`: + +```json +"QueryFilter": "Timestamp eq datetime\u00272023-01-12T16:53:31.1714422Z\u0027" +``` + +```json +"QueryFilter": "Timestamp ge datetime\u00272023-05-15T03:30:32.663Z\u0027" +``` + +```json +"QueryFilter": "Timestamp lt datetime\u00272024-12-08T06:06:00.976Z\u0027" +``` + +**DateTime Range Filters:** + +To filter entities within a date range, combine multiple conditions with `and`: + +```json +"QueryFilter": "Timestamp ge datetime\u00272023-01-01T00:00:00Z\u0027 and Timestamp lt datetime\u00272024-01-01T00:00:00Z\u0027" +``` + +**Combined Filters:** + +You can combine partition key filters with datetime filters for more efficient queries: + +```json +"QueryFilter": "PartitionKey eq \u0027users\u0027 and Timestamp ge datetime\u00272023-05-15T00:00:00Z\u0027" +``` + +> **Important Notes:** +> - DateTime values must be in ISO 8601 format: `YYYY-MM-DDTHH:mm:ss.fffZ` +> - The `datetime` prefix is required before the timestamp value +> - Single quotes around datetime values must be JSON-escaped as `\u0027` in JSON configuration files +> - The `Z` suffix indicates UTC time +> - For better query performance, include `PartitionKey` in your filter when possible +> - Supported datetime operators: `eq` (equal), `ne` (not equal), `gt` (greater than), `ge` (greater than or equal), `lt` (less than), `le` (less than or equal) + +#### Troubleshooting Common DateTime Filter Issues + +The following table analyzes common mistakes when specifying datetime filters. 
Each row shows a query that was attempted and identifies the specific issues: + +| Query Filter | Missing `datetime` Prefix | Wrong Date Format | Incorrect Encoding | Result | +|--------------|--------------------------|-------------------|-------------------|---------| +| `"QueryFilter": "Timestamp ge datetime\u00272023-05-17T03:06:07.691Z\u0027"` | ✅ Correct | ✅ Correct (ISO 8601) | ✅ Correct (`\u0027`) | ✅ Should work* | +| `"QueryFilter": "Timestamp le datetime\u00272023-05-17T03:06:07.691Z\u0027"` | ✅ Correct | ✅ Correct (ISO 8601) | ✅ Correct (`\u0027`) | ✅ Should work* | +| `"QueryFilter": "Timestamp eq datetime\u00272023-05-17T03:06:07.691Z\u0027"` | ✅ Correct | ✅ Correct (ISO 8601) | ✅ Correct (`\u0027`) | ✅ Should work* | +| `"QueryFilter": "Timestamp gt datetime\u00272023-05-17T03:06:07.691Z\u0027"` | ✅ Correct | ✅ Correct (ISO 8601) | ✅ Correct (`\u0027`) | ✅ Should work* | +| `"QueryFilter": "Timestamp ge datetime '2023-05-17T03:06:07.691Z'"` | ✅ Correct | ✅ Correct (ISO 8601) | ❌ Space before quote, not JSON-escaped | ❌ Invalid syntax | +| `"QueryFilter": "Timestamp le datetime '2023-05-17T03:06:07.691Z'"` | ✅ Correct | ✅ Correct (ISO 8601) | ❌ Space before quote, not JSON-escaped | ❌ Invalid syntax | +| `"QueryFilter": "Timestamp gt datetime '2023-05-17T03:06:07.691Z'"` | ✅ Correct | ✅ Correct (ISO 8601) | ❌ Space before quote, not JSON-escaped | ❌ Invalid syntax | +| `"QueryFilter": "Timestamp eq datetime '2023-05-17T03:06:07.691Z'"` | ✅ Correct | ✅ Correct (ISO 8601) | ❌ Space before quote, not JSON-escaped | ❌ Invalid syntax | +| `"QueryFilter": "Timestamp ge datetime'\u00272023-05-17T03:06:07.691Z\u0027'"` | ✅ Correct | ✅ Correct (ISO 8601) | ❌ Extra quote at end | ❌ Invalid syntax | +| `"QueryFilter": "Timestamp eq \u00272023-05-17T03:06:07.691Z\u0027"` | ❌ Missing | ✅ Correct (ISO 8601) | ✅ Correct (`\u0027`) | ❌ No data (invalid) | +| `"QueryFilter": "Timestamp ge datetime '2023-05-17T03:10:39.058Z\u002B00:00'"` | ✅ Correct | ❌ Invalid 
timezone format | ❌ Space before quote, mixed encoding | ❌ Transfer fails | +| `"QueryFilter": "Timestamp ge datetime '2023-05-17T03:10:39.058Z\u002B00'"` | ✅ Correct | ❌ Invalid timezone format | ❌ Space before quote, mixed encoding | ❌ Transfer fails | +| `"QueryFilter": "Timestamp ge datetime 2023-05-17T03:10:39.058Z\u002B00"` | ✅ Correct | ❌ Invalid timezone format | ❌ No quotes around datetime | ❌ Transfer fails | +| `"QueryFilter": "Timestamp ge datetime'u00272023-05-17T03:06:07.691Zu0027'"` | ✅ Correct | ✅ Correct (ISO 8601) | ❌ Wrong escape sequence (missing `\`) | ❌ Transfer fails | +| `"QueryFilter": "Timestamp eq '\u00272023-05-17T03:06:07.691Z\u0027'"` | ❌ Missing | ✅ Correct (ISO 8601) | ❌ Extra quote at end | ❌ Transfer fails | + +\* **Note**: The first four queries are syntactically correct. If they returned no data, it may be because: +- No entities exist with timestamps matching the filter criteria +- The specific timestamp value doesn't match any entity timestamps (especially with `eq` operator) +- For exact matches with `eq`, consider using `ge` (greater than or equal) or `le` (less than or equal) operators instead, as table timestamps include high-precision fractional seconds + +**Key Takeaways:** +1. Always use `datetime` prefix before the timestamp value +2. Always use ISO 8601 format: `YYYY-MM-DDTHH:mm:ss.fffZ` +3. Always JSON-escape single quotes as `\u0027` (not literal `'` characters) +4. No spaces between `datetime` and the opening quote +5. Timezone should be `Z` for UTC, not `+00:00` or other formats + ### Additional Sink Settings The AzureTableAPI Sink extension has additional settings that can be configured for writing Table entities. 
@@ -106,3 +189,41 @@ The following are a couple example `settings.json` files for configuring the Azu "MaxConcurrentEntityWrites": 5 } ``` + +### Example DateTime Filter Configurations + +The following examples demonstrate how to use datetime filters in the `QueryFilter` setting: + +**Example 1: Filter entities modified after a specific date** + +```json +{ + "ConnectionString": "DefaultEndpointsProtocol=https;AccountName=;AccountKey=;EndpointSuffix=core.windows.net", + "Table": "SourceTable1", + "PartitionKeyFieldName": "PartitionKey", + "RowKeyFieldName": "RowKey", + "QueryFilter": "Timestamp ge datetime\u00272023-05-15T03:30:32.663Z\u0027" +} +``` + +**Example 2: Filter entities within a date range** + +```json +{ + "ConnectionString": "DefaultEndpointsProtocol=https;AccountName=;AccountKey=;EndpointSuffix=core.windows.net", + "Table": "SourceTable1", + "QueryFilter": "Timestamp ge datetime\u00272023-01-01T00:00:00Z\u0027 and Timestamp lt datetime\u00272024-01-01T00:00:00Z\u0027" +} +``` + +**Example 3: Combine partition key with datetime filter** + +```json +{ + "ConnectionString": "DefaultEndpointsProtocol=https;AccountName=;AccountKey=;EndpointSuffix=core.windows.net", + "Table": "SourceTable1", + "PartitionKeyFieldName": "State", + "RowKeyFieldName": "id", + "QueryFilter": "PartitionKey eq \u0027CA\u0027 and Timestamp ge datetime\u00272023-06-01T00:00:00Z\u0027" +} +``` diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CertificateConfigurationTests.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CertificateConfigurationTests.cs new file mode 100644 index 0000000..f88f080 --- /dev/null +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CertificateConfigurationTests.cs @@ -0,0 +1,102 @@ +using System.ComponentModel.DataAnnotations; +using Microsoft.Extensions.Logging; +using Moq; + +namespace Cosmos.DataTransfer.CosmosExtension.UnitTests +{ + [TestClass] + public class 
CertificateConfigurationTests + { + [TestMethod] + public void CosmosSettingsBase_WithDisableSslValidation_ShouldValidateSuccessfully() + { + var settings = new TestableCosmosSettings + { + ConnectionString = "AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDj...", + Database = "testDb", + Container = "testContainer", + DisableSslValidation = true + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.AreEqual(0, validationResults.Count, "Should have no validation errors"); + } + + [TestMethod] + public void CosmosSettingsBase_WithDisableSslValidationFalse_ShouldUseDefaultValidation() + { + var settings = new TestableCosmosSettings + { + ConnectionString = "AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDj...", + Database = "testDb", + Container = "testContainer", + DisableSslValidation = false + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.AreEqual(0, validationResults.Count, "Settings should validate with DisableSslValidation=false"); + } + + [TestMethod] + public void CreateClient_WithDisableSslValidation_LogsWarningAndSetsCallback() + { + var loggerMock = new Mock(); + var settings = new TestableCosmosSettings + { + ConnectionString = "AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==", + Database = "testDb", + Container = "testContainer", + DisableSslValidation = true + }; + + var client = CosmosExtensionServices.CreateClient(settings, "TestDisplay", loggerMock.Object); + + // Verify warning was logged when DisableSslValidation is true + loggerMock.Verify( + x => x.Log( + LogLevel.Warning, + It.IsAny(), + It.Is((v, t) => v.ToString()!.Contains("SSL certificate validation is DISABLED")), + It.IsAny(), + It.IsAny>()), + Times.Once); + + // Verify client was created successfully + Assert.IsNotNull(client, "CosmosClient should be created"); + } + + 
[TestMethod] + public void CreateClient_WithoutDisableSslValidation_DoesNotLogWarning() + { + var loggerMock = new Mock(); + var settings = new TestableCosmosSettings + { + ConnectionString = "AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==", + Database = "testDb", + Container = "testContainer", + DisableSslValidation = false + }; + + var client = CosmosExtensionServices.CreateClient(settings, "TestDisplay", loggerMock.Object); + + // Verify warning was NOT logged when DisableSslValidation is false + loggerMock.Verify( + x => x.Log( + LogLevel.Warning, + It.IsAny(), + It.Is((v, t) => v.ToString()!.Contains("SSL certificate validation is DISABLED")), + It.IsAny(), + It.IsAny>()), + Times.Never); + + // Verify client was created successfully + Assert.IsNotNull(client, "CosmosClient should be created"); + } + + private class TestableCosmosSettings : CosmosSettingsBase + { + } + } +} \ No newline at end of file diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosDataSinkExtensionTests.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosDataSinkExtensionTests.cs index fdb5bf6..690a06b 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosDataSinkExtensionTests.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosDataSinkExtensionTests.cs @@ -366,5 +366,120 @@ public static bool HasProperty(object obj, string name) var values = (IDictionary)obj; return values.ContainsKey(name); } + + private static string? 
InvokeGetPropertyValue(ExpandoObject item, string propertyName) + { + var method = typeof(CosmosDataSinkExtension).GetMethod("GetPropertyValue", + System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Static); + return method?.Invoke(null, new object[] { item, propertyName }) as string; + } + + [TestMethod] + public void GetPropertyValue_WithSimpleProperty_ReturnsValue() + { + // Arrange + var expando = new ExpandoObject(); + var dict = (IDictionary)expando; + dict["id"] = "test-id-123"; + dict["name"] = "test-name"; + + // Act + var result = InvokeGetPropertyValue(expando, "id"); + + // Assert + Assert.IsNotNull(result); + Assert.AreEqual("test-id-123", result); + } + + [TestMethod] + public void GetPropertyValue_WithNestedProperty_ReturnsValue() + { + // Arrange - Create nested structure matching the issue example + var expando = new ExpandoObject(); + var dict = (IDictionary)expando; + dict["id"] = "test-id"; + + var nestedExpando = new ExpandoObject(); + var nestedDict = (IDictionary)nestedExpando; + nestedDict["partitionkeyvalue2"] = "guid-value-123"; + nestedDict["somevalue4"] = "other-guid"; + nestedDict["UserName"] = "testuser"; + + dict["partitionkeyvalue1"] = nestedExpando; + + // Act + var result = InvokeGetPropertyValue(expando, "partitionkeyvalue1/partitionkeyvalue2"); + + // Assert + Assert.IsNotNull(result); + Assert.AreEqual("guid-value-123", result); + } + + [TestMethod] + public void GetPropertyValue_WithDeeplyNestedProperty_ReturnsValue() + { + // Arrange - Create deeply nested structure + var expando = new ExpandoObject(); + var dict = (IDictionary)expando; + dict["id"] = "test-id"; + + var level1 = new ExpandoObject(); + var level1Dict = (IDictionary)level1; + + var level2 = new ExpandoObject(); + var level2Dict = (IDictionary)level2; + + var level3 = new ExpandoObject(); + var level3Dict = (IDictionary)level3; + level3Dict["finalValue"] = "deeply-nested-value"; + + level2Dict["level3"] = level3; + level1Dict["level2"] = 
level2; + dict["level1"] = level1; + + // Act + var result = InvokeGetPropertyValue(expando, "level1/level2/level3/finalValue"); + + // Assert + Assert.IsNotNull(result); + Assert.AreEqual("deeply-nested-value", result); + } + + [TestMethod] + public void GetPropertyValue_WithMissingNestedProperty_ReturnsNull() + { + // Arrange + var expando = new ExpandoObject(); + var dict = (IDictionary)expando; + dict["id"] = "test-id"; + + var nestedExpando = new ExpandoObject(); + var nestedDict = (IDictionary)nestedExpando; + nestedDict["existingKey"] = "value"; + + dict["parent"] = nestedExpando; + + // Act + var result = InvokeGetPropertyValue(expando, "parent/nonExistentKey"); + + // Assert + Assert.IsNull(result); + } + + [TestMethod] + public void GetPropertyValue_WithNullIntermediateValue_ReturnsNull() + { + // Arrange + var expando = new ExpandoObject(); + var dict = (IDictionary)expando; + dict["id"] = "test-id"; + dict["parent"] = null; + + // Act + var result = InvokeGetPropertyValue(expando, "parent/child"); + + // Assert + Assert.IsNull(result); + } } } diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosDictionaryDataItemTests.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosDictionaryDataItemTests.cs index 3defac4..176757b 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosDictionaryDataItemTests.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosDictionaryDataItemTests.cs @@ -1,4 +1,5 @@ using Cosmos.DataTransfer.Interfaces; +using Newtonsoft.Json; using Newtonsoft.Json.Linq; namespace Cosmos.DataTransfer.CosmosExtension.UnitTests @@ -87,5 +88,36 @@ public async Task GetValue_WithMixedValueTypes_ReturnsValidTypedValues() Assert.AreEqual("four", mixedArray.ElementAt(3)); Assert.IsInstanceOfType(mixedArray.ElementAt(4), typeof(IDataItem)); } + + [TestMethod] + public async Task GetFieldNames_WithTypeHintProperty_IncludesTypeField() + { + const 
string fileIn = "Data/TypeHintProperty.json"; + + var json = JObject.Parse(await File.ReadAllTextAsync(fileIn)); + + // Use shared utility to convert JObject to Dictionary while preserving $type properties + var dict = CosmosDictionaryDataItem.JObjectToDictionary(json); + var item = new CosmosDictionaryDataItem(dict); + + var fields = item.GetFieldNames().ToList(); + + Assert.AreEqual(3, fields.Count); + CollectionAssert.Contains(fields, "id"); + CollectionAssert.Contains(fields, "name"); + CollectionAssert.Contains(fields, "myFavouritePet"); + + var child = item.GetValue("myFavouritePet") as IDataItem; + Assert.IsNotNull(child); + var childFields = child.GetFieldNames().ToList(); + Assert.AreEqual(3, childFields.Count); + CollectionAssert.Contains(childFields, "$type"); + CollectionAssert.Contains(childFields, "Name"); + CollectionAssert.Contains(childFields, "OtherName"); + + Assert.AreEqual("MyProject.Pets.Dog, MyProject", child.GetValue("$type")); + Assert.AreEqual("Foo", child.GetValue("Name")); + Assert.AreEqual("OtherFoo", child.GetValue("OtherName")); + } } } \ No newline at end of file diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosMultiAccountSupportTests.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosMultiAccountSupportTests.cs index 8a6cd20..c3375a0 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosMultiAccountSupportTests.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosMultiAccountSupportTests.cs @@ -38,8 +38,8 @@ public void CreateClient_WithTwoDifferentSinkAccounts_CreatesSeparateInstances() }; // Act - Create clients for two different sink accounts - CosmosClient sink1Client = CosmosExtensionServices.CreateClient(sink1Settings, "Cosmos-nosql", "JSON"); - CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings, "Cosmos-nosql", "JSON"); + CosmosClient sink1Client = 
CosmosExtensionServices.CreateClient(sink1Settings, "Cosmos-nosql", NullLogger.Instance, "JSON"); + CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings, "Cosmos-nosql", NullLogger.Instance, "JSON"); // Assert - Verify that two distinct client instances are created for different sink accounts Assert.IsNotNull(sink1Client, "First sink client should be created"); @@ -80,8 +80,8 @@ public void CreateClient_WithRbacAuth_CreatesSeparateInstancesForTwoDifferentSin }; // Act - Create clients for two different sink accounts - CosmosClient sink1Client = CosmosExtensionServices.CreateClient(sink1Settings, "Cosmos-nosql", "JSON"); - CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings, "Cosmos-nosql", "JSON"); + CosmosClient sink1Client = CosmosExtensionServices.CreateClient(sink1Settings, "Cosmos-nosql", NullLogger.Instance, "JSON"); + CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings, "Cosmos-nosql", NullLogger.Instance, "JSON"); // Assert - Verify separate instances for different sink accounts Assert.IsNotNull(sink1Client); @@ -117,8 +117,8 @@ public void CreateClient_WithTwoSinksToSameAccount_StillCreatesSeparateInstances }; // Act - Create clients for two sinks to the same account - CosmosClient sink1Client = CosmosExtensionServices.CreateClient(sink1Settings, "Cosmos-nosql", "JSON"); - CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings, "Cosmos-nosql", "JSON"); + CosmosClient sink1Client = CosmosExtensionServices.CreateClient(sink1Settings, "Cosmos-nosql", NullLogger.Instance, "JSON"); + CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings, "Cosmos-nosql", NullLogger.Instance, "JSON"); // Assert - Even with the same account, separate client instances should be created for multiple sinks Assert.IsNotNull(sink1Client); @@ -155,8 +155,8 @@ public void CreateClient_WithTwoSinksWithDifferentProxySettings_CreatesSeparateI }; // Act - Create clients 
for two sinks with different proxy settings - CosmosClient sink1Client = CosmosExtensionServices.CreateClient(sink1Settings, "Cosmos-nosql", "JSON"); - CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings, "Cosmos-nosql", "JSON"); + CosmosClient sink1Client = CosmosExtensionServices.CreateClient(sink1Settings, "Cosmos-nosql", NullLogger.Instance, "JSON"); + CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings, "Cosmos-nosql", NullLogger.Instance, "JSON"); // Assert - Verify separate instances for different proxy configurations Assert.IsNotNull(sink1Client); @@ -197,8 +197,8 @@ public void SinkExtensionInitialization_CreatesIndependentClientsForMultipleSink var sink2Settings = sink2Config.Get(); // Act - Simulate what multiple sink operations do internally - CosmosClient sink1Client = CosmosExtensionServices.CreateClient(sink1Settings!, "Cosmos-nosql", "JSON"); - CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings!, "Cosmos-nosql", "JSON"); + CosmosClient sink1Client = CosmosExtensionServices.CreateClient(sink1Settings!, "Cosmos-nosql", NullLogger.Instance, "JSON"); + CosmosClient sink2Client = CosmosExtensionServices.CreateClient(sink2Settings!, "Cosmos-nosql", NullLogger.Instance, "JSON"); // Assert - Verify that multiple sink operations create independent clients Assert.IsNotNull(sink1Client, "First sink extension should create a client"); diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosSettingsBaseTests.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosSettingsBaseTests.cs new file mode 100644 index 0000000..6ca5f34 --- /dev/null +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosSettingsBaseTests.cs @@ -0,0 +1,25 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Cosmos.DataTransfer.CosmosExtension; + +namespace Cosmos.DataTransfer.CosmosExtension.UnitTests +{ + [TestClass] + public class 
CosmosSettingsBaseTests + { + private class TestableCosmosSettings : CosmosSettingsBase { } + + [TestMethod] + public void AllowBulkExecution_Property_ShouldSetAndGet() + { + var settings = new TestableCosmosSettings + { + AllowBulkExecution = true + }; + + Assert.IsTrue(settings.AllowBulkExecution, "AllowBulkExecution should be true when set to true"); + + settings.AllowBulkExecution = false; + Assert.IsFalse(settings.AllowBulkExecution, "AllowBulkExecution should be false when set to false"); + } + } +} diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/Data/TypeHintProperty.json b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/Data/TypeHintProperty.json new file mode 100644 index 0000000..01c40e2 --- /dev/null +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/Data/TypeHintProperty.json @@ -0,0 +1,9 @@ +{ + "id": "1", + "name": "Dog", + "myFavouritePet": { + "$type": "MyProject.Pets.Dog, MyProject", + "Name": "Foo", + "OtherName": "OtherFoo" + } +} diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs index 3305391..e3060a9 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs @@ -139,7 +139,7 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati var settings = config.Get(); settings.Validate(); - var client = CosmosExtensionServices.CreateClient(settings!, DisplayName, dataSource.DisplayName); + var client = CosmosExtensionServices.CreateClient(settings!, DisplayName, logger, dataSource.DisplayName); Container container; if (settings!.UseRbacAuth) @@ -286,7 +286,31 @@ private static MemoryStream CreateItemStream(ExpandoObject item) private static string? 
GetPropertyValue(ExpandoObject item, string propertyName) { - return ((IDictionary)item)[propertyName]?.ToString(); + // Handle nested property paths (e.g., "property1/property2/property3") + // Note: Calling code uses TrimStart('/') to remove leading slash before calling this method + var pathSegments = propertyName.Split('/'); + object? current = item; + + foreach (var segment in pathSegments) + { + if (current == null) + { + return null; + } + + if (current is not ExpandoObject expandoObj) + { + return null; + } + + var dict = (IDictionary)expandoObj; + if (!dict.TryGetValue(segment, out current)) + { + return null; + } + } + + return current?.ToString(); } public IEnumerable GetSettings() diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSourceExtension.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSourceExtension.cs index 5a13ff8..9c2dbcb 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSourceExtension.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSourceExtension.cs @@ -5,6 +5,7 @@ using Microsoft.Azure.Cosmos.Encryption; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; +using Newtonsoft.Json.Linq; namespace Cosmos.DataTransfer.CosmosExtension { @@ -18,7 +19,7 @@ public async IAsyncEnumerable ReadAsync(IConfiguration config, ILogge var settings = config.Get(); settings.Validate(); - var client = CosmosExtensionServices.CreateClient(settings!, DisplayName); + var client = CosmosExtensionServices.CreateClient(settings!, DisplayName, logger); Container container; @@ -40,19 +41,22 @@ public async IAsyncEnumerable ReadAsync(IConfiguration config, ILogge } logger.LogInformation("Reading from {Database}.{Container}", settings.Database, settings.Container); - using FeedIterator> feedIterator = GetFeedIterator>(settings, container, requestOptions); + using FeedIterator feedIterator = GetFeedIterator(settings, container, requestOptions); 
while (feedIterator.HasMoreResults) { - foreach (var item in await feedIterator.ReadNextAsync(cancellationToken)) + foreach (var jObject in await feedIterator.ReadNextAsync(cancellationToken)) { + // Manually convert JObject to Dictionary to preserve all properties including $type + var dict = CosmosDictionaryDataItem.JObjectToDictionary(jObject); + if (!settings.IncludeMetadataFields) { - var corePropertiesOnly = new Dictionary(item.Where(kvp => !kvp.Key.StartsWith("_"))); + var corePropertiesOnly = new Dictionary(dict.Where(kvp => !kvp.Key.StartsWith("_"))); yield return new CosmosDictionaryDataItem(corePropertiesOnly); } else { - yield return new CosmosDictionaryDataItem(item); + yield return new CosmosDictionaryDataItem(dict); } } } diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDictionaryDataItem.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDictionaryDataItem.cs index 1324c37..e65eb90 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDictionaryDataItem.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDictionaryDataItem.cs @@ -13,6 +13,33 @@ public CosmosDictionaryDataItem(IDictionary items) Items = items; } + /// + /// Converts a JObject to a Dictionary while preserving all properties including metadata properties like $type. + /// + /// + /// Using ToObject<Dictionary> would filter out metadata properties because Newtonsoft.Json + /// treats properties like $type, $id, and $ref as special metadata even when TypeNameHandling is None. + /// + public static IDictionary JObjectToDictionary(JObject jObject) + { + return jObject.Properties().ToDictionary( + p => p.Name, + p => ConvertJTokenValue(p.Value)); + } + + /// + /// Converts a JToken to its appropriate object representation while preserving JObject and JArray types. + /// + private static object? 
ConvertJTokenValue(JToken token) + { + return token.Type switch + { + JTokenType.Object => token, // Keep as JObject, will be converted by GetChildObject + JTokenType.Array => token, // Keep as JArray, will be converted by GetChildObject + _ => ((JValue)token).Value // For primitives, extract the value + }; + } + public IEnumerable GetFieldNames() { return Items.Keys; @@ -32,13 +59,19 @@ public IEnumerable GetFieldNames() { if (value is JObject element) { - return new CosmosDictionaryDataItem(element.ToObject>(JsonSerializer.Create(RawJsonCosmosSerializer.GetDefaultSettings())) - .ToDictionary(k => k.Key, v => v.Value)); + // Use the public utility method for consistency + var dict = JObjectToDictionary(element); + return new CosmosDictionaryDataItem(dict); } if (value is JArray array) { - return array.ToObject>(JsonSerializer.Create(RawJsonCosmosSerializer.GetDefaultSettings())) - .Select(GetChildObject).ToList(); + return array.Select(item => { + if (item is JObject || item is JArray) + { + return GetChildObject(item); + } + return ((JValue)item).Value; + }).ToList(); } return value; diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosExtensionServices.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosExtensionServices.cs index ba585ca..c158a08 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosExtensionServices.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosExtensionServices.cs @@ -1,15 +1,14 @@ -using Azure.Identity; -using Cosmos.DataTransfer.Interfaces; +using Azure.Core; +using Azure.Identity; +using Azure.Security.KeyVault.Keys.Cryptography; using Microsoft.Azure.Cosmos; +using Microsoft.Azure.Cosmos.Encryption; using Microsoft.Extensions.Logging; using System.Globalization; -using System.Reflection; -using Azure.Core; -using System.Text.RegularExpressions; -using Microsoft.Azure.Cosmos.Encryption; -using Azure.Security.KeyVault.Keys.Cryptography; using System.Net; using 
System.Net.Http; +using System.Reflection; +using System.Text.RegularExpressions; namespace Cosmos.DataTransfer.CosmosExtension { @@ -37,7 +36,7 @@ public static class CosmosExtensionServices return new HttpClient(handler); }); - public static CosmosClient CreateClient(CosmosSettingsBase settings, string displayName, string? sourceDisplayName = null) + public static CosmosClient CreateClient(CosmosSettingsBase settings, string displayName, ILogger logger, string? sourceDisplayName = null) { string userAgentString = CreateUserAgentString(displayName, sourceDisplayName); @@ -76,6 +75,13 @@ public static CosmosClient CreateClient(CosmosSettingsBase settings, string disp ? () => _httpClientWithDefaultCredentialsAndPreAuth.Value : () => _httpClientWithDefaultCredentials.Value; } + + // Disable SSL certificate validation for development scenarios + if (settings.DisableSslValidation) + { + logger.LogWarning("SSL certificate validation is DISABLED. This should ONLY be used for development scenarios. Never use in production."); + clientOptions.ServerCertificateCustomValidationCallback = (cert, chain, errors) => true; + } CosmosClient? cosmosClient; if (settings.UseRbacAuth) diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosSettingsBase.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosSettingsBase.cs index c381479..d147ec0 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosSettingsBase.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosSettingsBase.cs @@ -27,6 +27,26 @@ public abstract class CosmosSettingsBase : IValidatableObject /// public bool LimitToEndpoint { get; set; } = false; + /// + /// Disables SSL certificate validation for the Cosmos DB connection. + /// This is intended for use with local development environments. + /// WARNING: Never use this option in production as it disables critical security checks. 
+ /// + public bool DisableSslValidation { get; set; } = false; + + /// + /// Enables bulk execution for Cosmos DB operations. + /// When set to true, operations such as bulk inserts and updates are optimized for performance. + /// + /// Default: false + /// + /// + /// Warning: Use with caution, as enabling bulk execution may affect consistency and error handling. + /// Review Cosmos DB documentation for bulk operation caveats before enabling in production environments. + /// + /// + public bool AllowBulkExecution { get; set; } = false; + public virtual IEnumerable Validate(ValidationContext validationContext) { if (!UseRbacAuth && string.IsNullOrEmpty(ConnectionString)) diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/RawJsonCosmosSerializer.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/RawJsonCosmosSerializer.cs index f375393..efbbe15 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/RawJsonCosmosSerializer.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/RawJsonCosmosSerializer.cs @@ -8,7 +8,7 @@ namespace Cosmos.DataTransfer.CosmosExtension; /// Serializer for Cosmos allowing access to internal JsonSerializer settings. /// /// -/// Defaults to disabling metadata handling to allow passthrough of recognized properties like "$type". +/// Uses default metadata handling to allow passthrough of properties like "$type" as regular JSON properties. 
/// public class RawJsonCosmosSerializer : CosmosSerializer { @@ -18,7 +18,6 @@ public static JsonSerializerSettings GetDefaultSettings() => new() { DateParseHandling = DateParseHandling.None, - MetadataPropertyHandling = MetadataPropertyHandling.Ignore, ContractResolver = null, MaxDepth = 64, }; diff --git a/Extensions/Cosmos/README.md b/Extensions/Cosmos/README.md index 0c8363d..3f140f6 100644 --- a/Extensions/Cosmos/README.md +++ b/Extensions/Cosmos/README.md @@ -23,13 +23,68 @@ The tool supports writing to multiple different Cosmos DB accounts simultaneousl See the [Multiple Cosmos-NoSQL Sinks configuration example](../../ExampleConfigs.md#multiple-cosmos-nosql-sinks-different-accounts) for details. +## JSON Metadata Property Preservation + +The Cosmos extension preserves all JSON properties during data migration, including properties that start with special characters like `$type`, `$id`, and `$ref`. These properties are commonly used by serialization libraries (such as Newtonsoft.Json) to store type information for polymorphic objects or reference tracking. + +**Example**: If your source data contains documents with `$type` properties used for type discrimination: + +```json +{ + "id": "1", + "name": "Dog", + "myFavouritePet": { + "$type": "MyProject.Pets.Dog, MyProject", + "Name": "Foo", + "OtherName": "OtherFoo" + } +} +``` + +These properties will be preserved exactly as they appear in the source when migrating to the destination. This ensures that applications using type information embedded in JSON properties will continue to work correctly after migration. + +> **Note**: Prior to version 3.1.0, properties starting with `$` were filtered out during migration. If you need the old behavior, please use an earlier version of the tool. 
+ ## Settings + +### Main Settings + +| Setting | Description | Default | +|------------------------|---------------------------------------------------------------------------------------------------|-----------| +| ConnectionString | Cosmos DB connection string (AccountEndpoint + AccountKey) | | +| UseRbacAuth | Use Role Based Access Control for authentication | false | +| AccountEndpoint | Cosmos DB account endpoint (required for RBAC) | | +| EnableInteractiveCredentials | Prompt for Azure login if default credentials are unavailable | false | +| Database | Cosmos DB database name | | +| Container | Cosmos DB container name | | +| WebProxy | Proxy server URL for Cosmos DB connections | | +| InitClientEncryption | Enable Always Encrypted feature | false | +| LimitToEndpoint | Restrict client to endpoint (see CosmosClientOptions.LimitToEndpoint) | false | +| DisableSslValidation | Disable SSL certificate validation (for local dev only; not for production) | false | +| AllowBulkExecution | Enable bulk execution for optimized performance.
**Warning:** May affect consistency and error handling. | false | + Source and sink require settings used to locate and access the Cosmos DB account. This can be done in one of two ways: - Using a `ConnectionString` that includes an AccountEndpoint and AccountKey - Using RBAC (Role Based Access Control) by setting `UseRbacAuth` to true and specifying `AccountEndpoint` and optionally `EnableInteractiveCredentials` to prompt the user to log in to Azure if default credentials are not available. See ([migrate-passwordless](https://learn.microsoft.com/azure/cosmos-db/nosql/migrate-passwordless?tabs=sign-in-azure-cli%2Cdotnet%2Cazure-portal-create%2Cazure-portal-associate%2Capp-service-identity) for how to configure Cosmos DB for passwordless access. + +### Bulk Execution + +The extension supports bulk execution for Cosmos DB operations. When the `AllowBulkExecution` setting is set to `true`, operations such as bulk inserts and updates are optimized for performance. Use with caution, as bulk execution may affect consistency and error handling. Default is `false`. + +Example: + +```json +{ + "ConnectionString": "AccountEndpoint=https://...", + "Database": "myDb", + "Container": "myContainer", + "AllowBulkExecution": true +} +``` + Source and sink settings also both require parameters to specify the data location within a Cosmos DB account: - `Database` @@ -46,7 +101,7 @@ Source supports the following optional parameters: ### Always Encrypted -Source and Sink support Always Encrypted as an optional parameter. When `InitClientEncryption` is set to `true`, the extension will initialize the Cosmos client with the Always Encrypted feature enabled. This allows for the use of encrypted fields in the Cosmos DB container. The extension will automatically decrypt the fields when reading from the source and encrypt the fields when writing to the sink. +Source and Sink support Always Encrypted as an optional parameter. 
When `InitClientEncryption` is set to `true`, the extension will initialize the Cosmos client with the Always Encrypted feature enabled. This allows for the use of encrypted fields in the Cosmos DB container. The extension will automatically decrypt the fields when reading from the source and encrypt the fields when writing to the sink.
The extension will also automatically handle the encryption keys and encryption policy for the client, but it requires `UseRbacAuth` to be set to `true` and the user to have the necessary permissions to access the key vault.
@@ -89,22 +144,39 @@ Or with RBAC: } ``` +#### Disable SSL Validation Configuration Example + +For development purposes with SSL validation disabled: + +```json +{ + "ConnectionString": "AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDj...", + "Database":"myDb", + "Container":"myContainer", + "DisableSslValidation": true +} +``` + ### Sink Settings #### **Partition Key Settings** + - **`PartitionKeyPath`**: Specifies the partition key path when creating the container (e.g., `/id`) if it does not exist. - **`PartitionKeyPaths`**: Use this to supply an array of up to 3 paths for hierarchical partition keys. #### **Database Management** + - **`UseAutoscaleForDatabase`**: Specifies if the database will be created with autoscale enabled or manual. Defaults to `false`. manual. #### **Container Management** + - **`RecreateContainer`**: Optional, defaults to `false`. Deletes and recreates the container to ensure only newly imported data is present. - **`CreatedContainerMaxThroughput`**: Specifies the initial throughput (in RUs) for a newly created container. - **`UseAutoscaleForCreatedContainer`**: Enables autoscale for the newly created container. - **`UseSharedThroughput`**: Set to `true` to use shared throughput provisioned at the database level. #### **Batching and Write Behavior** + - **`BatchSize`**: Optional, defaults to `100`. Sets the number of items to accumulate before inserting. - **`WriteMode`**: Specifies the type of data write to use. Options: - `InsertStream` @@ -113,6 +185,7 @@ Or with RBAC: - `Upsert` #### **Connection Settings** + - **`ConnectionMode`**: Controls how the client connects to the Cosmos DB service. Options: - `Gateway` (default) - `Direct` @@ -123,16 +196,23 @@ Or with RBAC: - **`PreAuthenticate`**: Optional, defaults to `false`. When `true`, enables pre-authentication on the HttpClient, which sends credentials with the initial request rather than waiting for a 401/407 challenge. 
This can save extra round-trips but should only be used when the endpoint is trusted. - **`LimitToEndpoint`**: Optional, defaults to `false`. When the value of this property is false, the Cosmos DB SDK will automatically discover - write and read regions, and use them when the configured application region is not available. + write and read regions, and use them when the configured application region is not available. When set to `true`, availability is limited to the endpoint specified. - **Note**: [CosmosClientOptions.LimitToEndpoint Property](https://learn.microsoft.com/dotnet/api/microsoft.azure.cosmos.cosmosclientoptions.limittoendpoint?view=azure-dotnet). When using the Cosmos DB Emulator Container for Linux it's been observed - setting the value to `true` enables import and export of data. + setting the value to `true` enables import and export of data. + +#### **SSL/Certificate Settings** + +- **`DisableSslValidation`**: Optional, defaults to `false`. Disables SSL certificate validation for development/emulator scenarios. + - **⚠️ WARNING**: Only use this for development purposes. Never use in production environments as it disables critical security checks and makes connections vulnerable to man-in-the-middle attacks. #### **Serverless Account** + - **`IsServerlessAccount`**: Specifies whether the target account uses Serverless instead of Provisioned throughput, which affects the way containers are created. - **Note**: Serverless accounts cannot have shared throughput. See [Azure Cosmos DB serverless account type](https://learn.microsoft.com/azure/cosmos-db/serverless#use-serverless-resources). #### **Client Behavior** + - **`PreserveMixedCaseIds`**: Optional, defaults to `false`. Writes `id` fields with their original casing while generating a separate lowercased `id` field as required by Cosmos. - **`IgnoreNullValues`**: Optional. Excludes fields with null values when writing to Cosmos DB. - **`InitClientEncryption`**: Optional, defaults to `false`. 
Uses client-side encryption with the container. Can only be used with `UseRbacAuth` set to `true` diff --git a/Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/JsonFileSinkTests.cs b/Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/JsonFileSinkTests.cs index 1ea0f42..eb9ccfd 100644 --- a/Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/JsonFileSinkTests.cs +++ b/Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/JsonFileSinkTests.cs @@ -2,6 +2,7 @@ using Cosmos.DataTransfer.Common.UnitTests; using Microsoft.Extensions.Logging.Abstractions; using Newtonsoft.Json; +using Newtonsoft.Json.Linq; namespace Cosmos.DataTransfer.JsonExtension.UnitTests { @@ -45,5 +46,68 @@ public async Task WriteAsync_WithFlatObjects_WritesToValidFile() Assert.IsTrue(outputData.Any(o => o.Id == 2 && o.Name == "Two")); Assert.IsTrue(outputData.Any(o => o.Id == 3 && o.Name == "Three")); } + + [TestMethod] + public async Task WriteAsync_WithNestedDictionaries_SerializesCorrectly() + { + // Test case for the MongoDB nested elements issue + var sink = new JsonFileSink(); + + var data = new List + { + new(new Dictionary + { + { "_id", new Dictionary { { "$oid", "some_id" } } }, + { "thread_id", "thread_id" }, + { "content", new List> + { + new Dictionary + { + { "text", "a message text" }, + { "type", "text" } + } + } + }, + { "role", "user" } + }) + }; + + string outputFile = $"{DateTime.Now:yy-MM-dd}_FS_Nested_Output.json"; + var config = TestHelpers.CreateConfig(new Dictionary + { + { "FilePath", outputFile } + }); + + await sink.WriteAsync(data.ToAsyncEnumerable(), config, new JsonFileSource(), NullLogger.Instance); + + var jsonContent = await File.ReadAllTextAsync(outputFile); + var outputArray = JArray.Parse(jsonContent); + + Assert.AreEqual(1, outputArray.Count); + + var doc = outputArray[0] as JObject; + Assert.IsNotNull(doc); + + // Verify _id is an object with $oid field + var idObj = doc["_id"] as JObject; + Assert.IsNotNull(idObj, "_id should 
be an object"); + Assert.AreEqual("some_id", idObj["$oid"]?.ToString()); + + // Verify thread_id is a string + Assert.AreEqual("thread_id", doc["thread_id"]?.ToString()); + + // Verify content is an array of objects + var contentArray = doc["content"] as JArray; + Assert.IsNotNull(contentArray, "content should be an array"); + Assert.AreEqual(1, contentArray.Count); + + var contentItem = contentArray[0] as JObject; + Assert.IsNotNull(contentItem, "content item should be an object"); + Assert.AreEqual("a message text", contentItem["text"]?.ToString()); + Assert.AreEqual("text", contentItem["type"]?.ToString()); + + // Verify role is a string + Assert.AreEqual("user", doc["role"]?.ToString()); + } } } \ No newline at end of file diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj index 36e5f3f..7498e5f 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj @@ -14,6 +14,7 @@ + diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs new file mode 100644 index 0000000..0f129ba --- /dev/null +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs @@ -0,0 +1,317 @@ +using System.ComponentModel.DataAnnotations; +using Cosmos.DataTransfer.Interfaces; +using Microsoft.Extensions.Configuration; + +namespace Cosmos.DataTransfer.SqlServerExtension.UnitTests; + +[TestClass] +public class SqlServerSinkSettingsTests +{ + [TestMethod] + public void 
TestSinkSettings_DefaultWriteMode_IsInsert() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" } + } + }; + + Assert.AreEqual(SqlWriteMode.Insert, settings.WriteMode, "WriteMode should default to Insert"); + } + + [TestMethod] + public void TestSinkSettings_WriteMode_CanBeSetToUpsert() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id" }, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" } + } + }; + + Assert.AreEqual(SqlWriteMode.Upsert, settings.WriteMode, "WriteMode should be settable to Upsert"); + } + + [TestMethod] + public void TestSinkSettings_UpsertMode_RequiresPrimaryKeyColumns() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsTrue(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))), + "Validation should fail when PrimaryKeyColumns is empty and WriteMode is Upsert"); + + Assert.IsTrue(validationResults.Any(v => v.ErrorMessage!.Contains("PrimaryKeyColumns must be specified")), + "Validation error should mention PrimaryKeyColumns requirement"); + } + + [TestMethod] + public void TestSinkSettings_UpsertMode_WithPrimaryKeyColumns_PassesValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id" }, + ColumnMappings = new List + { + new ColumnMapping { 
ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))), + "Validation should pass when PrimaryKeyColumns is provided with Upsert mode"); + } + + [TestMethod] + public void TestSinkSettings_InsertMode_DoesNotRequirePrimaryKeyColumns() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Insert, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))), + "Validation should not require PrimaryKeyColumns when WriteMode is Insert"); + } + + [TestMethod] + public void TestSinkSettings_WriteMode_DeserializesFromJson() + { + // Test JSON to enum conversion for Insert + var jsonInsert = """{"WriteMode": "Insert"}"""; + var configInsert = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(jsonInsert))) + .Build(); + var settingsInsert = configInsert.Get(); + Assert.AreEqual(SqlWriteMode.Insert, settingsInsert?.WriteMode, "WriteMode should be deserialized from JSON string 'Insert'"); + + // Test JSON to enum conversion for Upsert + var jsonUpsert = """{"WriteMode": "Upsert"}"""; + var configUpsert = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(jsonUpsert))) + .Build(); + var settingsUpsert = configUpsert.Get(); + Assert.AreEqual(SqlWriteMode.Upsert, settingsUpsert?.WriteMode, "WriteMode should be deserialized from JSON string 'Upsert'"); + } + + [TestMethod] + public void TestSinkSettings_PrimaryKeyColumns_DeserializesFromJson() + 
{ + var json = """ + { + "ConnectionString": "Server=.;Database=Test;", + "TableName": "TestTable", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["Id", "TenantId"], + "ColumnMappings": [ + {"ColumnName": "Id"}, + {"ColumnName": "TenantId"} + ] + } + """; + + var config = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json))) + .Build(); + var settings = config.Get(); + + Assert.IsNotNull(settings, "Settings should be deserialized"); + Assert.AreEqual(SqlWriteMode.Upsert, settings!.WriteMode, "WriteMode should be Upsert"); + Assert.IsNotNull(settings.PrimaryKeyColumns, "PrimaryKeyColumns should not be null"); + Assert.AreEqual(2, settings.PrimaryKeyColumns.Count, "Should have 2 primary key columns"); + CollectionAssert.AreEqual(new[] { "Id", "TenantId" }, settings.PrimaryKeyColumns, "Primary key columns should match"); + } + + [TestMethod] + public void TestSinkSettings_CompositePrimaryKey_PassesValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "TenantId", "UserId" }, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "TenantId" }, + new ColumnMapping { ColumnName = "UserId" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))), + "Validation should pass with composite primary key"); + } + + [TestMethod] + public void TestSinkSettings_AllColumnsArePrimaryKeys_FailsValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id", "Name" }, + ColumnMappings = new List + { + new ColumnMapping { 
ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsTrue(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.ColumnMappings))), + "Validation should fail when all columns are primary keys without DeleteNotMatchedBySource"); + + Assert.IsTrue(validationResults.Any(v => v.ErrorMessage!.Contains("non-primary key column")), + "Validation error should mention non-primary key column requirement"); + } + + [TestMethod] + public void TestSinkSettings_AllColumnsArePrimaryKeys_WithDelete_PassesValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id", "Name" }, + DeleteNotMatchedBySource = true, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.ColumnMappings))), + "Validation should pass when all columns are primary keys but DeleteNotMatchedBySource is true"); + } + + [TestMethod] + public void TestSinkSettings_DeleteNotMatchedBySource_DefaultsToFalse() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id" }, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + Assert.IsFalse(settings.DeleteNotMatchedBySource, "DeleteNotMatchedBySource should default to false"); + } + + [TestMethod] + public void TestSinkSettings_DeleteNotMatchedBySource_CanBeSetToTrue() + { + var settings = new 
SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id" }, + DeleteNotMatchedBySource = true, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + Assert.IsTrue(settings.DeleteNotMatchedBySource, "DeleteNotMatchedBySource should be settable to true"); + } + + [TestMethod] + public void TestSinkSettings_DeleteNotMatchedBySource_DeserializesFromJson() + { + var json = """ + { + "ConnectionString": "Server=.;Database=Test;", + "TableName": "TestTable", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["Id"], + "DeleteNotMatchedBySource": true, + "ColumnMappings": [ + {"ColumnName": "Id"}, + {"ColumnName": "Name"} + ] + } + """; + + var config = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json))) + .Build(); + var settings = config.Get(); + + Assert.IsNotNull(settings, "Settings should be deserialized"); + Assert.IsTrue(settings!.DeleteNotMatchedBySource, "DeleteNotMatchedBySource should be true"); + } + + [TestMethod] + public void TestSinkSettings_DeleteNotMatchedBySource_WithInsertMode_FailsValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Insert, + DeleteNotMatchedBySource = true, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsTrue(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.DeleteNotMatchedBySource))), + "Validation should fail when DeleteNotMatchedBySource is true with Insert mode"); + + Assert.IsTrue(validationResults.Any(v => v.ErrorMessage!.Contains("can only be used when WriteMode is 
Upsert")), + "Validation error should mention Upsert mode requirement"); + } +} diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs index 61804f4..1549abd 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs @@ -18,16 +18,151 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati var settings = config.Get(); settings.Validate(); + if (settings!.WriteMode == SqlWriteMode.Upsert) + { + await WriteUpsertAsync(dataItems, settings, logger, cancellationToken); + } + else + { + await WriteInsertAsync(dataItems, settings, logger, cancellationToken); + } + } + + private async Task WriteInsertAsync(IAsyncEnumerable dataItems, SqlServerSinkSettings settings, ILogger logger, CancellationToken cancellationToken) + { string tableName = settings!.TableName!; + + // Validate table name to prevent SQL injection + ValidateSqlIdentifier(tableName, nameof(settings.TableName)); + + // Validate column names to prevent SQL injection + foreach (var column in settings.ColumnMappings) + { + ValidateSqlIdentifier(column.ColumnName!, nameof(column.ColumnName)); + } await using var connection = new SqlConnection(settings.ConnectionString); await connection.OpenAsync(cancellationToken); - await using (var transaction = connection.BeginTransaction()) + await using var transaction = connection.BeginTransaction(); + + try { + using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.KeepIdentity, transaction); + bulkCopy.DestinationTableName = tableName; + + var dataColumns = new Dictionary(); + foreach (ColumnMapping columnMapping in settings.ColumnMappings) + { + Type type = Type.GetType(columnMapping.DataType ?? 
"System.String")!; + DataColumn dbColumn = new DataColumn(columnMapping.ColumnName, type); + dataColumns.Add(columnMapping, dbColumn); + bulkCopy.ColumnMappings.Add(new SqlBulkCopyColumnMapping(dbColumn.ColumnName, dbColumn.ColumnName)); + } + + + var dataTable = new DataTable(); + dataTable.Columns.AddRange(dataColumns.Values.ToArray()); + + var batches = dataItems.Buffer(settings.BatchSize); + await foreach (var batch in batches.WithCancellation(cancellationToken)) + { + foreach (var item in batch) + { + var fieldNames = item.GetFieldNames().ToList(); + DataRow row = dataTable.NewRow(); + foreach (var columnMapping in dataColumns) + { + DataColumn column = columnMapping.Value; + ColumnMapping mapping = columnMapping.Key; + + string? fieldName = mapping.GetFieldName(); + if (fieldName != null) + { + object? value = null; + var sourceField = fieldNames.FirstOrDefault(n => n.Equals(fieldName, StringComparison.CurrentCultureIgnoreCase)); + if (sourceField != null) + { + value = item.GetValue(sourceField); + } + + if (value != null || mapping.AllowNull) + { + if (value is IDataItem child) + { + value = child.AsJsonString(false, false); + } + row[column.ColumnName] = value; + } + else + { + row[column.ColumnName] = mapping.DefaultValue; + } + } + } + dataTable.Rows.Add(row); + } + await bulkCopy.WriteToServerAsync(dataTable, cancellationToken); + dataTable.Clear(); + } + + await transaction.CommitAsync(cancellationToken); + } + catch (Exception ex) + { + logger.LogError(ex, "Error copying data to table {TableName}", tableName); try { - using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.KeepIdentity, transaction); - bulkCopy.DestinationTableName = tableName; + await transaction.RollbackAsync(cancellationToken); + } + catch (Exception rollbackEx) + { + logger.LogError(rollbackEx, "Error rolling back transaction for table {TableName}", tableName); + } + throw; + } + + await connection.CloseAsync(); + } + + private 
async Task WriteUpsertAsync(IAsyncEnumerable dataItems, SqlServerSinkSettings settings, ILogger logger, CancellationToken cancellationToken) + { + string tableName = settings!.TableName!; + + // Validate table name to prevent SQL injection + ValidateSqlIdentifier(tableName, nameof(settings.TableName)); + + // Validate column names to prevent SQL injection + foreach (var column in settings.ColumnMappings) + { + ValidateSqlIdentifier(column.ColumnName!, nameof(column.ColumnName)); + } + foreach (var pkColumn in settings.PrimaryKeyColumns) + { + ValidateSqlIdentifier(pkColumn, nameof(settings.PrimaryKeyColumns)); + } + + string stagingTableName = $"#Staging_{Guid.NewGuid():N}"; + + await using var connection = new SqlConnection(settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + try + { + // Create staging table with same structure as target table + var createStagingTableSql = $@" + SELECT TOP 0 * + INTO {stagingTableName} + FROM {tableName}"; + + await using (var createCommand = new SqlCommand(createStagingTableSql, connection)) + { + await createCommand.ExecuteNonQueryAsync(cancellationToken); + } + + // Bulk insert into staging table + using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.KeepIdentity, null)) + { + bulkCopy.DestinationTableName = stagingTableName; var dataColumns = new Dictionary(); foreach (ColumnMapping columnMapping in settings.ColumnMappings) @@ -38,7 +173,6 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati bulkCopy.ColumnMappings.Add(new SqlBulkCopyColumnMapping(dbColumn.ColumnName, dbColumn.ColumnName)); } - var dataTable = new DataTable(); dataTable.Columns.AddRange(dataColumns.Values.ToArray()); @@ -83,17 +217,98 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati await bulkCopy.WriteToServerAsync(dataTable, cancellationToken); dataTable.Clear(); } - - await transaction.CommitAsync(cancellationToken); } - catch 
(Exception ex) + + // Build and execute MERGE statement + var mergeStatement = BuildMergeStatement(tableName, stagingTableName, settings); + logger.LogInformation("Executing MERGE statement for upsert operation"); + + await using (var mergeCommand = new SqlCommand(mergeStatement, connection)) { - Console.WriteLine($"Error copying data to table {tableName}: {ex.Message}"); - await transaction.RollbackAsync(cancellationToken); + mergeCommand.CommandTimeout = 300; // 5 minutes timeout for large merges + var rowsAffected = await mergeCommand.ExecuteNonQueryAsync(cancellationToken); + logger.LogInformation("MERGE completed. Rows affected: {RowsAffected}", rowsAffected); } } + catch (Exception ex) + { + logger.LogError(ex, "Error during upsert operation to table {TableName}", tableName); + throw; + } + finally + { + // Clean up staging table (temp tables are automatically dropped on connection close) + await connection.CloseAsync(); + } + } - await connection.CloseAsync(); + /// + /// Validates that a SQL identifier contains only allowed characters to prevent SQL injection. + /// Allows alphanumeric characters, underscores, dots (for schema.table), and spaces (for quoted identifiers). + /// + private static void ValidateSqlIdentifier(string identifier, string parameterName) + { + if (string.IsNullOrWhiteSpace(identifier)) + { + throw new ArgumentException("SQL identifier cannot be null or empty.", parameterName); + } + + // Allow alphanumeric, underscore, dot (for schema.table), space (for quoted identifiers), and brackets + if (!System.Text.RegularExpressions.Regex.IsMatch(identifier, @"^[\w\.\s\[\]]+$")) + { + throw new ArgumentException( + $"Invalid SQL identifier '{identifier}'. 
Identifiers can only contain alphanumeric characters, underscores, dots, spaces, and brackets.", + parameterName); + } + } + + private string BuildMergeStatement(string targetTable, string stagingTable, SqlServerSinkSettings settings) + { + var allColumns = settings.ColumnMappings.Select(m => m.ColumnName).ToList(); + var primaryKeys = settings.PrimaryKeyColumns; + var nonKeyColumns = allColumns.Except(primaryKeys).ToList(); + + // Build ON clause for matching + var onClause = string.Join(" AND ", + primaryKeys.Select(pk => $"target.[{pk}] = source.[{pk}]")); + + // Build INSERT columns and values + var insertColumns = string.Join(", ", allColumns.Select(col => $"[{col}]")); + var insertValues = string.Join(", ", allColumns.Select(col => $"source.[{col}]")); + + // Build the MERGE statement + var mergeStatement = $@" + MERGE {targetTable} AS target + USING {stagingTable} AS source + ON ({onClause})"; + + // Only add UPDATE clause if there are non-key columns to update + if (nonKeyColumns.Count > 0) + { + var updateSet = string.Join(", ", + nonKeyColumns.Select(col => $"target.[{col}] = source.[{col}]")); + + mergeStatement += $@" + WHEN MATCHED THEN + UPDATE SET {updateSet}"; + } + + mergeStatement += $@" + WHEN NOT MATCHED BY TARGET THEN + INSERT ({insertColumns}) + VALUES ({insertValues})"; + + // Add DELETE clause if requested for full table synchronization + if (settings.DeleteNotMatchedBySource) + { + mergeStatement += $@" + WHEN NOT MATCHED BY SOURCE THEN + DELETE"; + } + + mergeStatement += ";"; + + return mergeStatement; } public IEnumerable GetSettings() diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs index 01f390f..038c6ac 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs @@ -4,7 +4,7 
@@ namespace Cosmos.DataTransfer.SqlServerExtension
 {
-    public class SqlServerSinkSettings : IDataExtensionSettings
+    public class SqlServerSinkSettings : IDataExtensionSettings, IValidatableObject
     {
         [Required]
         [SensitiveValue]
@@ -18,5 +18,62 @@ public class SqlServerSinkSettings : IDataExtensionSettings
         [MinLength(1)]
         public List<ColumnMapping> ColumnMappings { get; set; } = new List<ColumnMapping>();
 
+        /// <summary>
+        /// Specifies the behavior when writing data to SQL Server.
+        /// Insert: Inserts new records only (default).
+        /// Upsert: Uses SQL MERGE to insert or update based on primary key columns.
+        /// </summary>
+        public SqlWriteMode WriteMode { get; set; } = SqlWriteMode.Insert;
+
+        /// <summary>
+        /// List of column names that form the primary key for the table.
+        /// Required when WriteMode is Upsert. These columns are used in the MERGE ON clause.
+        /// </summary>
+        public List<string> PrimaryKeyColumns { get; set; } = new List<string>();
+
+        /// <summary>
+        /// When true and WriteMode is Upsert, records in the destination that do not exist in the source will be deleted.
+        /// This enables full table synchronization. Use with caution as this can result in data loss.
+        /// Default is false.
+        /// </summary>
+        public bool DeleteNotMatchedBySource { get; set; } = false;
+
+        public IEnumerable<ValidationResult> Validate(ValidationContext validationContext)
+        {
+            var results = new List<ValidationResult>();
+
+            // Custom validation for Upsert mode
+            if (WriteMode == SqlWriteMode.Upsert)
+            {
+                if (PrimaryKeyColumns == null || PrimaryKeyColumns.Count == 0)
+                {
+                    results.Add(new ValidationResult(
+                        "PrimaryKeyColumns must be specified when WriteMode is Upsert.",
+                        new[] { nameof(PrimaryKeyColumns) }));
+                }
+                else
+                {
+                    // Ensure at least one non-key column exists for updates, unless we're only doing DELETE sync
+                    var allColumns = ColumnMappings.Select(m => m.ColumnName).ToList();
+                    var nonKeyColumns = allColumns.Except(PrimaryKeyColumns).ToList();
+
+                    if (nonKeyColumns.Count == 0 && !DeleteNotMatchedBySource)
+                    {
+                        results.Add(new ValidationResult(
+                            "At least one non-primary key column must be specified in ColumnMappings for Upsert mode, or set DeleteNotMatchedBySource to true.",
+                            new[] { nameof(ColumnMappings) }));
+                    }
+                }
+            }
+            else if (WriteMode == SqlWriteMode.Insert && DeleteNotMatchedBySource)
+            {
+                // DeleteNotMatchedBySource only works with Upsert mode
+                results.Add(new ValidationResult(
+                    "DeleteNotMatchedBySource can only be used when WriteMode is Upsert.",
+                    new[] { nameof(DeleteNotMatchedBySource) }));
+            }
+
+            return results;
+        }
     }
 }
\ No newline at end of file
diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlWriteMode.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlWriteMode.cs
new file mode 100644
index 0000000..dd2d526
--- /dev/null
+++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlWriteMode.cs
@@ -0,0 +1,21 @@
+namespace Cosmos.DataTransfer.SqlServerExtension
+{
+    /// <summary>
+    /// Defines the behavior when writing data to SQL Server.
+    /// </summary>
+    public enum SqlWriteMode
+    {
+        /// <summary>
+        /// Inserts new records only using bulk insert. This is the default behavior.
+        /// </summary>
+        Insert,
+
+        /// <summary>
+        /// Uses SQL MERGE to insert new records or update existing ones based on primary key columns.
+        /// When matched: updates all non-key columns with source values.
+        /// When not matched: inserts new records.
+        /// Requires PrimaryKeyColumns to be specified.
+        /// </summary>
+        Upsert
+    }
+}
diff --git a/Extensions/SqlServer/README.md b/Extensions/SqlServer/README.md
index 0c232c5..4716f2e 100644
--- a/Extensions/SqlServer/README.md
+++ b/Extensions/SqlServer/README.md
@@ -89,6 +89,26 @@ Sink settings require a `TableName` to define where to insert data and an array
 
 Sink settings also include an optional `BatchSize` parameter to specify the count of records to accumulate before bulk inserting, default value is 1000.
 
+#### WriteMode
+
+The `WriteMode` parameter specifies how data should be written to the target table:
+
+- `Insert` (default): Inserts new records only using SQL bulk insert. This is the fastest mode but will fail if duplicate keys exist.
+- `Upsert`: Uses SQL MERGE to insert new records or update existing ones based on primary key columns. When records match on the primary key(s), all non-key columns are updated with source values. When records don't match, new records are inserted.
+
+When using `Upsert` mode, you must also specify the `PrimaryKeyColumns` parameter with a list of column names that form the primary key. This can be a single column or a composite key.
+ +**Important Notes on Upsert Mode:** +- Upsert mode uses a temporary staging table and SQL MERGE statement +- All columns in `ColumnMappings` must exist in the target table +- Primary key columns must be included in `ColumnMappings` +- The operation syncs the source data with the destination (INSERT when not matched, UPDATE when matched) +- By default, DELETE operations are not performed - records only in the destination remain unchanged +- To enable full table synchronization including deletions, set `DeleteNotMatchedBySource` to `true` (use with caution as this can result in data loss) +- Performance may be slower than Insert mode due to the MERGE operation overhead + +#### Basic Insert Example + ```json { "ConnectionString": "", @@ -110,10 +130,112 @@ Sink settings also include an optional `BatchSize` parameter to specify the coun { "ColumnName": "IsSet", "AllowNull": false, - "DefaultValue": false + "DefaultValue": false, "DataType": "System.Boolean" } ], "BatchSize": 1000 } ``` + +#### Upsert Example + +```json +{ + "ConnectionString": "Server=.;Database=PaymentService;Trusted_Connection=True;", + "TableName": "AccountTransactions", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["Id"], + "ColumnMappings": [ + { + "ColumnName": "Id" + }, + { + "ColumnName": "AccountNumber" + }, + { + "ColumnName": "TransactionDate", + "DataType": "System.DateTime" + }, + { + "ColumnName": "Amount", + "DataType": "System.Decimal" + }, + { + "ColumnName": "Status" + } + ], + "BatchSize": 1000 +} +``` + +#### Upsert with Composite Key Example + +```json +{ + "ConnectionString": "Server=.;Database=SalesDB;Trusted_Connection=True;", + "TableName": "OrderLineItems", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["OrderId", "LineItemId"], + "ColumnMappings": [ + { + "ColumnName": "OrderId", + "DataType": "System.Int32" + }, + { + "ColumnName": "LineItemId", + "DataType": "System.Int32" + }, + { + "ColumnName": "ProductName" + }, + { + "ColumnName": "Quantity", + "DataType": 
"System.Int32" + }, + { + "ColumnName": "UnitPrice", + "DataType": "System.Decimal" + } + ], + "BatchSize": 500 +} +``` + +#### Full Table Sync with DELETE Example + +This example demonstrates full table synchronization where records that exist in the destination but not in the source will be deleted. **Use this with caution as it can result in data loss.** + +```json +{ + "ConnectionString": "Server=.;Database=InventoryDB;Trusted_Connection=True;", + "TableName": "Products", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["ProductId"], + "DeleteNotMatchedBySource": true, + "ColumnMappings": [ + { + "ColumnName": "ProductId", + "DataType": "System.Int32" + }, + { + "ColumnName": "ProductName" + }, + { + "ColumnName": "Price", + "DataType": "System.Decimal" + }, + { + "ColumnName": "StockQuantity", + "DataType": "System.Int32" + }, + { + "ColumnName": "LastUpdated", + "DataType": "System.DateTime" + } + ], + "BatchSize": 1000 +} +``` + +**Warning:** When `DeleteNotMatchedBySource` is set to `true`, any records in the destination table that do not have a matching primary key in the source data will be permanently deleted. Ensure you have proper backups and understand the implications before enabling this option. 
diff --git a/Interfaces/Cosmos.DataTransfer.Common.UnitTests/DataItemJsonConverterTests.cs b/Interfaces/Cosmos.DataTransfer.Common.UnitTests/DataItemJsonConverterTests.cs index 700dafa..ebdec38 100644 --- a/Interfaces/Cosmos.DataTransfer.Common.UnitTests/DataItemJsonConverterTests.cs +++ b/Interfaces/Cosmos.DataTransfer.Common.UnitTests/DataItemJsonConverterTests.cs @@ -194,5 +194,81 @@ public void Test_AsJsonString(bool includeNullFields) { var json = DataItemJsonConverter.AsJsonString(obj, false, includeNullFields); Assert.AreEqual(expected, json); } + + [TestMethod] + [DataRow(false)] + [DataRow(true)] + public void Test_WriteFieldValue_DictionaryAsNestedObject(bool includeNullFields) + { + // Test that Dictionary is properly serialized as nested object + var nestedDict = new Dictionary + { + { "text", "a message text" }, + { "type", "text" }, + { "NULL", null } + }; + + var expected = "\"x\":{\"text\":\"a message text\",\"type\":\"text\",\"NULL\":null}"; + if (!includeNullFields) + { + expected = expected.Replace(",\"NULL\":null", ""); + } + + var (writer, readFunc) = CreateUtf8JsonWriter(); + DataItemJsonConverter.WriteFieldValue(writer, "x", nestedDict, includeNullFields: includeNullFields); + Assert.AreEqual(expected, readFunc(), $"includeNullFields: {includeNullFields}"); + } + + [TestMethod] + [DataRow(false)] + [DataRow(true)] + public void Test_WriteFieldValue_ArrayOfDictionaries(bool includeNullFields) + { + // Test array of dictionaries (simulating MongoDB nested array scenario) + var arrayOfDicts = new List> + { + new Dictionary + { + { "text", "a message text" }, + { "type", "text" } + }, + new Dictionary + { + { "text", "another message" }, + { "type", "text" } + } + }; + + var expected = "\"x\":[{\"text\":\"a message text\",\"type\":\"text\"},{\"text\":\"another message\",\"type\":\"text\"}]"; + + var (writer, readFunc) = CreateUtf8JsonWriter(); + DataItemJsonConverter.WriteFieldValue(writer, "x", arrayOfDicts, includeNullFields: 
includeNullFields); + Assert.AreEqual(expected, readFunc(), $"includeNullFields: {includeNullFields}"); + } + + [TestMethod] + public void Test_AsJsonString_CompleteMongoScenario() + { + // Test complete scenario from the issue: nested _id object and array of content dictionaries + var mongoStyleDoc = new DictionaryDataItem(new Dictionary + { + { "_id", new Dictionary { { "$oid", "some_id" } } }, + { "thread_id", "thread_id" }, + { "content", new List> + { + new Dictionary + { + { "text", "a message text" }, + { "type", "text" } + } + } + }, + { "role", "user" } + }); + + var expected = "{\"_id\":{\"$oid\":\"some_id\"},\"thread_id\":\"thread_id\",\"content\":[{\"text\":\"a message text\",\"type\":\"text\"}],\"role\":\"user\"}"; + var json = DataItemJsonConverter.AsJsonString(mongoStyleDoc, false, false); + Assert.AreEqual(expected, json); + } } diff --git a/Interfaces/Cosmos.DataTransfer.Common/DataItemJsonConverter.cs b/Interfaces/Cosmos.DataTransfer.Common/DataItemJsonConverter.cs index 342a682..e258add 100644 --- a/Interfaces/Cosmos.DataTransfer.Common/DataItemJsonConverter.cs +++ b/Interfaces/Cosmos.DataTransfer.Common/DataItemJsonConverter.cs @@ -104,6 +104,12 @@ internal static void WriteFieldValue(Utf8JsonWriter writer, string fieldName, ob { WriteDataItem(writer, child, includeNullFields, propertyName); } + else if (fieldValue is IDictionary dict) + { + // Handle dictionaries (e.g., from MongoDB BsonDocument conversion) as nested objects + var dictItem = new DictionaryDataItem(dict); + WriteDataItem(writer, dictItem, includeNullFields, propertyName); + } else if (fieldValue is not string && fieldValue is IEnumerable children) { writer.WriteStartArray(propertyName); @@ -113,6 +119,12 @@ internal static void WriteFieldValue(Utf8JsonWriter writer, string fieldName, ob { WriteDataItem(writer, arrayChild, includeNullFields); } + else if (arrayItem is IDictionary arrayDict) + { + // Handle dictionaries (e.g., from MongoDB BsonDocument conversion) as nested 
objects + var arrayDictItem = new DictionaryDataItem(arrayDict); + WriteDataItem(writer, arrayDictItem, includeNullFields); + } else if (TryGetLong(arrayItem, out var longValue)) { writer.WriteNumberValue(longValue);