-
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathRawEndpoint.cs
More file actions
84 lines (71 loc) · 2.92 KB
/
RawEndpoint.cs
File metadata and controls
84 lines (71 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
using System.Collections.Concurrent;
using System.Reflection;
using System.Reflection.Emit;
using NServiceBus.Extensibility;
using NServiceBus.Transport;
using NServiceBus.Unicast.Messages;
namespace BuslyCLI.Infrastructure.Endpoints;
/// <summary>
/// Raw endpoint that, in addition to what <see cref="RawSendOnlyEndpoint"/> provides, pulls
/// messages from the transport's "Primary" receiver into an in-memory queue and lets callers
/// subscribe to events by type name.
/// </summary>
public class RawEndpoint(TransportInfrastructure infrastructure, string endpointName) : RawSendOnlyEndpoint(infrastructure, endpointName)
{
    /// <summary>Key of the receiver looked up in the transport infrastructure's receiver map.</summary>
    private const string PrimaryReceiverId = "Primary";

    /// <summary>Maximum time <see cref="TryReceiveMessage"/> waits for a message before giving up.</summary>
    private static readonly TimeSpan IncomingMessageTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Messages pushed by the receiver, waiting to be taken by <see cref="TryReceiveMessage"/>.</summary>
    private readonly BlockingCollection<IncomingMessage> _receivedMessages = new();

    // Both fields stay null until StartEndpoint has run.
    private IMessageReceiver _messageReceiver;
    private ISubscriptionManager _subscriptionManager;

    /// <summary>
    /// Looks up the "Primary" receiver, wires <see cref="OnMessage"/> and <see cref="OnError"/>
    /// into it with a push concurrency setting of 1, and starts receiving.
    /// </summary>
    public async Task StartEndpoint()
    {
        _messageReceiver = _infrastructure.Receivers[PrimaryReceiverId];
        _subscriptionManager = _messageReceiver.Subscriptions;

        await _messageReceiver.Initialize(new PushRuntimeSettings(1), OnMessage, OnError);
        await _messageReceiver.StartReceive();
    }

    /// <summary>
    /// Stops the receiver (when one was obtained by <see cref="StartEndpoint"/>) and then runs
    /// the base class shutdown.
    /// </summary>
    public override async Task ShutDownAndCleanUp()
    {
        try
        {
            // The receiver is only assigned in StartEndpoint; shutting down an endpoint that was
            // never started must not fail with a NullReferenceException.
            if (_messageReceiver is not null)
            {
                await _messageReceiver.StopReceive();
            }
        }
        finally
        {
            // Run the base cleanup even when stopping the receiver throws, so a failing
            // receiver cannot prevent the rest of the shutdown from happening.
            await base.ShutDownAndCleanUp();
        }
    }

    /// <summary>
    /// Defines an empty public class named <paramref name="typeAsString"/> in a fresh dynamic
    /// assembly, so that an event known only by name can be represented as a <see cref="Type"/>.
    /// </summary>
    /// <param name="typeAsString">Name given to the generated type.</param>
    /// <returns>The type defined in the dynamic module.</returns>
    private static Type CreateTypeFromString(string typeAsString)
    {
        // A new, uniquely named run-only assembly is created on every call.
        var assemblyBuilder = AssemblyBuilder.DefineDynamicAssembly(
            new AssemblyName(Guid.NewGuid().ToString()),
            AssemblyBuilderAccess.Run);
        var moduleBuilder = assemblyBuilder.DefineDynamicModule("MainModule");

        const TypeAttributes attributes =
            TypeAttributes.Public |
            TypeAttributes.Class |
            TypeAttributes.AutoClass |
            TypeAttributes.AnsiClass |
            TypeAttributes.BeforeFieldInit |
            TypeAttributes.AutoLayout;

        return moduleBuilder.DefineType(typeAsString, attributes, null).GetTypeInfo().AsType();
    }

    /// <summary>
    /// Subscribes this endpoint to the event identified by <paramref name="eventType"/>.
    /// </summary>
    /// <param name="eventType">Name of the event type to subscribe to.</param>
    /// <param name="cancellationToken">Token passed through to the subscription manager.</param>
    /// <exception cref="ArgumentException"><paramref name="eventType"/> is null, empty or whitespace.</exception>
    /// <exception cref="InvalidOperationException">
    /// No subscription manager is available, either because <see cref="StartEndpoint"/> has not
    /// been called or because the receiver did not provide one.
    /// </exception>
    public async Task Subscribe(string eventType, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(eventType))
        {
            throw new ArgumentException("The event type name must not be null or empty.", nameof(eventType));
        }

        if (_subscriptionManager is null)
        {
            throw new InvalidOperationException(
                $"Cannot subscribe to '{eventType}': no subscription manager is available. " +
                $"Call {nameof(StartEndpoint)} first and make sure the receiver provides subscriptions.");
        }

        await _subscriptionManager.SubscribeAll(
            [new MessageMetadata(CreateTypeFromString(eventType))],
            new ContextBag(),
            cancellationToken);
    }

    /// <summary>
    /// Receiver callback: stores the incoming message's native id, headers and a copy of its
    /// body (taken with <c>ToArray()</c>) in the in-memory queue.
    /// </summary>
    public Task OnMessage(MessageContext messageContext, CancellationToken cancellationToken)
    {
        var message = new IncomingMessage(
            messageContext.NativeMessageId,
            messageContext.Headers,
            messageContext.Body.ToArray());

        _receivedMessages.Add(message, cancellationToken);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Receiver error callback: always reports the failure as handled; the error context is
    /// not inspected, logged or retried here.
    /// </summary>
    public Task<ErrorHandleResult> OnError(ErrorContext errorContext, CancellationToken cancellationToken)
    {
        return Task.FromResult(ErrorHandleResult.Handled);
    }

    /// <summary>
    /// Waits up to <see cref="IncomingMessageTimeout"/> for a received message.
    /// </summary>
    /// <returns>The next queued message, or <c>null</c> when none arrived within the timeout.</returns>
    public IncomingMessage TryReceiveMessage()
    {
        return _receivedMessages.TryTake(out var incomingMessage, IncomingMessageTimeout)
            ? incomingMessage
            : null;
    }
}