Skip to content

Commit 1986553

Browse files
committed
Merge pull request #80 from nayato/cancel-schedule
Adds IEventExecutor.Schedule, proper cancellation of scheduled tasks
2 parents ae0798e + d5a4c30 commit 1986553

23 files changed

Lines changed: 601 additions & 157 deletions

src/DotNetty.Buffers/ByteBufferUtil.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,7 @@ public static string DecodeString(IByteBuffer src, int readerIndex, int len, Enc
579579

580580
if (src.HasArray)
581581
{
582-
return encoding.GetString(src.Array, readerIndex, len);
582+
return encoding.GetString(src.Array, src.ArrayOffset + readerIndex, len);
583583
}
584584
else
585585
{

src/DotNetty.Common/Concurrency/AbstractEventExecutor.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,21 @@ public void Execute(Action action)
5555
this.Execute(new ActionTaskQueueNode(action));
5656
}
5757

58+
public virtual IScheduledTask Schedule(Action action, TimeSpan delay)
59+
{
60+
throw new NotSupportedException();
61+
}
62+
63+
public virtual IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay)
64+
{
65+
throw new NotSupportedException();
66+
}
67+
68+
public virtual IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay)
69+
{
70+
throw new NotSupportedException();
71+
}
72+
5873
public virtual Task ScheduleAsync(Action action, TimeSpan delay)
5974
{
6075
return this.ScheduleAsync(action, delay, CancellationToken.None);
Lines changed: 39 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) Microsoft. All rights reserved.
1+
// Copyright (c) Microsoft. All rights reserved.
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

44
namespace DotNetty.Common.Concurrency
@@ -15,8 +15,6 @@ namespace DotNetty.Common.Concurrency
1515
/// </summary>
1616
public abstract class AbstractScheduledEventExecutor : AbstractEventExecutor
1717
{
18-
static readonly Action<object, object> AddScheduledTaskAction = (e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Enqueue((IScheduledRunnable)t);
19-
2018
protected readonly PriorityQueue<IScheduledRunnable> ScheduledTaskQueue = new PriorityQueue<IScheduledRunnable>();
2119

2220
// TODO: support for EventExecutorGroup
@@ -94,172 +92,89 @@ protected bool HasScheduledTasks()
9492
return scheduledTask != null && scheduledTask.Deadline <= PreciseTimeSpan.FromStart;
9593
}
9694

97-
public override Task ScheduleAsync(Action action, TimeSpan delay, CancellationToken cancellationToken)
95+
public override IScheduledTask Schedule(Action action, TimeSpan delay)
9896
{
99-
var scheduledTask = new ActionScheduledTask(action, PreciseTimeSpan.Deadline(delay), cancellationToken);
100-
if (this.InEventLoop)
101-
{
102-
this.ScheduledTaskQueue.Enqueue(scheduledTask);
103-
}
104-
else
105-
{
106-
this.Execute(AddScheduledTaskAction, this, scheduledTask);
107-
}
108-
return scheduledTask.Completion;
109-
}
110-
111-
public override Task ScheduleAsync(Action<object> action, object state, TimeSpan delay, CancellationToken cancellationToken)
112-
{
113-
var scheduledTask = new StateActionScheduledTask(action, state, PreciseTimeSpan.Deadline(delay), cancellationToken);
114-
if (this.InEventLoop)
115-
{
116-
this.ScheduledTaskQueue.Enqueue(scheduledTask);
117-
}
118-
else
119-
{
120-
this.Execute(AddScheduledTaskAction, this, scheduledTask);
121-
}
122-
return scheduledTask.Completion;
97+
return this.Schedule(new ActionScheduledTask(this, action, PreciseTimeSpan.Deadline(delay)));
12398
}
12499

125-
public override Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay, CancellationToken cancellationToken)
100+
public override IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay)
126101
{
127-
var scheduledTask = new StateActionWithContextScheduledTask(action, context, state, PreciseTimeSpan.Deadline(delay), cancellationToken);
128-
if (this.InEventLoop)
129-
{
130-
this.ScheduledTaskQueue.Enqueue(scheduledTask);
131-
}
132-
else
133-
{
134-
this.Execute(AddScheduledTaskAction, this, scheduledTask);
135-
}
136-
return scheduledTask.Completion;
102+
return this.Schedule(new StateActionScheduledTask(this, action, state, PreciseTimeSpan.Deadline(delay)));
137103
}
138104

