-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRabbitMqMessageTransport.cs
More file actions
196 lines (170 loc) · 7.2 KB
/
RabbitMqMessageTransport.cs
File metadata and controls
196 lines (170 loc) · 7.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
namespace NetEvolve.Pulse.Outbox;
using System.Text;
using Microsoft.Extensions.Options;
using NetEvolve.Pulse.Extensibility.Outbox;
using NetEvolve.Pulse.Internals;
using RabbitMQ.Client;
/// <summary>
/// Message transport that publishes outbox messages to RabbitMQ exchanges.
/// </summary>
/// <remarks>
/// <para><strong>Connection Management:</strong></para>
/// This transport uses an injected connection adapter and creates channels on demand.
/// The connection lifetime is managed externally via dependency injection; this type only
/// disposes the channel it created itself.
/// <para><strong>Channel Recovery:</strong></para>
/// When the cached channel is no longer open, a replacement channel is created on the next
/// send and the stale channel is disposed.
/// <para><strong>Routing Key Resolution:</strong></para>
/// Each message is published with a routing key resolved by <see cref="ITopicNameResolver"/>.
/// By default, the simple class name of the event type is used (e.g., <c>"OrderCreated"</c>).
/// <para><strong>Payload:</strong></para>
/// The raw JSON payload from <see cref="OutboxMessage.Payload"/> is published as the message body.
/// <para><strong>Health Checks:</strong></para>
/// The <see cref="IsHealthyAsync"/> method verifies that the connection and channel are open.
/// </remarks>
internal sealed class RabbitMqMessageTransport : IMessageTransport, IDisposable
{
    /// <summary>The resolved transport options controlling the RabbitMQ connection and exchange settings.</summary>
    private readonly RabbitMqTransportOptions _options;

    /// <summary>The topic name resolver used to determine the routing key from an outbox message.</summary>
    private readonly ITopicNameResolver _topicNameResolver;

    /// <summary>The RabbitMQ connection adapter.</summary>
    private readonly IRabbitMqConnectionAdapter _connectionAdapter;

    /// <summary>Lazy-initialized RabbitMQ channel for publishing.</summary>
    private IRabbitMqChannelAdapter? _channel;

    /// <summary>Semaphore for thread-safe channel initialization.</summary>
    private readonly SemaphoreSlim _initializationLock = new(1, 1);

    /// <summary>Indicates whether the transport has been disposed.</summary>
    private bool _disposed;

    /// <summary>
    /// Initializes a new instance of the <see cref="RabbitMqMessageTransport"/> class.
    /// </summary>
    /// <param name="connectionAdapter">The RabbitMQ connection adapter.</param>
    /// <param name="topicNameResolver">The topic name resolver for determining routing keys from outbox messages.</param>
    /// <param name="options">The transport options.</param>
    /// <exception cref="ArgumentNullException">Thrown when any argument is <see langword="null"/>.</exception>
    internal RabbitMqMessageTransport(
        IRabbitMqConnectionAdapter connectionAdapter,
        ITopicNameResolver topicNameResolver,
        IOptions<RabbitMqTransportOptions> options
    )
    {
        ArgumentNullException.ThrowIfNull(connectionAdapter);
        ArgumentNullException.ThrowIfNull(topicNameResolver);
        ArgumentNullException.ThrowIfNull(options);

        _connectionAdapter = connectionAdapter;
        _topicNameResolver = topicNameResolver;
        _options = options.Value;
    }

