Skip to content

Commit 87604ec

Browse files
authored
Merge pull request #49 from ojdev/dev
只保留一个Channel
2 parents d0be90e + 91d13c5 commit 87604ec

5 files changed

Lines changed: 21 additions & 8 deletions

File tree

RabbitMQ.EventBus.AspNetCore.Sample/Controllers/ValuesController.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Microsoft.AspNetCore.Mvc;
22
using System;
3+
using System.Threading.Tasks;
34

45
namespace RabbitMQ.EventBus.AspNetCore.Simple.Controllers
56
{
@@ -16,18 +17,23 @@ public ValuesController(IRabbitMQEventBus eventBus)
1617

1718
// GET api/values
1819
[HttpGet]
19-
public ActionResult<string> Get()
20+
public async Task<ActionResult<string>> Get()
2021
{
2122
//_eventBus.Publish(new
2223
//{
2324
// Body = "rabbitmq.eventbus.test=>发送消息",
2425
// Time = DateTimeOffset.Now
2526
//}, exchange: "RabbitMQ.EventBus.Simple", routingKey: "rabbitmq.eventbus.test");
26-
_eventBus.Publish(new
27+
for (int i = 0; i < 1000; i++)
2728
{
28-
Body = "rabbitmq.eventbus.test1=>发送消息",
29-
Time = DateTimeOffset.Now,
30-
}, exchange: "RabbitMQ.EventBus.Simple", routingKey: "rabbitmq.eventbus.test1");
29+
_eventBus.Publish(new
30+
{
31+
Body = $"rabbitmq.eventbus.test1=>发送消息/t{i}",
32+
Time = DateTimeOffset.Now,
33+
}, exchange: "RabbitMQ.EventBus.Simple", routingKey: "rabbitmq.eventbus.test1");
34+
await Task.Yield();
35+
await Task.Delay(500);
36+
}
3137
return "Ok";
3238
}
3339

src/RabbitMQ.EventBus.AspNetCore/DefaultRabbitMQEventBus.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public void Publish<TMessage>(TMessage message, string exchange, string routingK
5959
_publishChannel = _persistentConnection.ExchangeDeclare(exchange, type: type);
6060
_publishChannel.BasicReturn += async (se, ex) => await Task.Delay((int)_persistentConnection.Configuration.ConsumerFailRetryInterval.TotalMilliseconds).ContinueWith(t => Publish(body, ex.Exchange, ex.RoutingKey));
6161
}
62+
6263
IBasicProperties properties = _publishChannel.CreateBasicProperties();
6364
properties.DeliveryMode = 2; // persistent
6465
_publishChannel.BasicPublish(exchange: exchange,

src/RabbitMQ.EventBus.AspNetCore/Extensions/ServiceCollectionExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public static IServiceCollection AddRabbitMQEventBus(this IServiceCollection ser
3333
ILogger<DefaultRabbitMQPersistentConnection> logger = options.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
3434
var connection = new DefaultRabbitMQPersistentConnection(configuration, connectionAction, logger);
3535
connection.TryConnect();
36+
Console.WriteLine("创建一次连接");
3637
return connection;
3738
});
3839
services.TryAddSingleton<IEventHandlerModuleFactory, EventHandlerModuleFactory>();

src/RabbitMQ.EventBus.AspNetCore/Factories/DefaultRabbitMQPersistentConnection.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ internal sealed class DefaultRabbitMQPersistentConnection : IRabbitMQPersistentC
2020
Func<string> _connectionAction;
2121
private bool _disposed;
2222
private readonly object sync_root = new object();
23-
23+
private IModel _model;
2424
public string Endpoint => _connection?.Endpoint.ToString();
2525
public DefaultRabbitMQPersistentConnection(RabbitMQEventBusConnectionConfiguration configuration, Func<string> connectionAction, ILogger<DefaultRabbitMQPersistentConnection> logger)
2626
{
@@ -42,7 +42,12 @@ public IModel CreateModel()
4242
{
4343
throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
4444
}
45-
return _connection.CreateModel();
45+
if ((_model?.IsOpen ?? false) == false)
46+
{
47+
_model = _connection.CreateModel();
48+
Console.WriteLine("创建一个Channel");
49+
}
50+
return _model;
4651
}
4752

4853
public void Dispose()

src/RabbitMQ.EventBus.AspNetCore/RabbitMQ.EventBus.AspNetCore.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="5.0.0" />
2727
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
2828
<PackageReference Include="Polly" Version="7.2.2" />
29-
<PackageReference Include="RabbitMQ.Client" Version="6.2.1" />
29+
<PackageReference Include="RabbitMQ.Client" Version="6.2.2" />
3030
</ItemGroup>
3131

3232
<ItemGroup>

0 commit comments

Comments
 (0)