139-
#region Scheduled task data structures
140-
141-
protected interface IScheduledRunnable : IRunnable, IComparable<IScheduledRunnable>
105+
public override IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay)
142106
{
143-
PreciseTimeSpan Deadline { get; }
144-
145-
bool Cancel();
107+
return this.Schedule(new StateActionWithContextScheduledTask(this, action, context, state, PreciseTimeSpan.Deadline(delay)));
146108
}
147109

148-
protected abstract class ScheduledTaskBase : MpscLinkedQueueNode<IRunnable>, IScheduledRunnable
110+
public override Task ScheduleAsync(Action action, TimeSpan delay, CancellationToken cancellationToken)
149111
{
150-
readonly TaskCompletionSource promise;
151-
152-
protected ScheduledTaskBase(PreciseTimeSpan deadline, TaskCompletionSource promise, CancellationToken cancellationToken)
153-
{
154-
this.promise = promise;
155-
this.Deadline = deadline;
156-
this.CancellationToken = cancellationToken;
157-
}
158-
159-
public PreciseTimeSpan Deadline { get; private set; }
160-
161-
public bool Cancel()
112+
if (cancellationToken.IsCancellationRequested)
162113
{
163-
return this.promise.TrySetCanceled();
114+
return TaskEx.Cancelled;
164115
}
165116

166-
public Task Completion
117+
if (!cancellationToken.CanBeCanceled)
167118
{
168-
get { return this.promise.Task; }
119+
return this.Schedule(action, delay).Completion;
169120
}
170121

171-
public CancellationToken CancellationToken { get; private set; }
172-
173-
int IComparable<IScheduledRunnable>.CompareTo(IScheduledRunnable other)
174-
{
175-
Contract.Requires(other != null);
176-
177-
return this.Deadline.CompareTo(other.Deadline);
178-
}
122+
return this.Schedule(new ActionScheduledAsyncTask(this, action, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion;
123+
}
179124

180-
public override IRunnable Value
125+
public override Task ScheduleAsync(Action<object> action, object state, TimeSpan delay, CancellationToken cancellationToken)
126+
{
127+
if (cancellationToken.IsCancellationRequested)
181128
{
182-
get { return this; }
129+
return TaskEx.Cancelled;
183130
}
184131

185-
public void Run()
132+
if (!cancellationToken.CanBeCanceled)
186133
{
187-
if (this.CancellationToken.IsCancellationRequested)
188-
{
189-
this.promise.TrySetCanceled();
190-
return;
191-
}
192-
if (this.Completion.IsCanceled)
193-
{
194-
return;
195-
}
196-
try
197-
{
198-
this.Execute();
199-
this.promise.TryComplete();
200-
}
201-
catch (Exception ex)
202-
{
203-
// todo: check for fatal
204-
this.promise.TrySetException(ex);
205-
}
134+
return this.Schedule(action, state, delay).Completion;
206135
}
207136

208-
protected abstract void Execute();
137+
return this.Schedule(new StateActionScheduledAsyncTask(this, action, state, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion;
209138
}
210139

211-
sealed class ActionScheduledTask : ScheduledTaskBase
140+
public override Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay, CancellationToken cancellationToken)
212141
{
213-
readonly Action action;
214-
215-
public ActionScheduledTask(Action action, PreciseTimeSpan deadline, CancellationToken cancellationToken)
216-
: base(deadline, new TaskCompletionSource(), cancellationToken)
142+
if (cancellationToken.IsCancellationRequested)
217143
{
218-
this.action = action;
144+
return TaskEx.Cancelled;
219145
}
220146

221-
protected override void Execute()
147+
if (!cancellationToken.CanBeCanceled)
222148
{
223-
this.action();
149+
return this.Schedule(action, context, state, delay).Completion;
224150
}
151+
152+
return this.Schedule(new StateActionWithContextScheduledAsyncTask(this, action, context, state, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion;
225153
}
226154

227-
sealed class StateActionScheduledTask : ScheduledTaskBase
155+
protected IScheduledRunnable Schedule(IScheduledRunnable task)
228156
{
229-
readonly Action<object> action;
230-
231-
public StateActionScheduledTask(Action<object> action, object state, PreciseTimeSpan deadline,
232-
CancellationToken cancellationToken)
233-
: base(deadline, new TaskCompletionSource(state), cancellationToken)
157+
if (this.InEventLoop)
234158
{
235-
this.action = action;
159+
this.ScheduledTaskQueue.Enqueue(task);
236160
}
237-
238-
protected override void Execute()
161+
else
239162
{
240-
this.action(this.Completion.AsyncState);
163+
this.Execute((e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Enqueue((IScheduledRunnable)t), this, task);
241164
}
165+
return task;
242166
}
243167

244-
sealed class StateActionWithContextScheduledTask : ScheduledTaskBase
168+
internal void RemoveScheduled(IScheduledRunnable task)
245169
{
246-
readonly Action<object, object> action;
247-
readonly object context;
248-
249-
public StateActionWithContextScheduledTask(Action<object, object> action, object context, object state,
250-
PreciseTimeSpan deadline, CancellationToken cancellationToken)
251-
: base(deadline, new TaskCompletionSource(state), cancellationToken)
170+
if (this.InEventLoop)
252171
{
253-
this.action = action;
254-
this.context = context;
172+
this.ScheduledTaskQueue.Remove(task);
255173
}
256-
257-
protected override void Execute()
174+
else
258175
{
259-
this.action(this.context, this.Completion.AsyncState);
176+
this.Execute((e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Remove((IScheduledRunnable)t), this, task);
260177
}
261178
}
262-
263-
#endregion
264179
}
265180
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
namespace DotNetty.Common.Concurrency
5+
{
6+
using System;
7+
using System.Threading;
8+
9+
sealed class ActionScheduledAsyncTask : ScheduledAsyncTask
10+
{
11+
readonly Action action;
12+
13+
public ActionScheduledAsyncTask(AbstractScheduledEventExecutor executor, Action action, PreciseTimeSpan deadline, CancellationToken cancellationToken)
14+
: base(executor, deadline, new TaskCompletionSource(), cancellationToken)
15+
{
16+
this.action = action;
17+
}
18+
19+
protected override void Execute()
20+
{
21+
this.action();
22+
}
23+
}
24+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
namespace DotNetty.Common.Concurrency
5+
{
6+
using System;
7+
8+
sealed class ActionScheduledTask : ScheduledTask
9+
{
10+
readonly Action action;
11+
12+
public ActionScheduledTask(AbstractScheduledEventExecutor executor, Action action, PreciseTimeSpan deadline)
13+
: base(executor, deadline, new TaskCompletionSource())
14+
{
15+
this.action = action;
16+
}
17+
18+
protected override void Execute()
19+
{
20+
this.action();
21+
}
22+
}
23+
}

src/DotNetty.Common/Concurrency/IEventExecutor.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,34 @@ public interface IEventExecutor
105105
/// </remarks>
106106
void Execute(Action<object, object> action, object context, object state);
107107

108+
/// <summary>
109+
/// Schedules the given action for execution after the specified delay would pass.
110+
/// </summary>
111+
/// <remarks>
112+
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
113+
/// </remarks>
114+
IScheduledTask Schedule(Action action, TimeSpan delay);
115+
116+
/// <summary>
117+
/// Schedules the given action for execution after the specified delay would pass.
118+
/// </summary>
119+
/// <remarks>
120+
/// <paramref name="state"/> parameter is useful to when repeated execution of an action against
121+
/// different objects is needed.
122+
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
123+
/// </remarks>
124+
IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay);
125+
126+
/// <summary>
127+
/// Schedules the given action for execution after the specified delay would pass.
128+
/// </summary>
129+
/// <remarks>
130+
/// <paramref name="context"/> and <paramref name="state"/> parameters are useful when repeated execution of
131+
/// an action against different objects in different context is needed.
132+
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
133+
/// </remarks>
134+
IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay);
135+
108136
/// <summary>
109137
/// Schedules the given action for execution after the specified delay would pass.
110138
/// </summary>
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
namespace DotNetty.Common.Concurrency
5+
{
6+
using System;
7+
8+
public interface IScheduledRunnable : IRunnable, IScheduledTask, IComparable<IScheduledRunnable>
9+
{
10+
}
11+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
namespace DotNetty.Common.Concurrency
5+
{
6+
using System.Runtime.CompilerServices;
7+
using System.Threading.Tasks;
8+
9+
public interface IScheduledTask
10+
{
11+
bool Cancel();
12+
13+
PreciseTimeSpan Deadline { get; }
14+
15+
Task Completion { get; }
16+
17+
TaskAwaiter GetAwaiter();
18+
}
19+
}

0 commit comments

Comments
 (0)