Skip to content

Commit 0198044

Browse files
committed
remove the last traces of Pipelines.Sockets.Unofficial
1 parent 9c92a65 commit 0198044

8 files changed

Lines changed: 180 additions & 65 deletions

File tree

Directory.Packages.props

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
<!-- Packages we depend on for StackExchange.Redis, upgrades can create binding redirect pain! -->
44
<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="6.0.0" />
55
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" />
6-
<PackageVersion Include="Pipelines.Sockets.Unofficial" Version="2.2.8" />
76
<PackageVersion Include="System.Diagnostics.PerformanceCounter" Version="5.0.0" />
87
<PackageVersion Include="System.Threading.Channels" Version="5.0.0" />
98
<PackageVersion Include="System.Runtime.InteropServices.RuntimeInformation" Version="4.3.0" />
Lines changed: 148 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,161 @@
1-
// ReSharper disable once CheckNamespace
1+
#if !NET
2+
using System.Diagnostics;
3+
using System.Diagnostics.CodeAnalysis;
4+
using System.Runtime.CompilerServices;
5+
using System.Runtime.InteropServices;
6+
7+
// ReSharper disable once CheckNamespace
28
namespace System.Net.Sockets;
39

410
internal static class SocketExtensions
511
{
6-
#if !NET
7-
internal static ValueTask ConnectAsync(this Socket socket, EndPoint remoteEP, CancellationToken cancellationToken = default)
12+
internal static async ValueTask ConnectAsync(this Socket socket, EndPoint remoteEP, CancellationToken cancellationToken = default)
813
{
9-
throw new NotImplementedException();
14+
// this API is only used during handshake, *not* core IO, so: we're not concerned about alloc overhead
15+
using var args = new SocketAwaitableEventArgs(SocketFlags.None, cancellationToken);
16+
args.RemoteEndPoint = remoteEP;
17+
if (!socket.ConnectAsync(args))
18+
{
19+
args.Complete();
20+
}
21+
await args;
1022
}
1123

12-
internal static ValueTask<int> SendAsync(this Socket socket, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
24+
internal static async ValueTask<int> SendAsync(this Socket socket, ReadOnlyMemory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
1325
{
14-
throw new NotImplementedException();
26+
// this API is only used during handshake, *not* core IO, so: we're not concerned about alloc overhead
27+
using var args = new SocketAwaitableEventArgs(socketFlags, cancellationToken);
28+
args.SetBuffer(buffer);
29+
if (!socket.SendAsync(args))
30+
{
31+
args.Complete();
32+
}
33+
return await args;
1534
}
1635

17-
internal static ValueTask<int> ReceiveAsync(this Socket socket, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
36+
internal static async ValueTask<int> ReceiveAsync(this Socket socket, Memory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
1837
{
19-
throw new NotImplementedException();
38+
// this API is only used during handshake, *not* core IO, so: we're not concerned about alloc overhead
39+
using var args = new SocketAwaitableEventArgs(socketFlags, cancellationToken);
40+
args.SetBuffer(buffer);
41+
if (!socket.ReceiveAsync(args))
42+
{
43+
args.Complete();
44+
}
45+
return await args;
46+
}
47+
48+
/// <summary>
49+
/// Awaitable SocketAsyncEventArgs, where awaiting the args yields either the BytesTransferred or throws the relevant socket exception,
50+
/// plus support for cancellation via <see cref="SocketError.TimedOut"/>.
51+
/// </summary>
52+
private sealed class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion, IDisposable
53+
{
54+
public new void Dispose()
55+
{
56+
cancelRegistration.Dispose();
57+
base.Dispose();
58+
}
59+
60+
private CancellationTokenRegistration cancelRegistration;
61+
public SocketAwaitableEventArgs(SocketFlags socketFlags, CancellationToken cancellationToken)
62+
{
63+
SocketFlags = socketFlags;
64+
if (cancellationToken.CanBeCanceled)
65+
{
66+
cancellationToken.ThrowIfCancellationRequested();
67+
cancelRegistration = cancellationToken.Register(Timeout);
68+
}
69+
}
70+
71+
public void SetBuffer(ReadOnlyMemory<byte> buffer)
72+
{
73+
if (!MemoryMarshal.TryGetArray(buffer, out var segment)) ThrowNotSupported();
74+
SetBuffer(segment.Array ?? [], segment.Offset, segment.Count);
75+
76+
[DoesNotReturn]
77+
static void ThrowNotSupported() => throw new NotSupportedException("Only array-backed buffers are supported");
78+
}
79+
80+
public void Timeout() => Abort(SocketError.TimedOut);
81+
82+
public void Abort(SocketError error)
83+
{
84+
_forcedError = error;
85+
OnCompleted(this);
86+
}
87+
88+
private volatile SocketError _forcedError; // Success = 0, no field init required
89+
90+
// ReSharper disable once InconsistentNaming
91+
private static readonly Action _callbackCompleted = () => { };
92+
93+
private Action? _callback;
94+
95+
public SocketAwaitableEventArgs GetAwaiter() => this;
96+
97+
/// <summary>
98+
/// Indicates whether the current operation is complete; used as part of "await".
99+
/// </summary>
100+
public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);
101+
102+
/// <summary>
103+
/// Gets the result of the async operation is complete; used as part of "await".
104+
/// </summary>
105+
public int GetResult()
106+
{
107+
Debug.Assert(ReferenceEquals(_callback, _callbackCompleted));
108+
109+
_callback = null;
110+
111+
var error = _forcedError;
112+
if (error is SocketError.Success) error = SocketError;
113+
if (error is not SocketError.Success) ThrowSocketException(error);
114+
115+
return BytesTransferred;
116+
117+
static void ThrowSocketException(SocketError e) => throw new SocketException((int)e);
118+
}
119+
120+
/// <summary>
121+
/// Schedules a continuation for this operation; used as part of "await".
122+
/// </summary>
123+
public void OnCompleted(Action continuation)
124+
{
125+
if (ReferenceEquals(Volatile.Read(ref _callback), _callbackCompleted)
126+
|| ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted))
127+
{
128+
// this is the rare "kinda already complete" case; push to worker to prevent possible stack dive,
129+
// but prefer the custom scheduler when possible
130+
RunOnThreadPool(continuation);
131+
}
132+
}
133+
134+
/// <summary>
135+
/// Schedules a continuation for this operation; used as part of "await".
136+
/// </summary>
137+
public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation);
138+
139+
/// <summary>
140+
/// Marks the operation as complete - this should be invoked whenever a SocketAsyncEventArgs operation returns false.
141+
/// </summary>
142+
public void Complete() => OnCompleted(this);
143+
144+
private static void RunOnThreadPool(Action action)
145+
=> ThreadPool.QueueUserWorkItem(static state => ((Action)state).Invoke(), action);
146+
147+
/// <summary>
148+
/// Invoked automatically when an operation completes asynchronously.
149+
/// </summary>
150+
protected override void OnCompleted(SocketAsyncEventArgs e)
151+
{
152+
var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
153+
if (continuation is not null)
154+
{
155+
// continue on the thread-pool
156+
RunOnThreadPool(continuation);
157+
}
158+
}
20159
}
21-
#endif
22160
}
161+
#endif

src/StackExchange.Redis/Configuration/Tunnel.cs

Lines changed: 19 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -57,47 +57,29 @@ private sealed class HttpProxyTunnel : Tunnel
5757
offset += encoding.GetBytes(ep, 0, ep.Length, chunk, offset);
5858
offset += encoding.GetBytes(Suffix, 0, Suffix.Length, chunk, offset);
5959

60-
static void SafeAbort(object? obj)
61-
{
62-
try
63-
{
64-
(obj as SocketAwaitableEventArgs)?.Abort(SocketError.TimedOut);
65-
}
66-
catch { } // best effort only
67-
}
68-
69-
using (var args = new SocketAwaitableEventArgs())
70-
using (cancellationToken.Register(static s => SafeAbort(s), args))
71-
{
72-
args.SetBuffer(chunk, 0, offset);
73-
if (!socket.SendAsync(args)) args.Complete();
74-
await args;
60+
await socket.SendAsync(chunk.AsMemory(0, offset), SocketFlags.None, cancellationToken).ForAwait();
7561

76-
// we expect to see: "HTTP/1.1 200 OK\n"; note our buffer is definitely big enough already
77-
int toRead = Math.Max(encoding.GetByteCount(ExpectedResponse1), encoding.GetByteCount(ExpectedResponse2)), read;
78-
offset = 0;
62+
// we expect to see: "HTTP/1.1 200 OK\n"; note our buffer is definitely big enough already
63+
int toRead = Math.Max(encoding.GetByteCount(ExpectedResponse1), encoding.GetByteCount(ExpectedResponse2)), read;
64+
offset = 0;
7965

80-
var actualResponse = "";
81-
while (toRead > 0 && !actualResponse.EndsWith("\r\n\r\n"))
82-
{
83-
args.SetBuffer(chunk, offset, toRead);
84-
if (!socket.ReceiveAsync(args)) args.Complete();
85-
read = await args;
86-
87-
if (read <= 0) break; // EOF (since we're never doing zero-length reads)
88-
toRead -= read;
89-
offset += read;
66+
var actualResponse = "";
67+
while (toRead > 0 && !actualResponse.EndsWith("\r\n\r\n"))
68+
{
69+
read = await socket.ReceiveAsync(chunk.AsMemory(offset, toRead), SocketFlags.None, cancellationToken).ForAwait();
70+
if (read <= 0) break; // EOF (since we're never doing zero-length reads)
71+
toRead -= read;
72+
offset += read;
9073

91-
actualResponse = encoding.GetString(chunk, 0, offset);
92-
}
93-
if (toRead != 0 && !actualResponse.EndsWith("\r\n\r\n")) throw new EndOfStreamException("EOF negotiating HTTP tunnel");
94-
// lazy
95-
if (ExpectedResponse1 != actualResponse && ExpectedResponse2 != actualResponse)
96-
{
97-
throw new InvalidOperationException("Unexpected response negotiating HTTP tunnel");
98-
}
99-
ArrayPool<byte>.Shared.Return(chunk);
74+
actualResponse = encoding.GetString(chunk, 0, offset);
75+
}
76+
if (toRead != 0 && !actualResponse.EndsWith("\r\n\r\n")) throw new EndOfStreamException("EOF negotiating HTTP tunnel");
77+
// lazy
78+
if (ExpectedResponse1 != actualResponse && ExpectedResponse2 != actualResponse)
79+
{
80+
throw new InvalidOperationException("Unexpected response negotiating HTTP tunnel");
10081
}
82+
ArrayPool<byte>.Shared.Return(chunk);
10183
}
10284
return default; // no need for custom stream wrapper here
10385
}

