Skip to content

Commit ed028ed

Browse files
committed
Merge pull request #81 from nayato/no-breakout
STEE: smarter idle waiting for scheduled tasks
2 parents 1986553 + ba716c6 commit ed028ed

5 files changed

Lines changed: 52 additions & 8 deletions

File tree

src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ public class SingleThreadEventExecutor : AbstractScheduledEventExecutor
2727
readonly MpscLinkedQueue<IRunnable> taskQueue = new MpscLinkedQueue<IRunnable>();
2828
Thread thread;
2929
volatile int executionState = ST_NOT_STARTED;
30-
readonly TimeSpan breakoutInterval;
3130
readonly PreciseTimeSpan preciseBreakoutInterval;
3231
PreciseTimeSpan lastExecutionTime;
3332
readonly ManualResetEventSlim emptyEvent = new ManualResetEventSlim();
@@ -41,7 +40,6 @@ public class SingleThreadEventExecutor : AbstractScheduledEventExecutor
4140
public SingleThreadEventExecutor(string threadName, TimeSpan breakoutInterval)
4241
{
4342
this.terminationCompletionSource = new TaskCompletionSource();
44-
this.breakoutInterval = breakoutInterval;
4543
this.preciseBreakoutInterval = PreciseTimeSpan.FromTimeSpan(breakoutInterval);
4644
this.scheduler = new ExecutorTaskScheduler(this);
4745
this.thread = new Thread(this.Loop)
@@ -400,10 +398,23 @@ IRunnable PollTask()
400398
if (task == null)
401399
{
402400
this.emptyEvent.Reset();
403-
if ((task = this.taskQueue.Dequeue()) == null // revisit queue as producer might have put a task in meanwhile
404-
&& this.emptyEvent.Wait(this.breakoutInterval))
401+
if ((task = this.taskQueue.Dequeue()) == null) // revisit queue as producer might have put a task in meanwhile
405402
{
406-
task = this.taskQueue.Dequeue();
403+
IScheduledRunnable nextScheduledTask = this.ScheduledTaskQueue.Peek();
404+
if (nextScheduledTask != null)
405+
{
406+
TimeSpan wakeUpTimeout = (nextScheduledTask.Deadline - PreciseTimeSpan.FromStart).ToTimeSpan();
407+
if (this.emptyEvent.Wait(wakeUpTimeout))
408+
{
409+
// woken up before the next scheduled task was due
410+
task = this.taskQueue.Dequeue();
411+
}
412+
}
413+
else
414+
{
415+
this.emptyEvent.Wait();
416+
task = this.taskQueue.Dequeue();
417+
}
407418
}
408419
}
409420

src/DotNetty.Common/DotNetty.Common.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@
163163
<Compile Include="IResourceLeakHint.cs" />
164164
<Compile Include="ThreadLocalObjectList.cs" />
165165
<Compile Include="ThreadLocalPool.cs" />
166-
<Compile Include="Timestamp.cs" />
166+
<Compile Include="PreciseTimeSpan.cs" />
167167
<Compile Include="Utilities\AtomicReference.cs" />
168168
<Compile Include="Utilities\BitOps.cs" />
169169
<Compile Include="Utilities\ByteArrayExtensions.cs" />

src/DotNetty.Transport/Channels/SingleThreadEventLoop.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace DotNetty.Transport.Channels
99

1010
public class SingleThreadEventLoop : SingleThreadEventExecutor, IEventLoop
1111
{
12-
static readonly TimeSpan DefaultBreakoutInterval = TimeSpan.FromSeconds(5);
12+
static readonly TimeSpan DefaultBreakoutInterval = TimeSpan.FromMilliseconds(100);
1313

1414
public SingleThreadEventLoop()
1515
: this(null, DefaultBreakoutInterval)

test/DotNetty.Common.Tests/Concurrency/SingleThreadEventExecutorTests.cs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,40 @@ public void FuzzyScheduling(int producerCount, bool perCpu, int taskPerProducer)
8585
Assert.True(mre.WaitOne(TimeSpan.FromSeconds(5)));
8686
}
8787

88-
public class Container<T>
88+
[Theory]
89+
[InlineData(true)]
90+
[InlineData(false)]
91+
public async Task ScheduledTaskFiresOnTime(bool scheduleFromExecutor)
92+
{
93+
var scheduler = new SingleThreadEventExecutor(null, TimeSpan.FromMinutes(1));
94+
var promise = new TaskCompletionSource();
95+
Func<Task> scheduleFunc = () => scheduler.ScheduleAsync(() => promise.Complete(), TimeSpan.FromMilliseconds(100));
96+
Task task = scheduleFromExecutor ? await scheduler.SubmitAsync(scheduleFunc) : scheduleFunc();
97+
await Task.WhenAny(task, Task.Delay(TimeSpan.FromMilliseconds(300)));
98+
Assert.True(task.IsCompleted);
99+
}
100+
101+
[Fact]
102+
public async Task ScheduledTaskFiresOnTimeWhileBusy()
103+
{
104+
var scheduler = new SingleThreadEventExecutor(null, TimeSpan.FromMilliseconds(10));
105+
var promise = new TaskCompletionSource();
106+
Action selfQueueAction = null;
107+
selfQueueAction = () =>
108+
{
109+
if (!promise.Task.IsCompleted)
110+
{
111+
scheduler.Execute(selfQueueAction);
112+
}
113+
};
114+
115+
scheduler.Execute(selfQueueAction);
116+
Task task = scheduler.ScheduleAsync(() => promise.Complete(), TimeSpan.FromMilliseconds(100));
117+
await Task.WhenAny(task, Task.Delay(TimeSpan.FromMilliseconds(300)));
118+
Assert.True(task.IsCompleted);
119+
}
120+
121+
class Container<T>
89122
{
90123
public T Value;
91124
}

0 commit comments

Comments
 (0)