    /// <inheritdoc />
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="message"/> is <see langword="null"/>.</exception>
    /// <exception cref="ObjectDisposedException">Thrown when the transport has already been disposed.</exception>
    public async Task SendAsync(OutboxMessage message, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(message);
        ThrowIfDisposed();

        var channel = await EnsureChannelAsync(cancellationToken).ConfigureAwait(false);
        await PublishAsync(channel, message, cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc />
    /// <remarks>
    /// Messages are published sequentially, in enumeration order, on a single channel.
    /// If publishing one message fails, the exception propagates and the remaining messages are not sent.
    /// </remarks>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="messages"/> is <see langword="null"/>.</exception>
    /// <exception cref="ObjectDisposedException">Thrown when the transport has already been disposed.</exception>
    public async Task SendBatchAsync(IEnumerable<OutboxMessage> messages, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(messages);
        ThrowIfDisposed();

        var channel = await EnsureChannelAsync(cancellationToken).ConfigureAwait(false);
        foreach (var message in messages)
        {
            await PublishAsync(channel, message, cancellationToken).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Publishes a single outbox message to the RabbitMQ exchange using the provided channel.
    /// </summary>
    /// <param name="channel">The channel to use for publishing.</param>
    /// <param name="message">The outbox message to publish.</param>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    private async Task PublishAsync(
        IRabbitMqChannelAdapter channel,
        OutboxMessage message,
        CancellationToken cancellationToken
    )
    {
        var routingKey = ResolveRoutingKey(message);
        var body = Encoding.UTF8.GetBytes(message.Payload);

        // Message metadata travels as AMQP properties/headers; the body carries only the payload.
        var properties = new BasicProperties
        {
            MessageId = message.Id.ToString(),
            CorrelationId = message.CorrelationId,
            ContentType = "application/json",
            Timestamp = new AmqpTimestamp(message.CreatedAt.ToUnixTimeSeconds()),
            Headers = new Dictionary<string, object?>
            {
                ["eventType"] = message.EventType.ToOutboxEventTypeName(),
                ["retryCount"] = message.RetryCount,
            },
        };

        await channel
            .BasicPublishAsync(
                exchange: _options.ExchangeName,
                routingKey: routingKey,
                mandatory: false,
                basicProperties: properties,
                body: body,
                cancellationToken: cancellationToken
            )
            .ConfigureAwait(false);
    }

    /// <inheritdoc />
    /// <remarks>
    /// Reports healthy only when the transport is not disposed and both the connection adapter and
    /// the cached channel report open. Because the channel is created on first send, the result is
    /// <see langword="false"/> until a channel has been created.
    /// </remarks>
    public Task<bool> IsHealthyAsync(CancellationToken cancellationToken = default)
    {
        try
        {
            // Snapshot the field once so a concurrent channel swap cannot be observed half-way.
            var channel = _channel;
            var healthy = !_disposed && _connectionAdapter.IsOpen && channel?.IsOpen == true;

            return Task.FromResult(healthy);
        }
        // A failing state probe counts as "unhealthy" unless cancellation was requested,
        // in which case the exception is allowed to propagate to the caller.
        catch (Exception) when (!cancellationToken.IsCancellationRequested)
        {
            return Task.FromResult(false);
        }
    }

    /// <summary>
    /// Ensures that an open channel is available, creating it if necessary.
    /// </summary>
    /// <remarks>
    /// The open-channel check is performed once without the lock and again after acquiring it, so that
    /// only one caller creates a channel. A cached channel that is no longer open is replaced and disposed.
    /// </remarks>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>The initialized channel.</returns>
    private async Task<IRabbitMqChannelAdapter> EnsureChannelAsync(CancellationToken cancellationToken)
    {
        var current = _channel;
        if (current?.IsOpen == true)
        {
            return current;
        }

        await _initializationLock.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            // Re-read under the lock: another caller may have created the channel while we waited.
            current = _channel;
            if (current?.IsOpen == true)
            {
                return current;
            }

            var replacement = await _connectionAdapter.CreateChannelAsync(cancellationToken).ConfigureAwait(false);
            _channel = replacement;

            // Release the stale (closed) channel only after the replacement is in place,
            // so a failed creation never leaves the field pointing at a disposed adapter.
            current?.Dispose();

            return replacement;
        }
        finally
        {
            _ = _initializationLock.Release();
        }
    }

    /// <summary>
    /// Resolves the routing key for a given outbox message.
    /// </summary>
    /// <param name="message">The outbox message to resolve the routing key from.</param>
    /// <returns>The resolved routing key.</returns>
    private string ResolveRoutingKey(OutboxMessage message) => _topicNameResolver.Resolve(message);

    /// <summary>
    /// Throws an <see cref="ObjectDisposedException"/> when the transport has been disposed.
    /// </summary>
    private void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(RabbitMqMessageTransport));
        }
    }

    /// <summary>
    /// Disposes the channel created by this transport and the initialization lock.
    /// The injected connection adapter is not disposed here. Calling this method more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        // Mark as disposed first so the state stays consistent even if channel disposal throws.
        _disposed = true;

        var channel = _channel;
        _channel = null;

        try
        {
            channel?.Dispose();
        }
        finally
        {
            _initializationLock.Dispose();
        }
    }
}