forked from wmowm/InitQ
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathInitQ.Core.cs
More file actions
140 lines (129 loc) · 5.85 KB
/
Copy pathInitQ.Core.cs
File metadata and controls
140 lines (129 loc) · 5.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
using InitQ.Abstractions;
using InitQ.Cache;
using InitQ.Internal;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace InitQ
{
/// <summary>
/// Discovers methods marked with a <see cref="TopicAttribute"/> on the configured
/// <see cref="IRedisSubscribe"/> implementations and runs one polling consumer loop
/// per discovered method against the Redis list named by the attribute.
/// </summary>
public class InitQCore
{
    /// <summary>
    /// Starts one consumer loop per descriptor and waits for all of them.
    /// </summary>
    /// <param name="ExecutorDescriptorList">Descriptors of the subscribed methods to run.</param>
    /// <param name="serviceProvider">Provider used to create one service scope per consumer loop.</param>
    /// <param name="options">Polling options (log switch, interval and suspend delays).</param>
    /// <returns>A task that completes only when every consumer loop has ended.</returns>
    private async Task Send(IEnumerable<ConsumerExecutorDescriptor> ExecutorDescriptorList, IServiceProvider serviceProvider, InitQOptions options)
    {
        // Every loop is endless, so all of them must be started before any is awaited;
        // awaiting inside the foreach would leave every descriptor after the first unstarted.
        var consumers = ExecutorDescriptorList
            .Select(descriptor => Task.Run(async () =>
            {
                try
                {
                    await ConsumeAsync(descriptor, serviceProvider, options);
                }
                catch (Exception ex)
                {
                    // A consumer that cannot even be set up would otherwise die silently
                    // while its siblings keep the combined task pending.
                    Console.WriteLine(ex.Message);
                    throw;
                }
            }))
            .ToList();

        await Task.WhenAll(consumers);
    }

    /// <summary>
    /// Polls the Redis list named by the descriptor's attribute and invokes the
    /// subscribed method once for every non-empty message popped from it.
    /// </summary>
    /// <param name="descriptor">The subscribed method, its declaring type and its topic attribute.</param>
    /// <param name="serviceProvider">Provider used to create the scope that lives as long as the loop.</param>
    /// <param name="options">Polling options (log switch, interval and suspend delays).</param>
    /// <exception cref="InvalidOperationException">A required service is not registered.</exception>
    private async Task ConsumeAsync(ConsumerExecutorDescriptor descriptor, IServiceProvider serviceProvider, InitQOptions options)
    {
        using (var scope = serviceProvider.GetRequiredService<IServiceScopeFactory>().CreateScope())
        {
            var provider = scope.ServiceProvider;
            var key = descriptor.Attribute.Name;
            var handler = ActivatorUtilities.GetServiceOrCreateInstance(provider, descriptor.ImplTypeInfo);
            var expectsMessage = descriptor.MethodInfo.GetParameters().Length != 0;

            // Redis access object; fail fast when it is missing instead of looping on a null reference.
            var redis = provider.GetRequiredService<ICacheService>();

            while (true)
            {
                try
                {
                    if (options.ShowLog)
                    {
                        Console.WriteLine($"执行方法:{handler.ToString()},key:{key},执行时间{DateTime.Now}");
                    }

                    var count = await redis.ListLengthAsync(key);
                    if (count > 0)
                    {
                        // Take one message from the queue.
                        var message = await redis.ListRightPopAsync(key);
                        if (string.IsNullOrEmpty(message)) continue;

                        // Throttle between messages.
                        await Task.Delay(options.IntervalTime);

                        try
                        {
                            await InvokeHandlerAsync(descriptor.MethodInfo, handler, expectsMessage ? new object[] { message } : null);
                        }
                        catch (TargetInvocationException ex) when (ex.InnerException != null)
                        {
                            // Reflection wraps handler failures; log the handler's own message.
                            Console.WriteLine(ex.InnerException.Message);
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex.Message);
                        }
                    }
                    else
                    {
                        // Queue is empty: suspend before polling again.
                        await Task.Delay(options.SuspendTime);
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);

                    // Back off after a failure (e.g. Redis unreachable) so a persistent
                    // error does not turn the loop into a hot spin.
                    await Task.Delay(options.SuspendTime);
                }
            }
        }
    }

    /// <summary>
    /// Invokes the subscribed method and, when it returns a <see cref="Task"/>, awaits it
    /// so that failures of asynchronous handlers are observed by the caller.
    /// </summary>
    /// <param name="method">The method to invoke.</param>
    /// <param name="target">The instance the method is invoked on.</param>
    /// <param name="arguments">The argument list, or <c>null</c> for a parameterless method.</param>
    private static async Task InvokeHandlerAsync(MethodInfo method, object target, object[] arguments)
    {
        var pending = method.Invoke(target, arguments) as Task;
        if (pending != null)
        {
            await pending;
        }
    }

    /// <summary>
    /// Resolves every subscriber type listed in <paramref name="options"/>, collects the
    /// topic-attributed methods they declare, and runs the consumers whose attribute type
    /// is named <c>SubscribeAttribute</c>.
    /// </summary>
    /// <param name="provider">Root service provider.</param>
    /// <param name="options">Queue options, including the list of subscriber types.</param>
    /// <returns>A task that stays pending while any consumer loop is running.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="provider"/> or <paramref name="options"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The subscriber factory delegate is not registered.</exception>
    public async Task FindInterfaceTypes(IServiceProvider provider, InitQOptions options)
    {
        if (provider == null) throw new ArgumentNullException(nameof(provider));
        if (options == null) throw new ArgumentNullException(nameof(options));

        var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
        using (var scoped = provider.CreateScope())
        {
            var resolveSubscriber = scoped.ServiceProvider.GetRequiredService<Func<Type, IRedisSubscribe>>();
            if (options.ListSubscribe != null)
            {
                foreach (var subscriberType in options.ListSubscribe)
                {
                    var subscriber = resolveSubscriber(subscriberType);
                    if (subscriber == null)
                    {
                        continue;
                    }

                    // The factory is typed to return IRedisSubscribe, so no further
                    // assignability check is needed on the runtime type.
                    executorDescriptorList.AddRange(GetTopicAttributesDescription(subscriber.GetType().GetTypeInfo()));
                }
            }

            await Send(executorDescriptorList.Where(m => m.Attribute.GetType().Name == "SubscribeAttribute"), provider, options);
        }
    }

    /// <summary>
    /// Yields one descriptor for every <see cref="TopicAttribute"/> found on the methods
    /// declared directly by <paramref name="typeInfo"/>.
    /// </summary>
    /// <param name="typeInfo">The subscriber implementation type to inspect.</param>
    /// <returns>A lazily evaluated sequence of descriptors; empty when no method is attributed.</returns>
    private IEnumerable<ConsumerExecutorDescriptor> GetTopicAttributesDescription(TypeInfo typeInfo)
    {
        foreach (var method in typeInfo.DeclaredMethods)
        {
            foreach (var attr in method.GetCustomAttributes<TopicAttribute>(true))
            {
                yield return InitDescriptor(attr, method, typeInfo);
            }
        }
    }

    /// <summary>
    /// Bundles an attribute, the method carrying it and the implementing type into a descriptor.
    /// </summary>
    /// <param name="attr">The topic attribute found on the method.</param>
    /// <param name="methodInfo">The attributed method.</param>
    /// <param name="implType">The type that declares the method.</param>
    /// <returns>The populated descriptor.</returns>
    private ConsumerExecutorDescriptor InitDescriptor(TopicAttribute attr, MethodInfo methodInfo, TypeInfo implType)
    {
        return new ConsumerExecutorDescriptor
        {
            Attribute = attr,
            MethodInfo = methodInfo,
            ImplTypeInfo = implType
        };
    }
}
}