Skip to content

Commit 4460408

Browse files
authored
Made sure to create all queues for Interfaces that are used. (#5)
1 parent abf1752 commit 4460408

1 file changed

Lines changed: 29 additions & 10 deletions

File tree

RabbitExpress/QueueClient.cs

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ namespace RabbitExpress
3939
using System.Collections.Generic;
4040
using System.Linq;
4141
using System.Linq.Expressions;
42+
using System.Reflection;
4243
using System.Security.Cryptography;
4344
using System.Text;
4445
using System.Threading;
@@ -163,6 +164,32 @@ private string GetQueueIdentifier(string ret, string name, string[] args)
163164
return Convert.ToBase64String(buffer);
164165
}
165166

167+
private void RegisterQueues<TInterface>()
168+
{
169+
var iType = typeof(TInterface);
170+
var methods = iType.GetMethods(BindingFlags.Public | BindingFlags.Instance | BindingFlags.InvokeMethod);
171+
foreach (var info in methods)
172+
{
173+
var h = new
174+
{
175+
name = $"{info.DeclaringType?.FullName}.{info.Name}",
176+
args = info.GetParameters().Select(a => a.ParameterType.FullName).ToArray(),
177+
ret = info.ReturnType.FullName
178+
};
179+
var queueName = GetQueueIdentifier(h.ret, h.name, h.args);
180+
181+
var res = _model.QueueDeclare(queueName, true, false);
182+
IDictionary<string, object> spec = new Dictionary<string, object>
183+
{
184+
{"x-match", "all"},
185+
{"returnType", h.ret},
186+
{"signature", h.name},
187+
{"args", h.args}
188+
};
189+
_model.QueueBind(queueName, Exchange, string.Empty, spec);
190+
}
191+
}
192+
166193
/// <summary>
167194
/// Initializes a new instance of the <see cref="QueueClient{TSerializer}"/> class.
168195
/// </summary>
@@ -290,6 +317,7 @@ public void Dispose()
290317
public TInterface RpcClient<TInterface>()
291318
where TInterface : class
292319
{
320+
RegisterQueues<TInterface>();
293321
return Proxy.CreateProxy<TInterface>(Intercept, asyncMode: AsyncInvocationMode.Wait);
294322
}
295323

@@ -303,6 +331,7 @@ public TInterface RpcClient<TInterface>()
303331
public void RpcServer<TInterface>(Expression<Action<TInterface>> method, Delegate implementation)
304332
where TInterface : class
305333
{
334+
RegisterQueues<TInterface>();
306335
if (method.Body is MethodCallExpression callExpression)
307336
{
308337
System.Reflection.MethodInfo info = callExpression.Method;
@@ -333,16 +362,6 @@ public void RpcServer<TInterface>(Expression<Action<TInterface>> method, Delegat
333362
return WorkerResult.Success;
334363
}))
335364
{
336-
_model.QueueDeclare(queueName, true, false);
337-
IDictionary<string, object> spec = new Dictionary<string, object>
338-
{
339-
{"x-match", "all"},
340-
{"returnType", h.ret},
341-
{"signature", h.name},
342-
{"args", h.args}
343-
};
344-
_model.QueueBind(queueName, Exchange, string.Empty, spec);
345-
346365
_model.BasicConsume(queueName, false, _consumer);
347366

348367
return;

0 commit comments

Comments
 (0)