-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAMQPProcessor.cs
More file actions
66 lines (60 loc) · 2.31 KB
/
AMQPProcessor.cs
File metadata and controls
66 lines (60 loc) · 2.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
using Azure.Messaging.ServiceBus;
using System.Text.Json;
using TP.AsyncAPI.Logic;
/// <summary>
/// Hosted background service that consumes simulation requests from a
/// <see cref="ServiceBusProcessor"/>, runs them through
/// <see cref="IProcessorSimulationLogic"/>, and sends the serialized result to the
/// entity named in the incoming message's <c>ReplyTo</c> property.
/// </summary>
public class AMQPProcessor : BackgroundService
{
    private readonly ServiceBusProcessor _processor;
    private readonly ServiceBusClient _client;
    private readonly IProcessorSimulationLogic _logic;
    private readonly ILogger<AMQPProcessor> _logger;

    /// <summary>
    /// Creates the service from its collaborators.
    /// </summary>
    /// <param name="processor">Processor that delivers incoming request messages.</param>
    /// <param name="client">Client used to create senders for reply messages.</param>
    /// <param name="logic">Logic that executes a deserialized request.</param>
    /// <param name="logger">Logger for received requests and processing errors.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public AMQPProcessor(
        ServiceBusProcessor processor,
        ServiceBusClient client,
        IProcessorSimulationLogic logic,
        ILogger<AMQPProcessor> logger
    )
    {
        _processor = processor ??
            throw new ArgumentNullException(nameof(processor));
        _client = client ??
            throw new ArgumentNullException(nameof(client));
        _logic = logic ??
            throw new ArgumentNullException(nameof(logic));
        _logger = logger ??
            throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Registers the message and error handlers, starts the processor, and keeps it
    /// running until <paramref name="stoppingToken"/> is cancelled, at which point the
    /// processor is stopped.
    /// </summary>
    /// <param name="stoppingToken">Signals that the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += HandleMessageAsync;
        _processor.ProcessErrorAsync += HandleErrorAsync;

        await _processor.StartProcessingAsync(stoppingToken);

        try
        {
            // StartProcessingAsync returns as soon as the processor is running, so wait
            // here for shutdown; otherwise the processor would never be stopped.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected when the host shuts down.
        }
        finally
        {
            // stoppingToken is already cancelled at this point, so do not pass it on:
            // the stop itself must be allowed to complete.
            await _processor.StopProcessingAsync(CancellationToken.None);
        }
    }

    /// <summary>
    /// Handles one incoming message: deserializes the request, executes it, and sends
    /// the response to the message's <c>ReplyTo</c> entity with the same correlation id.
    /// Any failure other than shutdown cancellation is logged and the message is
    /// dead-lettered with the exception type as reason and its message as description.
    /// </summary>
    private async Task HandleMessageAsync(ProcessMessageEventArgs args)
    {
        _logger.LogInformation("Received request: {Body}", args.Message.Body.ToString());

        try
        {
            var requestData = JsonSerializer.Deserialize<RequestModel>(args.Message.Body);
            if (requestData is null)
            {
                // A JSON "null" body deserializes to null; reject it here instead of
                // handing a null request to the logic layer.
                throw new JsonException("Request body deserialized to null.");
            }

            var responseData = await ProcessSimulationRequest(requestData);
            var responseMessage = new ServiceBusMessage(JsonSerializer.Serialize(responseData))
            {
                CorrelationId = args.Message.CorrelationId
            };

            // Forward to another queue for further processing.
            // The send is awaited so a failure surfaces in the catch below before the
            // handler returns, and the sender is disposed so it is not leaked per message.
            await using var sender = _client.CreateSender(args.Message.ReplyTo);
            await sender.SendMessageAsync(responseMessage, args.CancellationToken);
        }
        catch (Exception ex) when (
            ex is not OperationCanceledException || !args.CancellationToken.IsCancellationRequested)
        {
            // Cancellation caused by shutdown is excluded by the filter above so that an
            // in-flight message is not dead-lettered just because the host is stopping.
            _logger.LogError(
                ex,
                "Failed to process message {MessageId}; dead-lettering",
                args.Message.MessageId);

            await args.DeadLetterMessageAsync(
                args.Message,
                ex.GetType().ToString(),
                ex.Message
            );
        }
    }

    /// <summary>
    /// Logs errors reported by the processor.
    /// </summary>
    private Task HandleErrorAsync(ProcessErrorEventArgs e)
    {
        _logger.LogError(
            e.Exception,
            "Error processing message: {EntityPath}, {Identifier}",
            e.EntityPath,
            e.Identifier);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Executes a request through the injected simulation logic.
    /// </summary>
    /// <param name="request">The deserialized request.</param>
    /// <returns>The result produced by the logic.</returns>
    private async Task<DataModel> ProcessSimulationRequest(RequestModel request)
    {
        return await _logic.ExecuteAsync(request);
    }
}