Skip to content

Commit 0dc03c1

Browse files
release v1.0.9
- [fix] fix recv bug
1 parent bda954c commit 0dc03c1

2 files changed

Lines changed: 95 additions & 110 deletions

File tree

Miku.Core/NetClient.cs

Lines changed: 89 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
using System.Collections.Concurrent;
44
using System.Net.Sockets;
55
using System.Collections.Generic;
6-
using System.Runtime.CompilerServices;
7-
using System.Runtime.InteropServices;
86
using System.Threading;
97

108
namespace Miku.Core
@@ -347,150 +345,137 @@ private static void Stop(SocketAsyncEventArgs args)
347345

348346
private static void Receive(SocketAsyncEventArgs args)
349347
{
350-
NetClient client = (NetClient)args.UserToken!;
351-
if (!client.IsConnected || client._socket == null)
348+
var client = (NetClient)args.UserToken!;
349+
var sock = client._socket;
350+
if (!client.IsConnected || sock == null)
351+
return;
352+
353+
// handle socket‐level close/errors
354+
if (args.SocketError != SocketError.Success || args.BytesTransferred == 0)
352355
{
356+
client.Stop();
353357
return;
354358
}
355359

356360
// check if the remote host closed the connection
357361
if (args is { BytesTransferred: > 0, SocketError: SocketError.Success })
358362
{
359-
int totalConsumed = 0;
360363
bool hasLeftover = client._receivedData.WrittenCount > 0;
361364
ReadOnlyMemory<byte> receivedData = new(args.Buffer, 0, args.BytesTransferred);
362-
ReadOnlyMemory<byte> processData = receivedData;
363-
365+
ReadOnlyMemory<byte> remainder;
364366
if (hasLeftover)
365367
{
366368
// copy
367369
client._receivedData.Write(receivedData.Span);
368-
processData = client._receivedData.WrittenMemory;
370+
Process(client, client._receivedData.WrittenMemory, out remainder);
371+
}
372+
else
373+
{
374+
Process(client, receivedData, out remainder);
369375
}
370376

371-
try
377+
if (!remainder.IsEmpty)
372378
{
373-
ReadOnlyMemory<byte> src = processData;
379+
if (hasLeftover)
380+
{
381+
byte[] temp = ArrayPool<byte>.Shared.Rent(remainder.Length);
382+
remainder.Span.CopyTo(temp);
383+
client._receivedData.Clear();
384+
client._receivedData.Write(temp);
385+
ArrayPool<byte>.Shared.Return(temp);
386+
}
387+
else
388+
{
389+
client._receivedData.Write(remainder.Span);
390+
}
391+
}
374392

375-
while (!processData.IsEmpty)
393+
if (client._socket != null)
394+
{
395+
if (!client._socket.ReceiveAsync(args))
396+
Receive(args);
397+
}
398+
else
399+
{
400+
Stop(args);
401+
}
402+
}
403+
else
404+
{
405+
Stop(args);
406+
}
407+
}
408+
409+
private static void Process(NetClient client, ReadOnlyMemory<byte> src, out ReadOnlyMemory<byte> remainder)
410+
{
411+
remainder = ReadOnlyMemory<byte>.Empty;
412+
413+
if (src.IsEmpty)
414+
return;
415+
int totalConsumed = 0;
416+
ReadOnlyMemory<byte> processData = src;
417+
418+
while (!processData.IsEmpty)
419+
{
420+
// reverse order - last middleware first
421+
if (client._middlewares.Count == 0)
422+
{
423+
totalConsumed += processData.Length;
424+
}
425+
else
426+
{
427+
for (int i = client._middlewares.Count - 1; i >= 0; i--)
376428
{
377-
// reverse order - last middleware first
378-
for (int i = client._middlewares.Count - 1; i >= 0; i--)
429+
var middleware = client._middlewares[i];
430+
try
379431
{
380-
var middleware = client._middlewares[i];
381-
try
432+
var (halt, consumed) = middleware.ProcessReceive(ref processData, out processData);
433+
// some middlewares might halt the processing
434+
if (halt)
382435
{
383-
var (halt, consumed) = middleware.ProcessReceive(ref processData, out processData);
384-
// some middlewares might halt the processing
385-
if (halt)
386-
{
387-
goto cont_receive;
388-
}
389-
390-
totalConsumed += consumed;
436+
return;
391437
}
392-
catch (Exception e)
393-
{
394-
client._receivedData.Clear();
395-
client.OnError?.Invoke(e);
396-
goto cont_receive;
397-
}
398-
}
399438

400-
// invoke event
401-
try
402-
{
403-
client.OnDataReceived?.Invoke(processData);
439+
totalConsumed += consumed;
404440
}
405441
catch (Exception e)
406442
{
443+
client._receivedData.Clear();
407444
client.OnError?.Invoke(e);
445+
return;
408446
}
409-
410-
for (int i = client._middlewares.Count - 1; i >= 0; i--)
411-
{
412-
var middleware = client._middlewares[i];
413-
try
414-
{
415-
middleware.PostReceive();
416-
}
417-
catch (Exception e)
418-
{
419-
client.OnError?.Invoke(e);
420-
}
421-
}
422-
423-
// still the original data or partially the original data
424-
var offset = Unsafe.ByteOffset(ref MemoryMarshal.GetReference(src.Span),
425-
ref MemoryMarshal.GetReference(processData.Span)).ToInt64();
426-
if (Math.Abs(offset) < src.Length)
427-
{
428-
totalConsumed = (int)offset + processData.Length;
429-
}
430-
431-
processData = totalConsumed < src.Length
432-
? src.Slice(totalConsumed)
433-
: ReadOnlyMemory<byte>.Empty;
434447
}
435448
}
449+
450+
// invoke event
451+
try
452+
{
453+
client.OnDataReceived?.Invoke(processData);
454+
}
436455
catch (Exception e)
437456
{
438457
client.OnError?.Invoke(e);
439458
}
440459

441-
cont_receive:
442-
if (totalConsumed > 0)
460+
for (int i = client._middlewares.Count - 1; i >= 0; i--)
443461
{
444-
// not completely consumed
445-
if (totalConsumed < args.BytesTransferred)
446-
{
447-
// copy tail of receivedData to a temporary buffer
448-
if (hasLeftover)
449-
{
450-
byte[] tempBuffer =
451-
ArrayPool<byte>.Shared.Rent(client._receivedData.WrittenCount - totalConsumed);
452-
client._receivedData.WrittenMemory.Span.Slice(totalConsumed).CopyTo(tempBuffer);
453-
client._receivedData.Clear();
454-
client._receivedData.Write(tempBuffer);
455-
ArrayPool<byte>.Shared.Return(tempBuffer);
456-
}
457-
else
458-
{
459-
client._receivedData.Clear();
460-
client._receivedData.Write(args.Buffer.AsSpan(totalConsumed,
461-
args.BytesTransferred - totalConsumed));
462-
}
463-
}
464-
else
462+
var middleware = client._middlewares[i];
463+
try
465464
{
466-
client._receivedData.Clear();
465+
middleware.PostReceive();
467466
}
468-
}
469-
// consumed nothing
470-
else
471-
{
472-
if (hasLeftover)
467+
catch (Exception e)
473468
{
474-
client._receivedData.Clear();
469+
client.OnError?.Invoke(e);
475470
}
476-
477-
client._receivedData.Write(processData.Span);
478471
}
479472

480-
if (client._socket != null)
481-
{
482-
if (!client._socket.ReceiveAsync(args))
483-
Receive(args);
484-
}
485-
else
486-
{
487-
Stop(args);
488-
}
489-
}
490-
else
491-
{
492-
Stop(args);
473+
processData = totalConsumed < src.Length
474+
? src.Slice(totalConsumed)
475+
: ReadOnlyMemory<byte>.Empty;
493476
}
477+
478+
remainder = totalConsumed < src.Length ? src.Slice(totalConsumed) : ReadOnlyMemory<byte>.Empty;
494479
}
495480
}
496481
}

