Skip to content

Commit 435b09e

Browse files
committed
去掉订阅消息直接序列化的内容,只采用EventHandlerArgs
RabbitMQ连接采用匿名方法,用来兼容使用服务发现的情况
1 parent efc0b36 commit 435b09e

9 files changed

Lines changed: 87 additions & 61 deletions

File tree

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55
[![NuGet](https://img.shields.io/nuget/v/RabbitMQ.EventBus.AspNetCore.svg?style=popout)](https://www.nuget.org/packages/RabbitMQ.EventBus.AspNetCore)
66
[![NuGet](https://img.shields.io/nuget/dt/RabbitMQ.EventBus.AspNetCore.svg?style=popout)](https://www.nuget.org/packages/RabbitMQ.EventBus.AspNetCore)
77
[![GitHub license](https://img.shields.io/github/license/ojdev/RabbitMQ.EventBus.AspNetCore.svg)](https://github.com/ojdev/RabbitMQ.EventBus.AspNetCore/blob/master/LICENSE)
8+
[![COUNT](https://img.shields.io/github/languages/count/srburton/RabbitMQ.EventBus.AspNetCore.svg)]()
9+
[![LAST COMMIT](https://img.shields.io/github/last-commit/srburton/RabbitMQ.EventBus.AspNetCore.svg)]()
10+
[![CODE SIZE](https://img.shields.io/github/languages/code-size/srburton/RabbitMQ.EventBus.AspNetCore.svg)]()
11+
[![ISSUES](https://img.shields.io/github/issues-raw/srburton/RabbitMQ.EventBus.AspNetCore.svg)]()
12+
[![ISSUES](https://img.shields.io/github/issues-closed/srburton/RabbitMQ.EventBus.AspNetCore.svg)]()
813
[![FangPlus license](https://img.shields.io/badge/FangPlus-2.0-green.svg)](https://ch.hrb.housecool.com)
914

1015

RabbitMQ.EventBus.AspNetCore.Simple/Controllers/MessageBodyHandle.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ public void Dispose()
2020
Console.WriteLine("释放");
2121
}
2222

23-
public Task Handle(MessageBody1 message, EventHandlerArgs args)
23+
public Task Handle(EventHandlerArgs<MessageBody1> args)
2424
{
2525
Console.WriteLine("==================================================");
2626
Console.WriteLine(id + "=>" + typeof(MessageBody1).Name);
27-
Console.WriteLine(message.Serialize());
27+
Console.WriteLine(args.Event.Body);
2828
Console.WriteLine(args.Original);
2929
Console.WriteLine(args.Redelivered);
3030
Console.WriteLine("==================================================");

RabbitMQ.EventBus.AspNetCore.Simple/Startup.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ public void ConfigureServices(IServiceCollection services)
2323
string assemblyName = typeof(Startup).GetTypeInfo().Assembly.GetName().Name;
2424
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
2525

26-
services.AddRabbitMQEventBus("amqp://guest:guest@192.168.0.252:5672/", eventBusOptionAction: eventBusOption =>
27-
{
28-
eventBusOption.ClientProvidedAssembly(assemblyName);
29-
eventBusOption.EnableRetryOnFailure(true, 5000, TimeSpan.FromSeconds(30));
30-
eventBusOption.RetryOnFailure(TimeSpan.FromSeconds(1));
31-
});
26+
services.AddRabbitMQEventBus(() => "amqp://guest:guest@192.168.0.252:5672/", eventBusOptionAction: eventBusOption =>
27+
{
28+
eventBusOption.ClientProvidedAssembly(assemblyName);
29+
eventBusOption.EnableRetryOnFailure(true, 5000, TimeSpan.FromSeconds(30));
30+
eventBusOption.RetryOnFailure(TimeSpan.FromSeconds(1));
31+
});
3232
}
3333

3434
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.

src/RabbitMQ.EventBus.AspNetCore/DefaultRabbitMQEventBus.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,12 +231,11 @@ private async Task ProcessEvent(string body, Type eventType, Type eventHandleTyp
231231
{
232232
throw new InvalidOperationException(eventHandleType.Name);
233233
}
234-
object integrationEvent = JsonConvert.DeserializeObject(body, eventType);
235234
Type concreteType = typeof(IEventHandler<>).MakeGenericType(eventType);
236235
await (Task)concreteType.GetMethod(nameof(IEventHandler<IEvent>.Handle)).Invoke(
237236
eventHandler,
238237
new object[] {
239-
integrationEvent, new EventHandlerArgs(body, args.Redelivered, args.Exchange, args.RoutingKey)
238+
Activator.CreateInstance(typeof(EventHandlerArgs<>).MakeGenericType(eventType), new object[] { body, args.Redelivered, args.Exchange, args.RoutingKey })
240239
});
241240
}
242241
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
using Newtonsoft.Json;
2+
3+
namespace RabbitMQ.EventBus.AspNetCore.Events
4+
{
5+
/// <summary>
6+
///
7+
/// </summary>
8+
public class EventHandlerArgs<TEvent>
9+
{
10+
/// <summary>
11+
/// 原始消息
12+
/// </summary>
13+
public string Original { get; }
14+
/// <summary>
15+
/// 是否为打回的消息
16+
/// </summary>
17+
public bool Redelivered { get; }
18+
/// <summary>
19+
/// 交换机
20+
/// </summary>
21+
public string Exchange { get; }
22+
/// <summary>
23+
/// 路由key
24+
/// </summary>
25+
public string RoutingKey { get; }
26+
/// <summary>
27+
///
28+
/// </summary>
29+
/// <param name="original"></param>
30+
/// <param name="redelivered"></param>
31+
/// <param name="exchange"></param>
32+
/// <param name="routingKey"></param>
33+
protected EventHandlerArgs(string original, bool redelivered, string exchange, string routingKey)
34+
{
35+
Original = original;
36+
Redelivered = redelivered;
37+
Exchange = exchange;
38+
RoutingKey = routingKey;
39+
}
40+
private TEvent _event;
41+
/// <summary>
42+
/// 序列化后的对象
43+
/// </summary>
44+
public TEvent Event
45+
{
46+
get
47+
{
48+
if (_event == null)
49+
{
50+
_event = JsonConvert.DeserializeObject<TEvent>(Original);
51+
}
52+
return _event;
53+
}
54+
}
55+
}
56+
}

src/RabbitMQ.EventBus.AspNetCore/Events/IEventHandler.cs

Lines changed: 1 addition & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,45 +11,8 @@ public interface IEventHandler<TEvent> where TEvent : IEvent
1111
/// <summary>
1212
///
1313
/// </summary>
14-
/// <param name="message"></param>
1514
/// <param name="args"></param>
1615
/// <returns></returns>
17-
Task Handle(TEvent message, EventHandlerArgs args);
18-
}
19-
/// <summary>
20-
///
21-
/// </summary>
22-
public class EventHandlerArgs
23-
{
24-
/// <summary>
25-
/// 原始消息
26-
/// </summary>
27-
public string Original { get; set; }
28-
/// <summary>
29-
/// 是否为打回的消息
30-
/// </summary>
31-
public bool Redelivered { get; set; }
32-
/// <summary>
33-
///
34-
/// </summary>
35-
public string Exchange { get; set; }
36-
/// <summary>
37-
///
38-
/// </summary>
39-
public string RoutingKey { get; set; }
40-
/// <summary>
41-
///
42-
/// </summary>
43-
/// <param name="original"></param>
44-
/// <param name="redelivered"></param>
45-
/// <param name="exchange"></param>
46-
/// <param name="routingKey"></param>
47-
public EventHandlerArgs(string original, bool redelivered, string exchange, string routingKey)
48-
{
49-
Original = original;
50-
Redelivered = redelivered;
51-
Exchange = exchange;
52-
RoutingKey = routingKey;
53-
}
16+
Task Handle(EventHandlerArgs<TEvent> args);
5417
}
5518
}

src/RabbitMQ.EventBus.AspNetCore/Extensions/ServiceCollectionExtensions.cs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,18 @@ public static class ServiceCollectionExtensions
2020
/// 添加RabbitMQEventBus
2121
/// </summary>
2222
/// <param name="services"></param>
23-
/// <param name="connectionString"></param>
23+
/// <param name="connectionAction">使用匿名函数取得连接字符串,用来兼容使用Consul获取服务地址的情况</param>
2424
/// <param name="eventBusOptionAction"></param>
2525
/// <returns></returns>
26-
public static IServiceCollection AddRabbitMQEventBus(this IServiceCollection services, string connectionString, Action<RabbitMQEventBusConnectionConfigurationBuild> eventBusOptionAction)
26+
public static IServiceCollection AddRabbitMQEventBus(this IServiceCollection services, Func<string> connectionAction, Action<RabbitMQEventBusConnectionConfigurationBuild> eventBusOptionAction)
2727
{
2828
RabbitMQEventBusConnectionConfiguration configuration = new RabbitMQEventBusConnectionConfiguration();
2929
RabbitMQEventBusConnectionConfigurationBuild configurationBuild = new RabbitMQEventBusConnectionConfigurationBuild(configuration);
3030
eventBusOptionAction?.Invoke(configurationBuild);
3131
services.TryAddSingleton<IRabbitMQPersistentConnection>(options =>
3232
{
3333
ILogger<DefaultRabbitMQPersistentConnection> logger = options.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
34-
IConnectionFactory factory = new ConnectionFactory
35-
{
36-
AutomaticRecoveryEnabled = configuration.AutomaticRecoveryEnabled,
37-
NetworkRecoveryInterval = configuration.NetworkRecoveryInterval,
38-
Uri = new Uri(connectionString),
39-
};
40-
var connection = new DefaultRabbitMQPersistentConnection(configuration, factory, logger);
34+
var connection = new DefaultRabbitMQPersistentConnection(configuration, connectionAction, logger);
4135
connection.TryConnect();
4236
return connection;
4337
});

src/RabbitMQ.EventBus.AspNetCore/Factories/DefaultRabbitMQPersistentConnection.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,20 @@ internal sealed class DefaultRabbitMQPersistentConnection : IRabbitMQPersistentC
1717
private readonly IConnectionFactory _connectionFactory;
1818
private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;
1919
private IConnection _connection;
20+
Func<string> _connectionAction;
2021
private bool _disposed;
2122
private readonly object sync_root = new object();
2223

2324
public string Endpoint => _connection?.Endpoint.ToString();
24-
public DefaultRabbitMQPersistentConnection(RabbitMQEventBusConnectionConfiguration configuration, IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger)
25+
public DefaultRabbitMQPersistentConnection(RabbitMQEventBusConnectionConfiguration configuration, Func<string> connectionAction, ILogger<DefaultRabbitMQPersistentConnection> logger)
2526
{
2627
Configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
27-
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
28+
_connectionAction = connectionAction ?? throw new ArgumentNullException(nameof(connectionAction));
29+
_connectionFactory = new ConnectionFactory
30+
{
31+
AutomaticRecoveryEnabled = configuration.AutomaticRecoveryEnabled,
32+
NetworkRecoveryInterval = configuration.NetworkRecoveryInterval
33+
};
2834
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
2935
}
3036

@@ -71,6 +77,9 @@ public bool TryConnect()
7177

7278
policy.Execute(() =>
7379
{
80+
string connectionString = _connectionAction.Invoke();
81+
_logger.WriteLog(Configuration.Level, $"[ConnectionString]:\t{connectionString}");
82+
_connectionFactory.Uri = new Uri(connectionString);
7483
_connection = _connectionFactory.CreateConnection(clientProvidedName: Configuration.ClientProvidedName);
7584
});
7685

src/RabbitMQ.EventBus.AspNetCore/Factories/IRabbitMQPersistentConnection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ public interface IRabbitMQPersistentConnection : IDisposable
1818
/// </summary>
1919
string Endpoint { get; }
2020
/// <summary>
21-
/// 是否打开链接
21+
/// 连接是否打开
2222
/// </summary>
2323
bool IsConnected { get; }
2424
/// <summary>
25-
/// 尝试链接
25+
/// 尝试连接
2626
/// </summary>
2727
/// <returns></returns>
2828
bool TryConnect();

0 commit comments

Comments
 (0)