src/StackExchange.Redis/Delegates.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public static bool IsSingle(this MulticastDelegate handler)
102102
}
103103
}
104104
private static Func<MulticastDelegate, T> GetViaReflection<T>(FieldInfo field)
105-
=> handler => (T)field.GetValue(handler);
105+
=> handler => (T)field.GetValue(handler)!;
106106
#endif
107107

108108
/// <summary>

src/StackExchange.Redis/StackExchange.Redis.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
<!-- needed everywhere (with hack to override to LTS when possible) -->
2222
<PackageReference Condition="$([MSBuild]::IsTargetFrameworkCompatible('$(TargetFramework)', 'net8.0'))" Include="Microsoft.Extensions.Logging.Abstractions" VersionOverride="8.0"/>
2323
<PackageReference Condition="!$([MSBuild]::IsTargetFrameworkCompatible('$(TargetFramework)', 'net8.0'))" Include="Microsoft.Extensions.Logging.Abstractions" />
24-
<!--<PackageReference Include="Pipelines.Sockets.Unofficial" />-->
2524

2625
<!-- hashing; net10 version bumps System.Buffers, which is a pain-point on netfx; as such, pin on anything down-level -->
2726
<PackageReference Condition="$([MSBuild]::IsTargetFrameworkCompatible('$(TargetFramework)', 'net8.0'))" Include="System.IO.Hashing" />

