Skip to content

Commit 7804745

Browse files
committed
Added Hearthbeat support.
Disabled auto delete on the inteface and worker queues.
1 parent 04e6fa8 commit 7804745

1 file changed

Lines changed: 6 additions & 14 deletions

File tree

RabbitExpress/QueueClient.cs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,12 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs @event
163163
HandleMessage(rpcHandler, @event);
164164
return;
165165
}
166-
167-
Reject(@event.DeliveryTag);
168166
}
169167
catch (Exception e)
170168
{
171169
Console.WriteLine(e);
172170
}
171+
Reject(@event.DeliveryTag);
173172
}
174173

175174
private string GetQueueIdentifier(string ret, string name, string[] args)
@@ -193,7 +192,7 @@ private void RegisterQueues<TInterface>()
193192
};
194193
var queueName = GetQueueIdentifier(h.ret, h.name, h.args);
195194

196-
QueueDeclareOk res = _model.QueueDeclare(queueName, true, false);
195+
QueueDeclareOk res = _model.QueueDeclare(queueName, true, false, false);
197196
IDictionary<string, object> spec = new Dictionary<string, object>
198197
{
199198
{"x-match", "all"},
@@ -210,23 +209,17 @@ private void RegisterQueues<TInterface>()
210209
/// </summary>
211210
/// <param name="connectionString">The connection string.</param>
212211
public QueueClient(Uri connectionString)
213-
: this(new ConnectionFactory() { Uri = connectionString, DispatchConsumersAsync = true })
212+
: this(new ConnectionFactory() { Uri = connectionString, DispatchConsumersAsync = true, RequestedHeartbeat = 10 })
214213
{
215-
var factory = new ConnectionFactory()
216-
{
217-
Uri = connectionString,
218-
DispatchConsumersAsync = true
219-
};
220214
}
221215

222216
/// <summary>
223217
/// Initializes a new instance of the <see cref="QueueClient{TSerializer}"/> class.
224218
/// </summary>
225219
/// <param name="config">The connection setting.</param>
226220
public QueueClient(IOptions<QueueConfig> config)
227-
: this(config.Value.ConnectionString)
221+
: this(config.Value)
228222
{
229-
_model.BasicQos(0, config.Value.PrefetchSize, false);
230223
}
231224

232225
/// <summary>
@@ -290,7 +283,7 @@ public void RegisterWorker<TQueues, TMessage>(TQueues queue, Func<QueuedMessage<
290283
}
291284
}))
292285
{
293-
_model.QueueDeclare(queueName, true, false);
286+
_model.QueueDeclare(queueName, true, false, false);
294287
IDictionary<string, object> spec = new Dictionary<string, object>
295288
{
296289
{ "x-match", "all" },
@@ -377,7 +370,7 @@ public void RpcServer<TInterface>(Expression<Action<TInterface>> method, Delegat
377370
RegisterQueues<TInterface>();
378371
if (method.Body is MethodCallExpression callExpression)
379372
{
380-
System.Reflection.MethodInfo info = callExpression.Method;
373+
MethodInfo info = callExpression.Method;
381374
var h = new
382375
{
383376
name = $"{info.DeclaringType?.FullName}.{info.Name}",
@@ -406,7 +399,6 @@ public void RpcServer<TInterface>(Expression<Action<TInterface>> method, Delegat
406399
}))
407400
{
408401
_model.BasicConsume(queueName, false, _consumer);
409-
410402
return;
411403
}
412404
}

0 commit comments

Comments
 (0)