Skip to content

Commit 66caa64

Browse files
committed
Merge pull request #91 from nayato/shutdown
Fixes ByteBuffer+Stream integration, TLS neg, STEE Shutdown
2 parents d3e6fe4 + aa3cbc6 commit 66caa64

9 files changed

Lines changed: 62 additions & 49 deletions

File tree

examples/Echo.Client/Program.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ namespace Echo.Client
88
using System.Net;
99
using System.Security.Cryptography.X509Certificates;
1010
using System.Threading.Tasks;
11+
using DotNetty.Codecs;
1112
using DotNetty.Common.Internal.Logging;
1213
using DotNetty.Handlers.Tls;
1314
using DotNetty.Transport.Bootstrapping;
@@ -42,10 +43,12 @@ static async Task RunClient()
4243
string targetHost = cert.GetNameInfo(X509NameType.DnsName, false);
4344
pipeline.AddLast(TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true));
4445
}
46+
pipeline.AddLast(new LengthFieldPrepender(2));
47+
pipeline.AddLast(new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
4548

4649
pipeline.AddLast(new EchoClientHandler());
4750
}));
48-
51+
4952
IChannel bootstrapChannel = await bootstrap.ConnectAsync(new IPEndPoint(EchoClientSettings.Host, EchoClientSettings.Port));
5053

5154
Console.ReadLine();
@@ -64,4 +67,4 @@ static void Main(string[] args)
6467
Task.Run(() => RunClient()).Wait();
6568
}
6669
}
67-
}
70+
}

examples/Echo.Server/Program.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ namespace Echo.Server
77
using System.Diagnostics.Tracing;
88
using System.Security.Cryptography.X509Certificates;
99
using System.Threading.Tasks;
10+
using DotNetty.Codecs;
1011
using DotNetty.Common.Internal.Logging;
1112
using DotNetty.Handlers.Logging;
1213
using DotNetty.Handlers.Tls;
@@ -41,6 +42,8 @@ static async Task RunServer()
4142
{
4243
pipeline.AddLast(TlsHandler.Server(new X509Certificate2("dotnetty.com.pfx", "password")));
4344
}
45+
pipeline.AddLast(new LengthFieldPrepender(2));
46+
pipeline.AddLast(new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
4447

4548
pipeline.AddLast(new EchoServerHandler());
4649
}));
@@ -63,4 +66,4 @@ static void Main(string[] args)
6366
Task.Run(() => RunServer()).Wait();
6467
}
6568
}
66-
}
69+
}

