-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathActiveMqQueueMessageConverter.cs
More file actions
120 lines (107 loc) · 4.2 KB
/
ActiveMqQueueMessageConverter.cs
File metadata and controls
120 lines (107 loc) · 4.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
using System;
using System.Configuration;
using Apache.NMS;
using Apache.NMS.Util;
using ViennaNET.Messaging.Messages;
namespace ViennaNET.Messaging.ActiveMQQueue
{
internal static class ActiveMqQueueMessageConverter
{
public static IMessage ConvertToMqMessage(this BaseMessage message, ISession session)
{
if (string.IsNullOrWhiteSpace(message.MessageId))
{
message.MessageId = Guid.NewGuid().ToString().ToUpper();
}
if (string.IsNullOrWhiteSpace(message.CorrelationId))
{
message.CorrelationId = message.MessageId;
}
var mes = ConvertToInternalMessage(message, session);
mes.NMSMessageId = message.MessageId;
mes.NMSCorrelationID = message.CorrelationId;
mes.NMSTimeToLive = message.LifeTime;
if (!string.IsNullOrWhiteSpace(message.ReplyQueue))
{
var replyQueue = SessionUtil.GetDestination(session, message.ReplyQueue.Trim());
mes.NMSReplyTo = replyQueue;
}
foreach (var property in message.Properties)
{
if (string.IsNullOrEmpty(property.Key))
{
throw new ArgumentException("Property name must be set");
}
if (property.Value != null)
{
mes.Properties.SetString(property.Key, property.Value.ToString());
}
}
return mes;
}
public static BaseMessage ConvertToBaseMessage(this IMessage receivedMessage)
{
var message = CreateBaseMessage(receivedMessage);
message.MessageId = receivedMessage.NMSMessageId;
message.CorrelationId = receivedMessage.NMSCorrelationID;
message.SendDateTime = receivedMessage.NMSTimestamp;
message.ReceiveDate = DateTime.Now;
message.LifeTime = receivedMessage.NMSTimeToLive;
switch (receivedMessage.NMSReplyTo)
{
case null:
message.ReplyQueue = null;
break;
case IQueue queue:
message.ReplyQueue = queue.QueueName;
break;
case ITopic topic:
message.ReplyQueue = topic.TopicName;
break;
}
if (receivedMessage.Properties != null
&& receivedMessage.Properties.Count > 0)
{
foreach (var property in receivedMessage.Properties.Keys)
{
var name = property as string;
if (!string.IsNullOrEmpty(name))
{
var prop = receivedMessage.Properties[name];
message.Properties.Add(name, prop);
}
}
}
return message;
}
private static IMessage ConvertToInternalMessage(BaseMessage message, ISession session)
{
switch (message)
{
case TextMessage textMessage:
return session.CreateTextMessage(textMessage.Body);
case BytesMessage bytesMessage:
var result = session.CreateBytesMessage();
result.WriteBytes(bytesMessage.Body);
return result;
default:
throw new ArgumentException(
$"Unknown inherited type of BaseMessage ({message.GetType()}) while converting to IMessage");
}
}
private static BaseMessage CreateBaseMessage(IMessage message)
{
switch (message)
{
case ITextMessage textMessage:
return new TextMessage { Body = textMessage.Text };
case IBytesMessage bytesMessage:
var buffer = new byte[bytesMessage.BodyLength];
bytesMessage.ReadBytes(buffer);
return new BytesMessage { Body = buffer };
default:
return new TextMessage { Body = string.Empty };
}
}
}
}