tests/BasicTest/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class RedisBenchmarks : IDisposable
5151
[GlobalSetup]
5252
public void Setup()
5353
{
54-
// Pipelines.Sockets.Unofficial.SocketConnection.AssertDependencies();
54+
// Dependencies.Assert();
5555
var options = ConfigurationOptions.Parse("127.0.0.1:6379");
5656
connection = ConnectionMultiplexer.Connect(options);
5757
db = connection.GetDatabase(3);

toys/StackExchange.Redis.Server/RespServer.cs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@
66
using System.IO;
77
using System.IO.Pipelines;
88
using System.Linq;
9+
using System.Net.Sockets;
910
using System.Reflection;
1011
using System.Text;
1112
using System.Threading;
1213
using System.Threading.Tasks;
13-
using Pipelines.Sockets.Unofficial;
14-
using Pipelines.Sockets.Unofficial.Arenas;
1514
using RESPite;
1615
using RESPite.Buffers;
1716
using RESPite.Messages;
@@ -282,8 +281,6 @@ protected virtual void Dispose(bool disposing)
282281
DoShutdown(ShutdownReason.ServerDisposed);
283282
}
284283

285-
private readonly Arena _arena = new();
286-
287284
public virtual RedisServer.Node DefaultNode => null;
288285

289286
public async Task RunClientAsync(IDuplexPipe pipe, RedisServer.Node node = null, object state = null)
@@ -334,25 +331,23 @@ public async Task RunClientAsync(IDuplexPipe pipe, RedisServer.Node node = null,
334331
}
335332
client.Complete();
336333
await output;
334+
client = null; // already completed
337335
}
338-
catch (ConnectionResetException) { }
339-
catch (ObjectDisposedException) { }
336+
catch (SocketException) { } // expected
337+
catch (OperationCanceledException) { } // expected
338+
catch (ObjectDisposedException) { } // expected
340339
catch (Exception ex)
341340
{
342-
if (ex.GetType().Name != nameof(ConnectionResetException))
343-
{
344-
// aspnet core has one too; swallow it by pattern
345-
fault = ex;
346-
throw;
347-
}
341+
// unexpected: report as a failure exit
342+
fault = ex;
348343
}
349344
finally
350345
{
351346
RedisRequest.ReleaseLease(ref commandLease);
352347
client?.Complete(fault);
353348
RemoveClient(client);
354-
try { pipe.Input.Complete(fault); } catch { }
355-
try { pipe.Output.Complete(fault); } catch { }
349+
try { await pipe.Input.CompleteAsync(fault); } catch { }
350+
try { await pipe.Output.CompleteAsync(fault); } catch { }
356351

357352
if (fault != null && !_isShutdown)
358353
{

toys/StackExchange.Redis.Server/RespSocketServer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
/* left for ref only
22
using System.Net;
33
using System.Threading.Tasks;
44
using Pipelines.Sockets.Unofficial;
@@ -25,3 +25,4 @@ protected override void Dispose(bool disposing)
2525
}
2626
}
2727
}
28+
*/

0 commit comments

Comments
 (0)