src/DotNetty.Buffers/ByteBufferUtil.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -635,10 +635,11 @@ public static string DecodeString(IByteBuffer src, int readerIndex, int len, Enc
635635
else
636636
{
637637
IByteBuffer buffer = src.Allocator.Buffer(len);
638+
Contract.Assert(buffer.HasArray, "Operation expects allocator to operate array-based buffers.");
638639
try
639640
{
640641
buffer.WriteBytes(src, readerIndex, len);
641-
return encoding.GetString(buffer.Array, 0, len);
642+
return encoding.GetString(buffer.Array, buffer.ArrayOffset, len);
642643
}
643644
finally
644645
{

src/DotNetty.Buffers/UnpooledHeapByteBuffer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public override IByteBuffer GetBytes(int index, byte[] dst, int dstIndex, int le
148148

149149
public override IByteBuffer GetBytes(int index, Stream destination, int length)
150150
{
151-
destination.Write(this.Array, this.ArrayOffset + this.ReaderIndex, this.ReadableBytes);
151+
destination.Write(this.Array, this.ArrayOffset + index, length);
152152
return this;
153153
}
154154

src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs

Lines changed: 24 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ public class SingleThreadEventExecutor : AbstractScheduledEventExecutor
2121
const int ST_TERMINATED = 5;
2222
const string DefaultWorkerThreadName = "SingleThreadEventExecutor worker";
2323

24+
static readonly IRunnable WAKEUP_TASK = new NoOpRunnable();
25+
2426
static readonly IInternalLogger Logger =
2527
InternalLoggerFactory.GetInstance<SingleThreadEventExecutor>();
2628

@@ -70,15 +72,12 @@ void Loop()
7072
Task.Factory.StartNew(
7173
() =>
7274
{
73-
if (Interlocked.CompareExchange(ref this.executionState, ST_STARTED, ST_NOT_STARTED) == ST_NOT_STARTED)
75+
Interlocked.CompareExchange(ref this.executionState, ST_STARTED, ST_NOT_STARTED);
76+
while (!this.ConfirmShutdown())
7477
{
75-
while (!this.ConfirmShutdown())
76-
{
77-
this.RunAllTasks(this.preciseBreakoutInterval);
78-
}
79-
80-
this.CleanupAndTerminate(true);
78+
this.RunAllTasks(this.preciseBreakoutInterval);
8179
}
80+
this.CleanupAndTerminate(true);
8281
},
8382
CancellationToken.None,
8483
TaskCreationOptions.None,
@@ -120,6 +119,14 @@ public override void Execute(IRunnable task)
120119
}
121120
}
122121

122+
protected void WakeUp(bool inEventLoop)
123+
{
124+
if (!inEventLoop || this.executionState == ST_SHUTTING_DOWN)
125+
{
126+
this.Execute(WAKEUP_TASK);
127+
}
128+
}
129+
123130
public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
124131
{
125132
Contract.Requires(quietPeriod >= TimeSpan.Zero);
@@ -174,10 +181,10 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time
174181
// scheduleExecution();
175182
//}
176183

177-
//if (wakeup)
178-
//{
179-
// wakeup(inEventLoop);
180-
//}
184+
if (wakeup)
185+
{
186+
this.WakeUp(inEventLoop);
187+
}
181188

182189
return this.TerminationCompletion;
183190
}
@@ -189,10 +196,7 @@ protected bool ConfirmShutdown()
189196
return false;
190197
}
191198

192-
if (!this.InEventLoop)
193-
{
194-
throw new InvalidOperationException("must be invoked from an event loop");
195-
}
199+
Contract.Assert(this.InEventLoop, "must be invoked from an event loop");
196200

197201
this.CancelScheduledTasks();
198202

@@ -210,8 +214,7 @@ protected bool ConfirmShutdown()
210214
}
211215

212216
// There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period.
213-
// todo: ???
214-
//wakeup(true);
217+
this.WakeUp(true);
215218
return false;
216219
}
217220

@@ -227,7 +230,7 @@ protected bool ConfirmShutdown()
227230
// Check if any tasks were added to the queue every 100ms.
228231
// TODO: Change the behavior of takeTask() so that it returns on timeout.
229232
// todo: ???
230-
//wakeup(true);
233+
this.WakeUp(true);
231234
Thread.Sleep(100);
232235

233236
return false;
@@ -243,7 +246,6 @@ protected void CleanupAndTerminate(bool success)
243246
while (true)
244247
{
245248
int oldState = this.executionState;
246-
;
247249
if (oldState >= ST_SHUTTING_DOWN || Interlocked.CompareExchange(ref this.executionState, ST_SHUTTING_DOWN, oldState) == oldState)
248250
{
249251
break;
@@ -398,7 +400,7 @@ IRunnable PollTask()
398400
if (task == null)
399401
{
400402
this.emptyEvent.Reset();
401-
if ((task = this.taskQueue.Dequeue()) == null) // revisit queue as producer might have put a task in meanwhile
403+
if ((task = this.taskQueue.Dequeue()) == null && !this.IsShuttingDown) // revisit queue as producer might have put a task in meanwhile
402404
{
403405
IScheduledRunnable nextScheduledTask = this.ScheduledTaskQueue.Peek();
404406
if (nextScheduledTask != null)
@@ -424,27 +426,11 @@ IRunnable PollTask()
424426
return task;
425427
}
426428

427-
#region IDisposable members
428-
429-
public void Dispose()
430-
{
431-
this.Dispose(true);
432-
GC.SuppressFinalize(this);
433-
}
434-
435-
public void Dispose(bool isDisposing)
429+
sealed class NoOpRunnable : IRunnable
436430
{
437-
if (!this.disposed)
431+
public void Run()
438432
{
439-
if (isDisposing)
440-
{
441-
this.thread = null;
442-
}
443433
}
444-
445-
this.disposed = true;
446434
}
447-
448-
#endregion
449435
}
450436
}

src/DotNetty.Handlers/Tls/TlsHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ bool EnsureAuthenticated()
247247
this.state = oldState | State.Authenticating;
248248
if (this.isServer)
249249
{
250-
this.sslStream.AuthenticateAsServerAsync(this.certificate) // todo: change to begin/end
250+
this.sslStream.AuthenticateAsServerAsync(this.certificate, false, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, false) // todo: change to begin/end
251251
.ContinueWith(AuthenticationCompletionCallback, this, TaskContinuationOptions.ExecuteSynchronously);
252252
}
253253
else

test/DotNetty.Common.Tests/Concurrency/SingleThreadEventExecutorTests.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,21 @@ public async Task ScheduledTaskFiresOnTimeWhileBusy()
118118
Assert.True(task.IsCompleted);
119119
}
120120