Miku.UnitTest/ServerTests.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public void Setup()
1818
AppDomain.CurrentDomain.UnhandledException += (_, e) => { Console.WriteLine(e.ExceptionObject); };
1919
}
2020

21-
[Test, CancelAfter(1000)]
21+
[Test]
2222
public async Task ServerReceiveTest()
2323
{
2424
// A data for testing.
@@ -68,7 +68,7 @@ public async Task ServerReceiveTest()
6868
server.Stop();
6969
}
7070

71-
[Test, CancelAfter(1000)]
71+
[Test]
7272
public async Task ServerStopsClientTest()
7373
{
7474
// A data for testing.
@@ -116,7 +116,7 @@ public async Task ServerStopsClientTest()
116116
server.Stop();
117117
}
118118

119-
[Test, CancelAfter(1000)]
119+
[Test]
120120
public async Task EchoTest()
121121
{
122122
// A data for testing.
@@ -166,7 +166,7 @@ public async Task EchoTest()
166166
server.Stop();
167167
}
168168

169-
[Test, CancelAfter(1000)]
169+
[Test]
170170
public async Task FramingMiddlewareTest()
171171
{
172172
// A data for testing.
@@ -222,7 +222,7 @@ public async Task FramingMiddlewareTest()
222222
server.Stop();
223223
}
224224

225-
[Test, CancelAfter(1000)]
225+
[Test]
226226
public async Task MultipleMiddlewareTest()
227227
{
228228
// A data for testing.
@@ -280,7 +280,7 @@ public async Task MultipleMiddlewareTest()
280280
server.Stop();
281281
}
282282

283-
[Test, CancelAfter(1000)]
283+
[Test]
284284
public async Task PingPongTest()
285285
{
286286
// ip port info

0 commit comments

Comments
 (0)