-
Notifications
You must be signed in to change notification settings - Fork 65
Expand file tree
/
Copy pathTimingWheelTimer.cs
More file actions
255 lines (230 loc) · 7.87 KB
/
TimingWheelTimer.cs
File metadata and controls
255 lines (230 loc) · 7.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
using System;
using System.Threading;
using System.Threading.Tasks;
using DelayQueue;
using ET;
using TimingWheel.Extensions;
using TimingWheel.Interfaces;
using UGFExtensions;
namespace TimingWheel
{
/// <summary>
/// 时间轮计时器,参考kafka时间轮算法实现
/// </summary>
public class TimingWheelTimer : ITimer
{
/// <summary>
/// 时间槽延时队列,和时间轮共用
/// </summary>
private readonly DelayQueue<TimeSlot> m_DelayQueue = new DelayQueue<TimeSlot>();
/// <summary>
/// 时间轮
/// </summary>
private readonly TimingWheel m_TimingWheel;
/// <summary>
/// 任务总数
/// </summary>
private readonly AtomicInt m_TaskCount = new AtomicInt();
/// <summary>
/// 任务总数
/// </summary>
public int TaskCount => m_TaskCount.Get();
private CancellationTokenSource m_CancelTokenSource;
/// <summary>
///
/// </summary>
/// <param name="tickSpan">时间槽大小,毫秒</param>
/// <param name="slotCount">时间槽数量</param>
/// <param name="startMs">起始时间戳,标识时间轮创建时间</param>
private TimingWheelTimer(long tickSpan, int slotCount, long startMs)
{
m_TimingWheel = new TimingWheel(tickSpan, slotCount, startMs, m_TaskCount, m_DelayQueue);
}
/// <summary>
/// 构建时间轮计时器
/// </summary>
/// <param name="tickSpan">时间槽大小</param>
/// <param name="slotCount">时间槽数量</param>
/// <param name="startMs">起始时间戳,标识时间轮创建时间,默认当前时间</param>
public static ITimer Build(TimeSpan tickSpan, int slotCount, long? startMs = null)
{
return new TimingWheelTimer((long) tickSpan.TotalMilliseconds,
slotCount,
startMs ?? DateTimeHelper.GetTimestamp());
}
/// <summary>
/// 添加任务
/// </summary>
/// <param name="timeout">过期时间,相对时间</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public ETTask<bool> AddTask(TimeSpan timeout, ETCancellationToken cancellationToken = default)
{
var timeoutMs = DateTimeHelper.GetTimestamp() + (long) timeout.TotalMilliseconds;
return AddTask(timeoutMs, cancellationToken);
}
/// <summary>
/// 添加任务
/// </summary>
/// <param name="timeout">过期时间,相对时间</param>
/// <param name="action"></param>
/// <returns></returns>
public ITimeTask AddTask(TimeSpan timeout, Action<bool> action)
{
var timeoutMs = DateTimeHelper.GetTimestamp() + (long) timeout.TotalMilliseconds;
return AddTask(timeoutMs,action);
}
/// <summary>
/// 添加任务
/// </summary>
/// <param name="timeoutMs">过期时间戳,绝对时间</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async ETTask<bool> AddTask(long timeoutMs, ETCancellationToken cancellationToken = default)
{
var task = TimeTask.Create(timeoutMs);
AddTask(task);
// 如果添加了已经到期的时间 那么会立即执行 导致下面的task 被置空 报错 这里判断一下是否执行了
if (task.TaskStatus == TimeTaskStatus.None)
{
return true;
}
void CancelAction()
{
task.Cancel();
}
bool result;
try
{
cancellationToken?.Add(CancelAction);
result = await (ETTask<bool>) task.DelayTask;
}
finally
{
cancellationToken?.Remove(CancelAction);
}
return result;
}
/// <summary>
/// 添加任务
/// </summary>
/// <param name="timeoutMs">过期时间戳,绝对时间</param>
/// <param name="action"></param>
/// <returns></returns>
public ITimeTask AddTask(long timeoutMs, Action<bool> action)
{
var task = TimeTask.Create(timeoutMs, action);
AddTask(task);
// 如果添加了已经到期的时间 那么会立即执行 返回null 不允许操作已经返还对象池的 ITimeTask 。
return task.TaskStatus == TimeTaskStatus.None ? null : task;
}
/// <summary>
/// 启动
/// </summary>
public void Start()
{
if (m_CancelTokenSource != null)
{
return;
}
m_CancelTokenSource = new CancellationTokenSource();
// 时间轮运行线程
Task.Factory.StartNew(() => Run(m_CancelTokenSource.Token),
m_CancelTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
/// <summary>
/// 停止
/// </summary>
public void Stop()
{
Cancel();
m_DelayQueue.Clear();
}
/// <summary>
/// 暂停
/// </summary>
public void Pause()
{
Cancel();
}
/// <summary>
/// 恢复
/// </summary>
public void Resume()
{
Start();
}
/// <summary>
/// 取消任务
/// </summary>
private void Cancel()
{
if (m_CancelTokenSource != null)
{
m_CancelTokenSource.Cancel();
m_CancelTokenSource.Dispose();
m_CancelTokenSource = null;
}
}
/// <summary>
/// 运行
/// </summary>
private void Run(CancellationToken token)
{
try
{
while (!token.IsCancellationRequested)
{
Step(token);
}
}
catch (Exception e)
{
if (e is OperationCanceledException)
{
return;
}
throw;
}
}
/// <summary>
/// 推进时间轮
/// </summary>
/// <param name="token"></param>
private void Step(CancellationToken token)
{
// 阻塞式获取,到期的时间槽才会出队
if (m_DelayQueue.TryTake(out var slot, token))
{
while (!token.IsCancellationRequested)
{
// 推进时间轮
m_TimingWheel.Step(slot.TimeoutMs.Get());
// 到期的任务会重新添加进时间轮,那么下一层时间轮的任务重新计算后可能会进入上层时间轮。
// 这样就实现了任务在时间轮中的传递,由大精度的时间轮进入小精度的时间轮。
slot.Flush(AddTask);
// Flush之后可能有新的slot入队,可能仍旧过期,因此尝试继续处理,直到没有过期项。
if (!m_DelayQueue.TryTakeNoBlocking(out slot))
{
break;
}
}
}
}
/// <summary>
/// 添加任务
/// </summary>
/// <param name="timeTask">延时任务</param>
private void AddTask(TimeTask timeTask)
{
// 添加失败,说明该任务已到期,需要执行了
if (m_TimingWheel.AddTask(timeTask)) return;
if (timeTask.IsWaiting)
{
Loom.Post(timeTask.Run);
}
}
}
}