-
Notifications
You must be signed in to change notification settings - Fork 70
Expand file tree
/
Copy pathAzureTableAPIDataSourceExtension.cs
More file actions
94 lines (80 loc) · 4.3 KB
/
AzureTableAPIDataSourceExtension.cs
File metadata and controls
94 lines (80 loc) · 4.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
using System.ComponentModel.Composition;
using System.Runtime.CompilerServices;
using Azure;
using Azure.Identity;
using Azure.Data.Tables;
using Cosmos.DataTransfer.AzureTableAPIExtension.Data;
using Cosmos.DataTransfer.AzureTableAPIExtension.Settings;
using Cosmos.DataTransfer.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace Cosmos.DataTransfer.AzureTableAPIExtension
{
/// <summary>
/// Data source extension that reads entities from a table through
/// <see cref="TableServiceClient"/> and streams them as <see cref="IDataItem"/> instances.
/// </summary>
[Export(typeof(IDataSourceExtension))]
public class AzureTableAPIDataSourceExtension : IDataSourceExtensionWithSettings
{
    /// <summary>Name under which this extension is exposed.</summary>
    public string DisplayName => "AzureTableAPI";

    /// <summary>
    /// Connects to the table named in the bound <see cref="AzureTableAPIDataSourceSettings"/>
    /// and yields one <see cref="AzureTableAPIDataItem"/> per entity returned by the query.
    /// </summary>
    /// <param name="config">Configuration section bound to <see cref="AzureTableAPIDataSourceSettings"/>.</param>
    /// <param name="logger">Receives connection, filter and item-count messages.</param>
    /// <param name="cancellationToken">Cancels both the enumeration and the underlying table query.</param>
    /// <returns>A lazily evaluated stream of the entities read from the table.</returns>
    /// <remarks>
    /// This is an async iterator: nothing (including settings validation) runs until the
    /// result is enumerated, and the final item-count message is only logged when the
    /// consumer enumerates the sequence to its end.
    /// </remarks>
    public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var settings = config.Get<AzureTableAPIDataSourceSettings>();
        // NOTE(review): Validate() is defined outside this file; it is presumably what
        // rejects missing/invalid settings before they are dereferenced below - confirm.
        settings.Validate();

        TableServiceClient serviceClient;
        if (settings!.UseRbacAuth)
        {
            logger.LogInformation("Connecting to Storage account {AccountEndpoint} using {UseRbacAuth} with {EnableInteractiveCredentials}", settings.AccountEndpoint, nameof(AzureTableAPIDataSourceSettings.UseRbacAuth), nameof(AzureTableAPIDataSourceSettings.EnableInteractiveCredentials));
            var credential = new DefaultAzureCredential(includeInteractiveCredentials: settings.EnableInteractiveCredentials);
#pragma warning disable CS8604 // Validate above ensures AccountEndpoint is not null
            var baseUri = new Uri(settings.AccountEndpoint);
#pragma warning restore CS8604 // Restore warning
            serviceClient = new TableServiceClient(baseUri, credential);
        }
        else
        {
            // Only the property name is logged, never the connection string value itself.
            logger.LogInformation("Connecting to Storage account using {ConnectionString}", nameof(AzureTableAPIDataSourceSettings.ConnectionString));
            serviceClient = new TableServiceClient(settings.ConnectionString);
        }

        var tableClient = serviceClient.GetTableClient(settings.Table);
        logger.LogInformation("Reading from table '{Table}'", settings.Table);

        // The token is handed to QueryAsync as well as to WithCancellation below:
        // QueryAsync documents its token as the one controlling the request lifetime,
        // so cancelling also stops the page requests sent to the service.
        AsyncPageable<TableEntity> queryResults;
        if (!string.IsNullOrWhiteSpace(settings.QueryFilter))
        {
            logger.LogInformation("Applying QueryFilter: {QueryFilter}", settings.QueryFilter);
            if (settings.QueryFilter.Contains("Timestamp", StringComparison.OrdinalIgnoreCase))
            {
                logger.LogWarning("QueryFilter references the system 'Timestamp' property. " +
                    "Note: Cosmos DB Table API does not support filtering on the system Timestamp property — " +
                    "queries will silently return 0 results. Consider using a custom datetime property instead. " +
                    "This limitation does not apply to Azure Storage Tables.");
            }

            queryResults = tableClient.QueryAsync<TableEntity>(filter: settings.QueryFilter, cancellationToken: cancellationToken);
        }
        else
        {
            logger.LogInformation("No QueryFilter specified, reading all entities");
            queryResults = tableClient.QueryAsync<TableEntity>(cancellationToken: cancellationToken);
        }

        int itemCount = 0;
        await foreach (var entity in queryResults.WithCancellation(cancellationToken))
        {
            yield return new AzureTableAPIDataItem(entity, settings.PartitionKeyFieldName, settings.RowKeyFieldName);
            // Counted after the yield, i.e. once the consumer has asked for the next item.
            itemCount++;
        }

        if (itemCount > 0)
        {
            logger.LogInformation("Read {ItemCount} items from table '{Table}'", itemCount, settings.Table);
        }
        else if (!string.IsNullOrWhiteSpace(settings.QueryFilter))
        {
            // An empty result with a filter is the typical symptom of a filter the provider rejects or ignores.
            logger.LogWarning("No items read from table '{Table}' with QueryFilter: {QueryFilter}. Verify the filter syntax is correct for your table API provider.", settings.Table, settings.QueryFilter);
        }
        else
        {
            logger.LogWarning("No items read from table '{Table}'", settings.Table);
        }
    }

    /// <summary>
    /// Returns a default-constructed <see cref="AzureTableAPIDataSourceSettings"/> instance
    /// describing the settings this extension accepts.
    /// </summary>
    public IEnumerable<IDataExtensionSettings> GetSettings()
    {
        yield return new AzureTableAPIDataSourceSettings();
    }
}
}