Skip to content

Commit ac66774

Browse files
committed
feat: introduce shared Cosmo.Transport.Pipelines library and MSSQL prototype
1 parent 1b7185a commit ac66774

11 files changed

Lines changed: 598 additions & 1 deletion

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@
77
<Authors>vkuttyp</Authors>
88
<PackageLicenseExpression>MIT</PackageLicenseExpression>
99
<RepositoryUrl>https://github.com/vkuttyp/CosmoSQLClient-Dotnet</RepositoryUrl>
10-
<Version>1.6.1</Version>
10+
<Version>1.7.0</Version>
1111
</PropertyGroup>
1212
</Project>
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFrameworks>net10.0;netstandard2.0</TargetFrameworks>
5+
<AssemblyName>Cosmo.Transport.Pipelines</AssemblyName>
6+
<RootNamespace>Cosmo.Transport.Pipelines</RootNamespace>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<PackageReference Include="System.IO.Pipelines" Version="10.0.0" />
11+
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="10.0.0" Condition="'$(TargetFramework)' == 'netstandard2.0'" />
12+
</ItemGroup>
13+
14+
</Project>
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
using System.Buffers;
2+
using System.Text;
3+
4+
namespace Cosmo.Transport.Pipelines;
5+
6+
/// <summary>
7+
/// Reusable high-performance parsing extensions for System.IO.Pipelines.
8+
/// </summary>
9+
public static class PipelineExtensions
10+
{
11+
public static bool TryReadInt32BigEndian(this ref SequenceReader<byte> reader, out int value)
12+
{
13+
if (reader.TryRead(out byte b1) && reader.TryRead(out byte b2) &&
14+
reader.TryRead(out byte b3) && reader.TryRead(out byte b4))
15+
{
16+
value = (b1 << 24) | (b2 << 16) | (b3 << 8) | b4;
17+
return true;
18+
}
19+
value = 0;
20+
return false;
21+
}
22+
23+
public static string ReadUtf8String(this ref SequenceReader<byte> reader, int byteLength)
24+
{
25+
if (reader.Remaining < byteLength) throw new InvalidOperationException("Not enough data.");
26+
27+
byte[] buffer = new byte[byteLength];
28+
reader.TryCopyTo(buffer);
29+
reader.Advance(byteLength);
30+
return Encoding.UTF8.GetString(buffer);
31+
}
32+
33+
public static string ReadUtf16String(this ref SequenceReader<byte> reader, int charCount)
34+
{
35+
int byteLen = charCount * 2;
36+
if (reader.Remaining < byteLen) throw new InvalidOperationException("Not enough data.");
37+
38+
byte[] buffer = new byte[byteLen];
39+
reader.TryCopyTo(buffer);
40+
reader.Advance(byteLen);
41+
return Encoding.Unicode.GetString(buffer);
42+
}
43+
44+
public static string ReadAsciiString(this ref SequenceReader<byte> reader, long byteLength)
45+
{
46+
if (reader.Remaining < byteLength) throw new InvalidOperationException("Not enough data.");
47+
48+
byte[] buffer = new byte[byteLength];
49+
reader.TryCopyTo(buffer);
50+
reader.Advance(byteLength);
51+
return Encoding.ASCII.GetString(buffer);
52+
}
53+
54+
/// <summary>
55+
/// Reads a line ending in \n (and strips trailing \r if present).
56+
/// Returns null if no \n is found.
57+
/// </summary>
58+
public static string? ReadLine(this ref SequenceReader<byte> reader)
59+
{
60+
if (!reader.TryReadTo(out ReadOnlySequence<byte> lineSeq, (byte)'\n'))
61+
return null;
62+
63+
var span = lineSeq.IsSingleSegment ? lineSeq.FirstSpan : lineSeq.ToArray().AsSpan();
64+
if (!span.IsEmpty && span[^1] == '\r') span = span[..^1];
65+
66+
return Encoding.ASCII.GetString(
67+
#if NETSTANDARD2_0
68+
span.ToArray()
69+
#else
70+
span
71+
#endif
72+
);
73+
}
74+
75+
public static bool TryReadLittleEndian(this ref SequenceReader<byte> reader, out ushort value)
76+
{
77+
if (reader.TryRead(out byte b1) && reader.TryRead(out byte b2))
78+
{
79+
value = (ushort)(b1 | (b2 << 8));
80+
return true;
81+
}
82+
value = 0;
83+
return false;
84+
}
85+
86+
public static bool TryReadLittleEndian(this ref SequenceReader<byte> reader, out uint value)
87+
{
88+
if (reader.TryRead(out byte b1) && reader.TryRead(out byte b2) &&
89+
reader.TryRead(out byte b3) && reader.TryRead(out byte b4))
90+
{
91+
value = (uint)(b1 | (b2 << 8) | (b3 << 16) | (b4 << 24));
92+
return true;
93+
}
94+
value = 0;
95+
return false;
96+
}
97+
98+
public static bool TryReadLittleEndian(this ref SequenceReader<byte> reader, out ulong value)
99+
{
100+
if (TryReadLittleEndian(ref reader, out uint low) && TryReadLittleEndian(ref reader, out uint high))
101+
{
102+
value = ((ulong)high << 32) | low;
103+
return true;
104+
}
105+
value = 0;
106+
return false;
107+
}
108+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
using System.IO.Pipelines;
2+
using System.Net.Sockets;
3+
using System.Runtime.InteropServices;
4+
5+
namespace Cosmo.Transport.Pipelines;
6+
7+
/// <summary>
8+
/// A high-performance wrapper for a Socket using System.IO.Pipelines.
9+
/// </summary>
10+
public sealed class PipelineSocketTransport : IAsyncDisposable
11+
{
12+
private readonly Socket _socket;
13+
public PipeReader Reader { get; }
14+
public PipeWriter Writer { get; }
15+
16+
public PipelineSocketTransport(Socket socket)
17+
{
18+
_socket = socket;
19+
_socket.NoDelay = true;
20+
21+
var stream = new NetworkStream(_socket, ownsSocket: true);
22+
Reader = PipeReader.Create(stream);
23+
Writer = PipeWriter.Create(stream);
24+
}
25+
26+
/// <summary>
27+
/// Manually fills a PipeWriter from a socket.
28+
/// Use this when you want more control than PipeReader.Create(stream).
29+
/// </summary>
30+
public static async Task FillPipeAsync(Socket socket, PipeWriter writer, CancellationToken ct)
31+
{
32+
try
33+
{
34+
while (!ct.IsCancellationRequested)
35+
{
36+
var memory = writer.GetMemory(4096);
37+
#if NETSTANDARD2_0
38+
if (!MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
39+
{
40+
// Fallback for cases where memory is not backed by an array
41+
byte[] buffer = new byte[memory.Length];
42+
int read = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), SocketFlags.None);
43+
if (read == 0) break;
44+
buffer.AsSpan(0, read).CopyTo(memory.Span);
45+
writer.Advance(read);
46+
}
47+
else
48+
{
49+
int bytesRead = await socket.ReceiveAsync(segment, SocketFlags.None).ConfigureAwait(false);
50+
if (bytesRead == 0) break;
51+
writer.Advance(bytesRead);
52+
}
53+
#else
54+
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None, ct);
55+
if (bytesRead == 0) break;
56+
writer.Advance(bytesRead);
57+
#endif
58+
var flush = await writer.FlushAsync(ct);
59+
if (flush.IsCompleted) break;
60+
}
61+
}
62+
catch (OperationCanceledException) { }
63+
finally
64+
{
65+
await writer.CompleteAsync();
66+
}
67+
}
68+
69+
public async ValueTask DisposeAsync()
70+
{
71+
await Reader.CompleteAsync().ConfigureAwait(false);
72+
await Writer.CompleteAsync().ConfigureAwait(false);
73+
_socket.Dispose();
74+
}
75+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#if !NET10_0_OR_GREATER
2+
3+
namespace System.Runtime.CompilerServices
4+
{
5+
using System.ComponentModel;
6+
7+
[EditorBrowsable(EditorBrowsableState.Never)]
8+
internal static class IsExternalInit { }
9+
10+
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct | AttributeTargets.Field | AttributeTargets.Property, AllowMultiple = false, Inherited = false)]
11+
internal sealed class RequiredMemberAttribute : Attribute { }
12+
13+
[AttributeUsage(AttributeTargets.All, AllowMultiple = true, Inherited = false)]
14+
internal sealed class CompilerFeatureRequiredAttribute : Attribute
15+
{
16+
public CompilerFeatureRequiredAttribute(string featureName) { FeatureName = featureName; }
17+
public string FeatureName { get; }
18+
public bool IsOptional { get; set; }
19+
public const string RequiredMembers = nameof(RequiredMembers);
20+
public const string RefStructs = nameof(RefStructs);
21+
}
22+
}
23+
24+
namespace System.Diagnostics.CodeAnalysis
25+
{
26+
[AttributeUsage(AttributeTargets.Constructor, AllowMultiple = false, Inherited = false)]
27+
internal sealed class SetsRequiredMembersAttribute : Attribute { }
28+
29+
[AttributeUsage(AttributeTargets.Field | AttributeTargets.Parameter | AttributeTargets.Property, Inherited = false)]
30+
internal sealed class AllowNullAttribute : Attribute { }
31+
32+
[AttributeUsage(AttributeTargets.Field | AttributeTargets.Parameter | AttributeTargets.Property, Inherited = false)]
33+
internal sealed class DisallowNullAttribute : Attribute { }
34+
}
35+
36+
#endif
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
using System.Buffers;
2+
using System.Runtime.CompilerServices;
3+
4+
namespace Cosmo.Transport.Pipelines;
5+
6+
#if !NETSTANDARD2_1_OR_GREATER && !NETCOREAPP3_0_OR_GREATER && !NET5_0_OR_GREATER
7+
8+
/// <summary>
9+
/// A polyfill for SequenceReader for .NET Standard 2.0.
10+
/// </summary>
11+
public ref struct SequenceReader<T> where T : unmanaged, IEquatable<T>
12+
{
13+
private ReadOnlySequence<T> _sequence;
14+
private SequencePosition _currentPosition;
15+
private SequencePosition _nextPosition;
16+
private ReadOnlySpan<T> _currentSpan;
17+
private int _index;
18+
private long _consumed;
19+
20+
public SequenceReader(ReadOnlySequence<T> sequence)
21+
{
22+
_sequence = sequence;
23+
_currentPosition = sequence.Start;
24+
_nextPosition = _currentPosition;
25+
_currentSpan = default;
26+
_index = 0;
27+
_consumed = 0;
28+
29+
GetNextSpan();
30+
}
31+
32+
public bool End => _index >= _currentSpan.Length && _consumed >= _sequence.Length;
33+
public long Remaining => _sequence.Length - _consumed;
34+
public long Consumed => _consumed;
35+
public ReadOnlySpan<T> UnreadSpan => _currentSpan.Slice(_index);
36+
37+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
38+
public bool TryRead(out T value)
39+
{
40+
if (End)
41+
{
42+
value = default;
43+
return false;
44+
}
45+
46+
value = _currentSpan[_index];
47+
_index++;
48+
_consumed++;
49+
50+
if (_index >= _currentSpan.Length)
51+
{
52+
GetNextSpan();
53+
}
54+
55+
return true;
56+
}
57+
58+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
59+
public void Advance(long count)
60+
{
61+
if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
62+
63+
while (count > 0 && !End)
64+
{
65+
int remainingInSpan = _currentSpan.Length - _index;
66+
if (remainingInSpan > count)
67+
{
68+
_index += (int)count;
69+
_consumed += count;
70+
count = 0;
71+
}
72+
else
73+
{
74+
_consumed += remainingInSpan;
75+
count -= remainingInSpan;
76+
GetNextSpan();
77+
}
78+
}
79+
}
80+
81+
public void TryCopyTo(Span<T> destination)
82+
{
83+
if (Remaining < destination.Length) throw new InvalidOperationException("Not enough data.");
84+
85+
int copied = 0;
86+
while (copied < destination.Length)
87+
{
88+
int toCopy = Math.Min(_currentSpan.Length - _index, destination.Length - copied);
89+
_currentSpan.Slice(_index, toCopy).CopyTo(destination.Slice(copied));
90+
91+
copied += toCopy;
92+
Advance(toCopy);
93+
}
94+
}
95+
96+
private void GetNextSpan()
97+
{
98+
if (_sequence.TryGet(ref _nextPosition, out ReadOnlyMemory<T> memory))
99+
{
100+
_currentSpan = memory.Span;
101+
_index = 0;
102+
}
103+
else
104+
{
105+
_currentSpan = default;
106+
_index = 0;
107+
}
108+
}
109+
}
110+
111+
#endif
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFrameworks>net10.0;netstandard2.0</TargetFrameworks>
5+
<AssemblyName>CosmoSQLClient.MsSql.Pipelines</AssemblyName>
6+
<RootNamespace>CosmoSQLClient.MsSql.Pipelines</RootNamespace>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<ProjectReference Include="..\CosmoSQLClient.Core\CosmoSQLClient.Core.csproj" />
11+
<ProjectReference Include="..\Cosmo.Transport.Pipelines\Cosmo.Transport.Pipelines.csproj" />
12+
</ItemGroup>
13+
14+
<ItemGroup>
15+
<PackageReference Include="System.IO.Pipelines" Version="10.0.0" />
16+
</ItemGroup>
17+
18+
</Project>

0 commit comments

Comments
 (0)