-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathActiveMqQueueSubscribingMessageAdapter.cs
More file actions
121 lines (95 loc) · 3.68 KB
/
ActiveMqQueueSubscribingMessageAdapter.cs
File metadata and controls
121 lines (95 loc) · 3.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.NMS;
using Microsoft.Extensions.Logging;
using ViennaNET.Messaging.Messages;
using ViennaNET.Utils;
namespace ViennaNET.Messaging.ActiveMQQueue
{
/// <summary>
///   Adapter that interacts with an ActiveMQ queue in subscription mode.
/// </summary>
/// <remarks>Does not support transactions.</remarks>
public class ActiveMqQueueSubscribingMessageAdapter : ActiveMqQueueMessageAdapter, IMessageAdapterWithSubscribing
{
  /// <summary>
  ///   Fallback value, in seconds, for the message lifetime (and therefore the reply wait time)
  ///   when the queue configuration does not specify a lifetime.
  /// </summary>
  private const int DefaultReplyTimeout = 30;

  private readonly ActiveMqQueueConfiguration _configuration;

  /// <summary>
  ///   Listeners attached by <see cref="Subscribe" />; kept so that <see cref="Unsubscribe" /> can detach them.
  /// </summary>
  private readonly List<MessageListener> _listeners = new();

  /// <summary>
  ///   Creates the adapter.
  /// </summary>
  /// <param name="connectionFactory">Connection factory</param>
  /// <param name="queueConfiguration">Queue parameters</param>
  /// <param name="logger">Logging service</param>
  public ActiveMqQueueSubscribingMessageAdapter(IActiveMqConnectionFactory connectionFactory,
    ActiveMqQueueConfiguration queueConfiguration,
    ILogger logger)
    : base(connectionFactory, queueConfiguration, logger)
  {
    _configuration = queueConfiguration;
  }

  /// <inheritdoc />
  /// <remarks>
  ///   The <see cref="Task" /> returned by <paramref name="handler" /> is not awaited:
  ///   the NMS <c>MessageListener</c> delegate returns <c>void</c>, so the handler runs fire-and-forget.
  /// </remarks>
  public void Subscribe(Func<BaseMessage, Task> handler)
  {
    handler.ThrowIfNull(nameof(handler));

    var listener = new MessageListener(msg => handler.Invoke(msg.ConvertToBaseMessage()));

    // Attach first, record second: if obtaining the consumer fails, no stale
    // listener is left behind in the tracking list.
    GetConsumer().Listener += listener;
    _listeners.Add(listener);
  }

  /// <inheritdoc />
  public void Unsubscribe()
  {
    var consumer = GetConsumer();
    foreach (var listener in _listeners)
    {
      consumer.Listener -= listener;
    }

    _listeners.Clear();
  }

  /// <inheritdoc />
  /// <remarks>
  ///   Sends <paramref name="message" /> with its reply queue and lifetime taken from the configuration,
  ///   then waits on the configured reply queue for a message whose <c>JMSCorrelationID</c> equals the
  ///   correlation id of the request. The wait is bounded by the message lifetime.
  /// </remarks>
  /// <exception cref="TimeoutException">No matching reply was received within the message lifetime.</exception>
  public async Task<BaseMessage> RequestAndWaitResponse(BaseMessage message)
  {
    message.ThrowIfNull(nameof(message));

    var replyQueueName = _configuration.ReplyQueue;
    message.ReplyQueue = replyQueueName;
    message.LifeTime = _configuration.Lifetime ?? TimeSpan.FromSeconds(DefaultReplyTimeout);

    Send(message);

    // Snapshot the timeout so the background receive does not depend on later changes to the message.
    var timeout = message.LifeTime;

    // Selector string literals use SQL-92 quoting: an embedded single quote must be doubled,
    // otherwise a correlation id containing a quote would break (or alter) the selector.
    var escapedCorrelationId = message.CorrelationId?.Replace("'", "''");
    var selector = $"JMSCorrelationID = '{escapedCorrelationId}'";

    var destination = Session.GetQueue(replyQueueName);

    // The consumer exists only for this request; dispose it so it does not stay
    // registered on the session after the reply arrives or the wait times out.
    using var consumer = Session.CreateConsumer(destination, selector, false);

    // Receive blocks the calling thread, so it is offloaded to the thread pool
    // (Task.Run always targets the default scheduler, unlike a bare StartNew).
    var result = await Task
      .Run(() => consumer.Receive(timeout))
      .ConfigureAwait(false);

    if (result == null)
    {
      throw new TimeoutException($"Request timed out. ReplyQueueName: {replyQueueName}");
    }

    var resultMessage = result.ConvertToBaseMessage();
    LogMessageInternal(resultMessage, false);
    return resultMessage;
  }

  /// <inheritdoc />
  /// <remarks>
  ///   Sends <paramref name="message" /> to the configured reply queue and copies the message id and
  ///   correlation id assigned to the sent NMS message back into <paramref name="message" />.
  /// </remarks>
  public BaseMessage Reply(BaseMessage message)
  {
    message.ThrowIfNull(nameof(message));

    var replyQueueName = _configuration.ReplyQueue;
    message.ReplyQueue = replyQueueName;
    message.LifeTime = _configuration.Lifetime ?? TimeSpan.FromSeconds(DefaultReplyTimeout);

    var destination = Session.GetQueue(replyQueueName);
    using var producer = Session.CreateProducer(destination);

    LogMessageInternal(message, true);

    var requestMessage = message.ConvertToMqMessage(Session);
    producer.Send(requestMessage);

    // Reflect the identifiers of the message that was actually sent.
    message.MessageId = requestMessage.NMSMessageId;
    message.CorrelationId = requestMessage.NMSCorrelationID;
    return message;
  }

  /// <inheritdoc />
  /// <remarks>Creates the session in auto-acknowledge mode.</remarks>
  protected override ISession CreateSession(IConnection connection)
  {
    return connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
  }
}
}