Skip to content

Commit a8701d3

Browse files
author
欧俊
committed
目前经过简单测试,基本时可用的
1 parent 0fa1ef5 commit a8701d3

5 files changed

Lines changed: 50 additions & 98 deletions

File tree

RabbitMQ.EventBus.AspNetCore.Sample/Controllers/MessageBodyHandle.cs

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,27 @@
55

66
namespace RabbitMQ.EventBus.AspNetCore.Simple.Controllers
77
{
8-
public class MessageBodyHandle : IEventResponseHandler<MessageBody, string>, IDisposable
9-
{
10-
private Guid id;
11-
private readonly ILogger<MessageBodyHandle> _logger;
8+
public class MessageBodyHandle : IEventResponseHandler<MessageBody, string>, IDisposable
9+
{
10+
private Guid id;
11+
private readonly ILogger<MessageBodyHandle> _logger;
1212

13-
public MessageBodyHandle(ILogger<MessageBodyHandle> logger)
14-
{
15-
id = Guid.NewGuid();
16-
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
17-
}
18-
public void Dispose()
19-
{
20-
_logger.LogInformation("MessageBodyHandle Disposable.");
21-
}
13+
public MessageBodyHandle(ILogger<MessageBodyHandle> logger)
14+
{
15+
id = Guid.NewGuid();
16+
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
17+
}
18+
public void Dispose()
19+
{
20+
_logger.LogInformation("MessageBodyHandle Disposable.");
21+
}
2222

2323

24-
public Task<string> HandleAsync(HandlerEventArgs<MessageBody> args)
25-
{
26-
return Task.FromResult("收到消息,已确认");
27-
}
24+
public Task<string> HandleAsync(HandlerEventArgs<MessageBody> args)
25+
{
26+
return Task.FromResult("收到消息,已确认" + DateTimeOffset.Now);
2827
}
2928
}
29+
30+
31+
}

RabbitMQ.EventBus.AspNetCore.Sample/Controllers/ValuesController.cs

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,43 +20,17 @@ public ValuesController(IRabbitMQEventBus eventBus)
2020
[HttpGet]
2121
public async Task<ActionResult<string>> Get()
2222
{
23-
2423
Console.WriteLine($"发送消息{1}");
2524
var body = new
2625
{
2726
requestId = Guid.NewGuid(),
28-
Body = $"rabbitmq.eventbus.test=>发送消息/t{1}",
27+
Body = $"rabbitmq.eventbus.test=>发送消息\t{1}",
2928
Time = DateTimeOffset.Now,
3029
};
3130
var r = await _eventBus.PublishAsync<string>(body, exchange: "RabbitMQ.EventBus.Simple", routingKey: "rabbitmq.eventbus.test");
3231
Console.WriteLine($"返回了{r}");
3332
await Task.Delay(500);
3433
return r;
3534
}
36-
37-
// GET api/values/5
38-
[HttpGet("{id}")]
39-
public ActionResult<string> Get(int id)
40-
{
41-
return "value";
42-
}
43-
44-
// POST api/values
45-
[HttpPost]
46-
public void Post([FromBody] string value)
47-
{
48-
}
49-
50-
// PUT api/values/5
51-
[HttpPut("{id}")]
52-
public void Put(int id, [FromBody] string value)
53-
{
54-
}
55-
56-
// DELETE api/values/5
57-
[HttpDelete("{id}")]
58-
public void Delete(int id)
59-
{
60-
}
6135
}
6236
}

RabbitMQ.EventBus.AspNetCore.Sample/Startup.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ public void ConfigureServices(IServiceCollection services)
2727
services.AddControllers();
2828
services.AddHealthChecks();
2929

30-
services.AddRabbitMQEventBus(() => "amqp://guest:guest@172.30.239.244:5672/", eventBusOptionAction: eventBusOption =>
30+
31+
services.AddRabbitMQEventBus("localhost", 5672, "guest", "guest", "", eventBusOptionAction: eventBusOption =>
3132
{
3233
eventBusOption.ClientProvidedAssembly(assemblyName);
3334
eventBusOption.EnableRetryOnFailure(true, 5000, TimeSpan.FromSeconds(30));
@@ -40,6 +41,21 @@ public void ConfigureServices(IServiceCollection services)
4041
config.ExchangeNameSuffix = "-test";
4142
});
4243
});
44+
//or
45+
//
46+
//services.AddRabbitMQEventBus(() => "amqp://guest:guest@localhost:5672/", eventBusOptionAction: eventBusOption =>
47+
//{
48+
// eventBusOption.ClientProvidedAssembly(assemblyName);
49+
// eventBusOption.EnableRetryOnFailure(true, 5000, TimeSpan.FromSeconds(30));
50+
// eventBusOption.RetryOnFailure(TimeSpan.FromSeconds(1));
51+
// eventBusOption.MessageTTL(2000);
52+
// eventBusOption.SetBasicQos(10);
53+
// eventBusOption.DeadLetterExchangeConfig(config =>
54+
// {
55+
// config.Enabled = false;
56+
// config.ExchangeNameSuffix = "-test";
57+
// });
58+
//});
4359
}
4460

4561
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.

src/RabbitMQ.EventBus.AspNetCore/Extensions/ServiceCollectionExtensions.cs

