Skip to content

Commit 2af7c19

Browse files
committed
增加可设置的预取消息数
1 parent 21ea715 commit 2af7c19

5 files changed

Lines changed: 26 additions & 7 deletions

File tree

RabbitMQ.EventBus.AspNetCore.Sample/Startup.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public void ConfigureServices(IServiceCollection services)
3333
eventBusOption.EnableRetryOnFailure(true, 5000, TimeSpan.FromSeconds(30));
3434
eventBusOption.RetryOnFailure(TimeSpan.FromSeconds(1));
3535
eventBusOption.MessageTTL(2000);
36+
eventBusOption.SetBasicQos(10);
3637
eventBusOption.DeadLetterExchangeConfig(config =>
3738
{
3839
config.Enabled = true;

src/RabbitMQ.EventBus.AspNetCore/Configurations/RabbitMQEventBusConnectionConfiguration.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ public sealed class RabbitMQEventBusConnectionConfiguration
3535
/// </summary>
3636
public LogLevel Level { get; set; }
3737
/// <summary>
38+
/// 每次预取的消息条数(默认:1)
39+
/// </summary>
40+
public ushort PrefetchCount { get; set; }
41+
/// <summary>
3842
/// 队列名前缀(默认ClientProvidedName)
3943
/// </summary>
4044
public QueuePrefixType Prefix { get; set; }
@@ -68,6 +72,7 @@ public RabbitMQEventBusConnectionConfiguration()
6872
AutomaticRecoveryEnabled = true;
6973
ConsumerFailRetryInterval = TimeSpan.FromSeconds(1);
7074
DeadLetterExchange = new DeadLetterExchangeConfig();
75+
PrefetchCount = 1;
7176
}
7277
}
7378
}

src/RabbitMQ.EventBus.AspNetCore/Configurations/RabbitMQEventBusConnectionConfigurationBuild.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,5 +83,17 @@ public void DeadLetterExchangeConfig(Action<DeadLetterExchangeConfig> config)
8383
Configuration.MessageTTL = 60000;
8484
}
8585
}
86+
/// <summary>
87+
/// 设置预取条数
88+
/// </summary>
89+
/// <param name="prefetchCount"></param>
90+
public void SetBasicQos(ushort prefetchCount)
91+
{
92+
if (prefetchCount < 1)
93+
{
94+
throw new ArgumentOutOfRangeException($"{nameof(prefetchCount)}必须大于0");
95+
}
96+
Configuration.PrefetchCount = prefetchCount;
97+
}
8698
}
8799
}

src/RabbitMQ.EventBus.AspNetCore/DefaultRabbitMQEventBus.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,15 +133,16 @@ public void Subscribe(Type eventType, string type = ExchangeType.Topic)
133133
}
134134
#endregion
135135
channel.QueueBind(queue, attr.Exchange, attr.RoutingKey, null);
136-
channel.BasicQos(0, 1, false);
136+
channel.BasicQos(0, _persistentConnection.Configuration.PrefetchCount, false);
137137
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
138138
consumer.Received += async (model, ea) =>
139139
{
140-
string body = Encoding.UTF8.GetString(ea.Body);
140+
string body = Encoding.UTF8.GetString(ea.Body.ToArray());
141141
bool isAck = false;
142142
try
143143
{
144144
await ProcessEvent(body, eventType, ea);
145+
//不确定是否需要改变Multiple是否需要改为true
145146
channel.BasicAck(ea.DeliveryTag, multiple: false);
146147
isAck = true;
147148
}

src/RabbitMQ.EventBus.AspNetCore/RabbitMQ.EventBus.AspNetCore.csproj

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<Description>asp.net core 下使用的RabbitMQ</Description>
1313
<PackageLicenseUrl></PackageLicenseUrl>
1414
<PackageProjectUrl>https://github.com/ojdev/RabbitMQ.EventBus.AspNetCore</PackageProjectUrl>
15-
<Version>2.1.3</Version>
15+
<Version>2.1.4</Version>
1616
<PackageLicenseFile>LICENSE</PackageLicenseFile>
1717
</PropertyGroup>
1818

@@ -22,11 +22,11 @@
2222

2323
<ItemGroup>
2424
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
25-
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.0" />
26-
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" />
25+
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.3" />
26+
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.3" />
2727
<PackageReference Include="Polly" Version="7.2.0" />
28-
<PackageReference Include="RabbitMQ.Client" Version="5.1.2" />
29-
<PackageReference Include="System.Text.Json" Version="4.7.0" />
28+
<PackageReference Include="RabbitMQ.Client" Version="6.0.0" />
29+
<PackageReference Include="System.Text.Json" Version="4.7.1" />
3030
</ItemGroup>
3131

3232
<ItemGroup>

0 commit comments

Comments
 (0)