-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAzureQueueStorageMessageTransport.cs
More file actions
161 lines (141 loc) · 5.71 KB
/
Copy pathAzureQueueStorageMessageTransport.cs
File metadata and controls
161 lines (141 loc) · 5.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
namespace NetEvolve.Pulse.Outbox;
using System.Diagnostics.CodeAnalysis;
using System.Text;
using System.Text.Json;
using Azure.Identity;
using Azure.Storage.Queues;
using Microsoft.Extensions.Options;
using NetEvolve.Pulse.Extensibility.Outbox;
/// <summary>
/// Azure Queue Storage transport implementation for the outbox processor.
/// </summary>
/// <remarks>
/// The <see cref="QueueClient"/> is lazily initialized on first use. If
/// <see cref="AzureQueueStorageTransportOptions.CreateQueueIfNotExists"/> is <see langword="true"/>,
/// the queue is created automatically during initialization.
/// Messages are JSON-serialized and Base64-encoded before sending.
/// Raw message size must not exceed 48 KB (the Azure Queue Storage Base64-encoded limit of 64 KB).
/// </remarks>
public sealed class AzureQueueStorageMessageTransport : IMessageTransport, IDisposable
{
internal const int MaxMessageSizeInBytes = 48 * 1024; // Raw 48 KB limit (64 KB after Base64 encoding)
private readonly AzureQueueStorageTransportOptions _options;
private readonly QueueClient? _queueClientOverride;
private readonly SemaphoreSlim _initLock = new SemaphoreSlim(1, 1);
private QueueClient? _queueClient;
/// <summary>
/// Initializes a new instance of the <see cref="AzureQueueStorageMessageTransport"/> class.
/// </summary>
/// <param name="options">The configured transport options.</param>
internal AzureQueueStorageMessageTransport(IOptions<AzureQueueStorageTransportOptions> options)
{
ArgumentNullException.ThrowIfNull(options);
_options = options.Value;
}
/// <summary>
/// Initializes a new instance of the <see cref="AzureQueueStorageMessageTransport"/> class
/// with a pre-built queue client. Used for testing.
/// </summary>
/// <param name="options">The configured transport options.</param>
/// <param name="queueClient">A pre-built queue client to use instead of creating one from options.</param>
internal AzureQueueStorageMessageTransport(
IOptions<AzureQueueStorageTransportOptions> options,
QueueClient queueClient
)
{
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(queueClient);
_options = options.Value;
_queueClientOverride = queueClient;
}
/// <inheritdoc />
public void Dispose() => _initLock.Dispose();
/// <inheritdoc />
public async Task SendAsync(OutboxMessage message, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(message);
var json = SerializeMessage(message);
var rawBytes = Encoding.UTF8.GetBytes(json);
if (rawBytes.Length > MaxMessageSizeInBytes)
{
throw new InvalidOperationException(
$"Message size {rawBytes.Length} bytes exceeds the Azure Queue Storage limit of {MaxMessageSizeInBytes} bytes (48 KB raw / 64 KB Base64-encoded)."
);
}
var base64 = Convert.ToBase64String(rawBytes);
var queueClient = await GetQueueClientAsync(cancellationToken).ConfigureAwait(false);
_ = await queueClient
.SendMessageAsync(
base64,
visibilityTimeout: _options.MessageVisibilityTimeout,
cancellationToken: cancellationToken
)
.ConfigureAwait(false);
}
/// <inheritdoc />
public async Task SendBatchAsync(IEnumerable<OutboxMessage> messages, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(messages);
foreach (var message in messages)
{
await SendAsync(message, cancellationToken).ConfigureAwait(false);
}
}
private static string SerializeMessage(OutboxMessage message) =>
JsonSerializer.Serialize(
new
{
id = message.Id,
eventType = message.EventType.ToOutboxEventTypeName(),
payload = message.Payload,
correlationId = message.CorrelationId,
causationId = message.CausationId,
createdAt = message.CreatedAt,
}
);
[SuppressMessage(
"Maintainability",
"CA1508:Avoid dead conditional code",
Justification = "Double-checked locking: the inner null check guards against concurrent initialization after the semaphore is acquired."
)]
private async Task<QueueClient> GetQueueClientAsync(CancellationToken cancellationToken)
{
if (_queueClientOverride is not null)
{
return _queueClientOverride;
}
if (_queueClient is not null)
{
return _queueClient;
}
await _initLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
// Re-check after acquiring the lock (double-checked locking pattern).
if (_queueClient is not null)
{
return _queueClient;
}
QueueClient client;
if (!string.IsNullOrWhiteSpace(_options.ConnectionString))
{
client = new QueueClient(_options.ConnectionString, _options.QueueName);
}
else
{
var queueUri = new Uri(_options.QueueServiceUri!, _options.QueueName);
client = new QueueClient(queueUri, new DefaultAzureCredential());
}
if (_options.CreateQueueIfNotExists)
{
_ = await client.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}
_queueClient = client;
return _queueClient;
}
finally
{
_initLock.Release();
}
}
}