121+
[Theory]
122+
[InlineData(0)]
123+
[InlineData(200)]
124+
public async Task ShutdownWhileIdle(int delayInMs)
125+
{
126+
var scheduler = new SingleThreadEventExecutor("test", TimeSpan.FromMilliseconds(10));
127+
if (delayInMs > 0)
128+
{
129+
Thread.Sleep(delayInMs);
130+
}
131+
Task shutdownTask = scheduler.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(1));
132+
await Task.WhenAny(shutdownTask, Task.Delay(TimeSpan.FromSeconds(5)));
133+
Assert.True(shutdownTask.IsCompleted);
134+
}
135+
121136
class Container<T>
122137
{
123138
public T Value;

test/DotNetty.Tests.End2End/DotNetty.Tests.End2End.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
<NuGetPackageImportStamp>8624fbb3</NuGetPackageImportStamp>
2222
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\..\</SolutionDir>
2323
<RestorePackages>true</RestorePackages>
24+
<TargetFrameworkProfile />
2425
</PropertyGroup>
2526
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
2627
<DebugSymbols>true</DebugSymbols>

test/DotNetty.Tests.End2End/End2EndTests.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ namespace DotNetty.Tests.End2End
1111
using System.Text;
1212
using System.Threading.Tasks;
1313
using DotNetty.Buffers;
14+
using DotNetty.Codecs;
1415
using DotNetty.Codecs.Mqtt;
1516
using DotNetty.Codecs.Mqtt.Packets;
1617
using DotNetty.Common.Concurrency;
@@ -49,7 +50,9 @@ public async void EchoServerAndClient()
4950
var tlsCertificate = new X509Certificate2("dotnetty.com.pfx", "password");
5051
Func<Task> closeServerFunc = await this.StartServerAsync(true, ch =>
5152
{
52-
ch.Pipeline.AddLast(TlsHandler.Server(tlsCertificate));
53+
ch.Pipeline.AddLast("server tls", TlsHandler.Server(tlsCertificate));
54+
ch.Pipeline.AddLast("server prepender", new LengthFieldPrepender(2));
55+
ch.Pipeline.AddLast("server decoder", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
5356
ch.Pipeline.AddLast(new EchoChannelHandler());
5457
}, testPromise);
5558

@@ -61,7 +64,9 @@ public async void EchoServerAndClient()
6164
.Handler(new ActionChannelInitializer<ISocketChannel>(ch =>
6265
{
6366
string targetHost = tlsCertificate.GetNameInfo(X509NameType.DnsName, false);
64-
ch.Pipeline.AddLast(TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true));
67+
ch.Pipeline.AddLast("client tls", TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true));
68+
ch.Pipeline.AddLast("client prepender", new LengthFieldPrepender(2));
69+
ch.Pipeline.AddLast("client decoder", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
6570
ch.Pipeline.AddLast(new TestScenarioRunner(this.GetEchoClientScenario, testPromise));
6671
}));
6772

@@ -74,8 +79,8 @@ public async void EchoServerAndClient()
7479

7580
this.Output.WriteLine("Connected channel: {0}", clientChannel);
7681

77-
await Task.WhenAny(testPromise.Task, Task.Delay(TimeSpan.FromMinutes(1)));
78-
Assert.True(testPromise.Task.IsCompleted);
82+
await Task.WhenAny(testPromise.Task, Task.Delay(TimeSpan.FromSeconds(30)));
83+
Assert.True(testPromise.Task.IsCompleted, "timed out");
7984
testPromise.Task.Wait();
8085
}
8186
finally
@@ -277,7 +282,6 @@ async Task<Func<Task>> StartServerAsync(bool tcpNoDelay, Action<IChannel> childH
277282
var bossGroup = new MultithreadEventLoopGroup(1);
278283
var workerGroup = new MultithreadEventLoopGroup();
279284
bool started = false;
280-
//var tlsCertificate = new X509Certificate2("dotnetty.com.pfx", "password");
281285
try
282286
{
283287
ServerBootstrap b = new ServerBootstrap()

0 commit comments

Comments
 (0)