Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 29 additions & 22 deletions src/DotNetCore.CAP.NATS/ITransport.NATS.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,47 @@ public NATSTransport(ILogger<NATSTransport> logger, IConnectionPool connectionPo

public async Task<OperateResult> SendAsync(TransportMessage message)
{
var connection = _connectionPool.RentConnection();

try
{
var msg = new Msg(message.GetName(), message.Body.ToArray());
foreach (var header in message.Headers)
var connection = _connectionPool.RentConnection();
try
{
msg.Header[header.Key] = header.Value;
}
var msg = new Msg(message.GetName(), message.Body.ToArray());
foreach (var header in message.Headers)
{
msg.Header[header.Key] = header.Value;
}

var js = connection.CreateJetStreamContext(_jetStreamOptions);
var js = connection.CreateJetStreamContext(_jetStreamOptions);

var builder = PublishOptions.Builder().WithMessageId(message.GetId());
var builder = PublishOptions.Builder().WithMessageId(message.GetId());

var resp = await js.PublishAsync(msg, builder.Build());
var resp = await js.PublishAsync(msg, builder.Build());

if (resp.Seq > 0)
{
_logger.LogDebug($"NATS stream message [{message.GetName()}] has been published.");
if (resp.Seq > 0)
{
_logger.LogDebug($"NATS stream message [{message.GetName()}] has been published.");

return OperateResult.Success;
return OperateResult.Success;
}

throw new PublisherSentFailedException("NATS message send failed, no consumer reply!");
}
catch (Exception ex)
{
var warpEx = new PublisherSentFailedException(ex.Message, ex);

throw new PublisherSentFailedException("NATS message send failed, no consumer reply!");
return OperateResult.Failed(warpEx);
}
finally
{
_connectionPool.Return(connection);
}
}
catch (Exception ex)
catch (Exception e)
{
var warpEx = new PublisherSentFailedException(ex.Message, ex);

var warpEx = new PublisherSentFailedException(e.Message, e);
return OperateResult.Failed(warpEx);
}
finally
{
_connectionPool.Return(connection);
}
}
}
}
Loading