-
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathSendTimeout.cs
More file actions
78 lines (67 loc) · 3.36 KB
/
SendTimeout.cs
File metadata and controls
78 lines (67 loc) · 3.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
using System.Text;
using BuslyCLI.Config;
using BuslyCLI.Config.Transports;
using BuslyCLI.Infrastructure.Factories;
using NServiceBus.DelayedDelivery;
using NServiceBus.Routing;
using NServiceBus.Transport;
using Spectre.Console;
using Spectre.Console.Cli;
namespace BuslyCLI.Commands.NsbTimeout;
public class SendTimeout(IAnsiConsole console, IRawEndpointFactory rawEndpointFactory, INServiceBusConfiguration nServiceBusConfiguration) : AsyncCommand<SendTimeoutCommandSettings>
{
private static readonly HashSet<Type> UnsupportedTransportTypes =
[
typeof(SqlServerTransportConfig),
typeof(PostgreSqlTransportConfig),
typeof(AzureStorageQueuesTransportConfig)
];
protected override async Task<int> ExecuteAsync(CommandContext context, SendTimeoutCommandSettings settings, CancellationToken cancellationToken)
{
var config = await nServiceBusConfiguration.GetValidatedConfigurationAsync(settings.Config.Path);
if (UnsupportedTransportTypes.Contains(config.CurrentTransportConfig.Config.GetType()))
{
console.MarkupLine($"[red]Error:[/] The [bold]{config.CurrentTransportConfig.Config.GetType().Name.Replace("Config", "")}[/] transport does not support sending timeouts.");
console.MarkupLine("This transport relies on an in-process poller to forward deferred messages, which is incompatible with the CLI's fire-and-forget execution model.");
console.MarkupLine("For details see: [link]https://tragiccode.com/busly-cli/docs/cli-reference/timeout/send[/]");
return 1;
}
var rawEndpoint = await rawEndpointFactory.CreateRawSendOnlyEndpoint(Constants.DefaultOriginatingEndpoint, config.CurrentTransportConfig);
// TODO: Validate body is valid json/xml
var headers = new Dictionary<string, string>
{
["NServiceBus.OriginatingEndpoint"] = Constants.DefaultOriginatingEndpoint,
["NServiceBus.OriginatingMachine"] = Environment.MachineName,
["NServiceBus.ConversationId"] = Guid.NewGuid().ToString(),
["NServiceBus.CorrelationId"] = Guid.NewGuid().ToString(),
["NServiceBus.MessageIntent"] = Constants.NServiceBus.CommandMessageIntent,
["NServiceBus.ContentType"] = settings.ContentType,
["NServiceBus.EnclosedMessageTypes"] = settings.EnclosedMessageType
};
var message = new OutgoingMessage(
Guid.NewGuid().ToString(),
headers,
Encoding.ASCII.GetBytes(settings.MessageBody)
);
var dispatchProperties = new DispatchProperties();
if (settings.DoNotDeliverBefore is not null)
{
dispatchProperties.DoNotDeliverBefore = new DoNotDeliverBefore(settings.DoNotDeliverBefore.Value);
}
else if (settings.DelayDeliveryWith is not null)
{
dispatchProperties.DelayDeliveryWith = new DelayDeliveryWith(settings.DelayDeliveryWith.Value);
}
var transportOperation = new TransportOperation(
message,
new UnicastAddressTag(settings.DestinationEndpoint),
dispatchProperties
);
await rawEndpoint.Dispatch(
new TransportOperations(transportOperation),
new TransportTransaction(),
cancellationToken);
await rawEndpoint.ShutDownAndCleanUp();
return 0;
}
}