@@ -23,6 +23,7 @@ internal class DefaultRabbitMQEventBus : IRabbitMQEventBus
2323 private readonly ILogger < DefaultRabbitMQEventBus > _logger ;
2424 private readonly IServiceProvider _serviceProvider ;
2525 private readonly IEventHandlerModuleFactory _eventHandlerFactory ;
26+ private readonly Dictionary < string , IModel > subscribes ;
2627 /// <summary>
2728 ///
2829 /// </summary>
@@ -36,9 +37,10 @@ public DefaultRabbitMQEventBus(IRabbitMQPersistentConnection persistentConnectio
3637 _serviceProvider = serviceProvider ?? throw new ArgumentNullException ( nameof ( serviceProvider ) ) ;
3738 _eventHandlerFactory = eventHandlerFactory ?? throw new ArgumentNullException ( nameof ( eventHandlerFactory ) ) ;
3839 _logger = logger ?? throw new ArgumentNullException ( nameof ( logger ) ) ;
40+ subscribes = new Dictionary < string , IModel > ( ) ;
41+ _logger . LogInformation ( "消息队列准备就绪" ) ;
3942 }
4043
41- private IModel _publishChannel ;
4244 /// <summary>
4345 ///
4446 /// </summary>
@@ -50,16 +52,9 @@ public DefaultRabbitMQEventBus(IRabbitMQPersistentConnection persistentConnectio
5052 public void Publish < TMessage > ( TMessage message , string exchange , string routingKey , string type = ExchangeType . Topic )
5153 {
5254 string body = message . Serialize ( ) ;
53- if ( _publishChannel ? . IsOpen != true )
54- {
55- if ( _persistentConnection . IsConnected )
56- {
57- _persistentConnection . TryConnect ( ) ;
58- }
59- _publishChannel = _persistentConnection . ExchangeDeclare ( exchange , type : type ) ;
60- _publishChannel . BasicReturn += async ( se , ex ) => await Task . Delay ( ( int ) _persistentConnection . Configuration . ConsumerFailRetryInterval . TotalMilliseconds ) . ContinueWith ( t => Publish ( body , ex . Exchange , ex . RoutingKey ) ) ;
61- }
62-
55+ using var _publishChannel = _persistentConnection . ExchangeDeclare ( exchange , type : type ) ;
56+ _publishChannel . BasicReturn += async ( se , ex ) => await Task . Delay ( ( int ) _persistentConnection . Configuration . ConsumerFailRetryInterval . TotalMilliseconds ) . ContinueWith ( t => Publish ( body , ex . Exchange , ex . RoutingKey ) ) ;
57+
6358 IBasicProperties properties = _publishChannel . CreateBasicProperties ( ) ;
6459 properties . DeliveryMode = 2 ; // persistent
6560 _publishChannel . BasicPublish ( exchange : exchange ,
@@ -81,11 +76,9 @@ public void Subscribe(Type eventType, string type = ExchangeType.Topic)
8176 string queue = attr . Queue ?? ( _persistentConnection . Configuration . Prefix == QueuePrefixType . ExchangeName
8277 ? $ "{ attr . Exchange } .{ eventType . Name } "
8378 : $ "{ _persistentConnection . Configuration . ClientProvidedName } .{ eventType . Name } ") ;
84- if ( ! _persistentConnection . IsConnected )
85- {
86- _persistentConnection . TryConnect ( ) ;
87- }
88- IModel channel ;
79+
80+ var onlyKey = $ "{ attr . Exchange } _{ queue } _{ attr . RoutingKey } ";
81+ subscribes . TryGetValue ( onlyKey , out IModel channel ) ;
8982 #region snippet
9083 var arguments = new Dictionary < string , object > ( ) ;
9184
@@ -135,6 +128,7 @@ public void Subscribe(Type eventType, string type = ExchangeType.Topic)
135128 #endregion
136129 channel . QueueBind ( queue , attr . Exchange , attr . RoutingKey , null ) ;
137130 channel . BasicQos ( 0 , _persistentConnection . Configuration . PrefetchCount , false ) ;
131+ subscribes [ onlyKey ] = channel ;
138132 EventingBasicConsumer consumer = new ( channel ) ;
139133 consumer . Received += async ( model , ea ) =>
140134 {
0 commit comments