Lines changed: 12 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -7,57 +7,17 @@ public static class ServiceCollectionExtensions
77
/// 添加RabbitMQEventBus
88
/// </summary>
99
/// <param name="services"></param>
10-
/// <param name="connectionAction">使用匿名函数取得连接字符串,用来兼容使用Consul获取服务地址的情况</param>
10+
/// <param name="endpoint"></param>
11+
/// <param name="port"></param>
12+
/// <param name="username"></param>
13+
/// <param name="password"></param>
14+
/// <param name="visualHost"></param>
1115
/// <param name="eventBusOptionAction"></param>
16+
/// <param name="moduleOptions"></param>
1217
/// <returns></returns>
13-
//public static IServiceCollection AddRabbitMQEventBus(this IServiceCollection services, Func<string> connectionAction, Action<RabbitMQEventBusConnectionConfigurationBuild> eventBusOptionAction)
14-
//{
15-
// RabbitMQEventBusConnectionConfiguration configuration = new();
16-
// RabbitMQEventBusConnectionConfigurationBuild configurationBuild = new(configuration);
17-
// eventBusOptionAction?.Invoke(configurationBuild);
18-
// services.TryAddSingleton<IRabbitMQPersistentConnection>(options =>
19-
// {
20-
// ILogger<DefaultRabbitMQPersistentConnection> logger = options.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
21-
// var connection = new DefaultRabbitMQPersistentConnection(configuration, connectionAction, logger);
22-
// connection.TryConnect();
23-
// Console.WriteLine("创建一次连接");
24-
// return connection;
25-
// });
26-
// services.TryAddSingleton<IEventHandlerModuleFactory, EventHandlerModuleFactory>();
27-
// services.TryAddSingleton<IRabbitMQEventBus, DefaultRabbitMQEventBus>();
28-
// foreach (Type mType in typeof(IEvent).GetAssemblies())
29-
// {
30-
// services.TryAddTransient(mType);
31-
// foreach (Type hType in typeof(IEventHandler<>).GetMakeGenericType(mType))
32-
// {
33-
// services.TryAddTransient(hType);
34-
// }
35-
// }
36-
// return services;
37-
//}
38-
/// <summary>
39-
/// 自动订阅
40-
/// </summary>
41-
/// <param name="app"></param>
42-
//public static void RabbitMQEventBusAutoSubscribe(this IApplicationBuilder app)
43-
//{
44-
// IRabbitMQEventBus eventBus = app.ApplicationServices.GetRequiredService<IRabbitMQEventBus>();
45-
// ILogger<IRabbitMQEventBus> logger = app.ApplicationServices.GetRequiredService<ILogger<IRabbitMQEventBus>>();
46-
// using (logger.BeginScope("EventBus Subscribe"))
47-
// {
48-
// logger.LogInformation($"=======================================================================");
49-
// foreach (Type mType in typeof(IEvent).GetAssemblies())
50-
// {
51-
// var handlesAny = typeof(IEventHandler<>).GetMakeGenericType(mType);
52-
// if (handlesAny.Any())
53-
// {
54-
// logger.LogInformation($"{mType.Name}\t=>\t{string.Join("、", handlesAny)}");
55-
// eventBus.Subscribe(mType);
56-
// }
57-
// }
58-
// logger.LogInformation($"=======================================================================");
59-
// }
60-
//}
18+
public static IServiceProvider AddRabbitMQEventBus(this IServiceCollection services, string endpoint, int port, string username, string password, string visualHost, Action<RabbitMQEventBusConnectionConfigurationBuild> eventBusOptionAction, Action<RabbitMQEventBusModuleOption> moduleOptions = null)
19+
=> AddRabbitMQEventBus(services, () => $"amqp://{username}:{password}@{endpoint}:{port}/{visualHost}", eventBusOptionAction, moduleOptions);
20+
6121
/// <summary>
6222
/// 添加RabbitMQEventBus
6323
/// </summary>
@@ -96,10 +56,10 @@ public static IServiceProvider AddRabbitMQEventBus(this IServiceCollection servi
9656
var serviceProvider = services.BuildServiceProvider();
9757
var _logger = serviceProvider.GetRequiredService<ILogger<DefaultRabbitMQEventBusV2>>();
9858
var rmqeV2 = serviceProvider.GetService<IRabbitMQEventBus>();
99-
foreach (var handler in responseHandlers)
59+
foreach (var (registerType, handlerType, eventType, responseType) in responseHandlers)
10060
{
101-
rmqeV2.Subscribe(handler.eventType, handler.responseType);
102-
_logger.LogInformation($"subscribe:\t{handler.eventType}\t=>\t{handler.handlerType}<{handler.eventType.Name},{handler.responseType.Name}>\t return Type : \t{handler.responseType}");
61+
rmqeV2.Subscribe(eventType, responseType);
62+
_logger.LogInformation($"subscribe:\t{eventType}\t=>\t{handlerType}<{eventType.Name},{responseType.Name}>\t return Type : \t{responseType}");
10363
}
10464
foreach (Type mType in typeof(IEvent).GetAssemblies())
10565
{

src/RabbitMQ.EventBus.AspNetCore/RabbitMQ.EventBus.AspNetCore.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.1" />
2525
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
2626
<PackageReference Include="Polly" Version="7.2.3" />
27-
<PackageReference Include="RabbitMQ.Client" Version="6.2.4" />
27+
<PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
2828
</ItemGroup>
2929

3030
<ItemGroup>

0 commit comments

Comments
 (0)