Skip to content

Commit fdf1c52

Browse files
committed
fix toy server output
1 parent 014eaed commit fdf1c52

16 files changed

Lines changed: 333 additions & 152 deletions

File tree

src/RESPite/Buffers/CycleBuffer.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,19 @@ namespace RESPite.Buffers;
3030
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
3131
public partial struct CycleBuffer
3232
{
33+
#if TRACK_MEMORY
34+
private static MemoryPool<byte> DefaultPool => MemoryTrackedPool<byte>.Shared;
35+
#else
36+
private static MemoryPool<byte> DefaultPool => MemoryPool<byte>.Shared;
37+
#endif
38+
3339
// note: if someone uses an uninitialized CycleBuffer (via default): that's a skills issue; git gud
3440
public static CycleBuffer Create(
3541
MemoryPool<byte>? pool = null,
3642
int pageSize = DefaultPageSize,
3743
ICycleBufferCallback? callback = null)
3844
{
39-
pool ??= MemoryPool<byte>.Shared;
45+
pool ??= DefaultPool;
4046
if (pageSize <= 0) pageSize = DefaultPageSize;
4147
if (pageSize > pool.MaxBufferSize) pageSize = pool.MaxBufferSize;
4248
return new CycleBuffer(pool, pageSize, callback);
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#if TRACK_MEMORY
2+
using System.Buffers;
3+
using System.Diagnostics.CodeAnalysis;
4+
5+
namespace RESPite.Buffers;
6+
7+
internal sealed class MemoryTrackedPool<T> : MemoryPool<T>
8+
{
9+
// like MemoryPool<T>, but tracks and reports double disposal via a custom memory manager, which
10+
// allows all future use of a Memory<T> to be tracked; contrast ArrayMemoryPool<T>, which only tracks
11+
// the initial fetch of .Memory from the lease
12+
public override IMemoryOwner<T> Rent(int minBufferSize = -1) => MemoryManager.Rent(minBufferSize);
13+
14+
protected override void Dispose(bool disposing)
15+
{
16+
}
17+
18+
// ReSharper disable once ArrangeModifiersOrder - you're wrong
19+
public static new MemoryTrackedPool<T> Shared { get; } = new();
20+
21+
public override int MaxBufferSize => MemoryPool<T>.Shared.MaxBufferSize;
22+
23+
private MemoryTrackedPool()
24+
{
25+
}
26+
27+
private sealed class MemoryManager : MemoryManager<T>
28+
{
29+
public static IMemoryOwner<T> Rent(int minBufferSize = -1) => new MemoryManager(minBufferSize);
30+
31+
private T[]? array;
32+
private MemoryManager(int minBufferSize)
33+
{
34+
array = ArrayPool<T>.Shared.Rent(Math.Max(64, minBufferSize));
35+
}
36+
37+
private T[] CheckDisposed()
38+
{
39+
return array ?? Throw();
40+
[DoesNotReturn]
41+
static T[] Throw() => throw new ObjectDisposedException("Use-after-free of Memory-" + typeof(T).Name);
42+
}
43+
44+
public override MemoryHandle Pin(int elementIndex = 0) => throw new NotSupportedException(nameof(Pin));
45+
46+
public override void Unpin() => throw new NotSupportedException(nameof(Unpin));
47+
48+
public override Span<T> GetSpan() => CheckDisposed();
49+
50+
protected override bool TryGetArray(out ArraySegment<T> segment)
51+
{
52+
segment = new ArraySegment<T>(CheckDisposed());
53+
return true;
54+
}
55+
56+
protected override void Dispose(bool disposing)
57+
{
58+
var arr = Interlocked.Exchange(ref array, null);
59+
if (arr is not null) ArrayPool<T>.Shared.Return(arr);
60+
}
61+
}
62+
}
63+
#endif

src/RESPite/Messages/RespReader.Debug.cs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using System.Diagnostics;
1+
using System.Buffers;
2+
using System.Diagnostics;
3+
using System.Text;
24

35
#pragma warning disable IDE0079 // Remove unnecessary suppression
46
#pragma warning disable CS0282 // There is no defined ordering between fields in multiple declarations of partial struct
@@ -30,4 +32,28 @@ internal void DebugReset()
3032
#if DEBUG
3133
internal bool VectorizeDisabled { get; set; }
3234
#endif
35+
36+
private partial ReadOnlySpan<byte> ActiveBuffer { get; }
37+
38+
internal readonly string BufferUtf8()
39+
{
40+
var clone = Clone();
41+
var active = clone.ActiveBuffer;
42+
var totalLen = checked((int)(active.Length + clone._remainingTailLength));
43+
var oversized = ArrayPool<byte>.Shared.Rent(totalLen);
44+
Span<byte> target = oversized.AsSpan(0, totalLen);
45+
46+
while (!target.IsEmpty)
47+
{
48+
active.CopyTo(target);
49+
target = target.Slice(active.Length);
50+
if (!clone.TryMoveToNextSegment()) break;
51+
active = clone.ActiveBuffer;
52+
}
53+
if (!target.IsEmpty) throw new EndOfStreamException();
54+
55+
var s = Encoding.UTF8.GetString(oversized, 0, totalLen);
56+
ArrayPool<byte>.Shared.Return(oversized);
57+
return s;
58+
}
3359
}

src/RESPite/Messages/RespReader.Span.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ private partial void SetCurrent(ReadOnlySpan<byte> value)
5252
_bufferRoot = ref MemoryMarshal.GetReference(value);
5353
_bufferLength = value.Length;
5454
}
55+
private partial ReadOnlySpan<byte> ActiveBuffer => MemoryMarshal.CreateReadOnlySpan(ref _bufferRoot, _bufferLength);
5556
}
5657
#else
5758
public ref partial struct RespReader // much more conservative - uses slices etc
@@ -80,5 +81,6 @@ private readonly partial int CurrentLength
8081
private readonly partial ReadOnlySpan<byte> CurrentSpan() => _buffer.Slice(_bufferIndex);
8182

8283
private partial void SetCurrent(ReadOnlySpan<byte> value) => _buffer = value;
84+
private partial ReadOnlySpan<byte> ActiveBuffer => _buffer;
8385
}
8486
#endif

src/RESPite/Messages/RespReader.Utils.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ private readonly LengthPrefixResult TryReadLengthPrefix(ReadOnlySpan<byte> bytes
9090
}
9191
}
9292

93-
private readonly RespReader Clone() => this; // useful for performing streaming operations without moving the primary
93+
/// <summary>
94+
/// Create an isolated copy of this reader, which can be advanced independently.
95+
/// </summary>
96+
public readonly RespReader Clone() => this; // useful for performing streaming operations without moving the primary
9497

9598
[MethodImpl(MethodImplOptions.NoInlining), DoesNotReturn]
9699
private readonly void ThrowProtocolFailure(string message)

src/RESPite/Messages/RespReader.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ public readonly int ScalarLength() =>
110110
public readonly bool ScalarIsEmpty() =>
111111
IsInlineScalar ? _length == 0 : (IsNullScalar || !ScalarChunks().MoveNext());
112112

113+
/// <summary>
114+
/// Indicates whether this aggregate value is zero-length.
115+
/// </summary>
116+
public readonly bool AggregateIsEmpty() => AggregateLengthIs(0);
117+
113118
/// <summary>
114119
/// The payload length of this scalar element (includes combined length for streaming scalars).
115120
/// </summary>

src/RESPite/PublicAPI/PublicAPI.Unshipped.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@
4848
[SER004]RESPite.Messages.RespReader.AggregateEnumerator.FillAll<TState, TResult>(scoped System.Span<TResult> target, ref TState state, RESPite.Messages.RespReader.Projection<TState, TResult>! projection) -> void
4949
[SER004]RESPite.Messages.RespReader.AggregateEnumerator.MoveNextRaw() -> bool
5050
[SER004]RESPite.Messages.RespReader.AggregateEnumerator.MoveNextRaw<T>(RESPite.Messages.RespAttributeReader<T>! respAttributeReader, ref T attributes) -> bool
51+
[SER004]RESPite.Messages.RespReader.AggregateIsEmpty() -> bool
5152
[SER004]RESPite.Messages.RespReader.AggregateLengthIs(int count) -> bool
53+
[SER004]RESPite.Messages.RespReader.Clone() -> RESPite.Messages.RespReader
5254
[SER004]RESPite.Messages.RespReader.FillAll<TState, TResult>(scoped System.Span<TResult> target, ref TState state, RESPite.Messages.RespReader.Projection<TState, TResult>! projection) -> void
5355
[SER004]RESPite.Messages.RespReader.Projection<TState, TResult>
5456
[SER004]RESPite.Messages.RespReader.ReadArray<TState, TResult>(ref TState state, RESPite.Messages.RespReader.Projection<TState, TResult>! projection, bool scalar = false) -> TResult[]?

src/StackExchange.Redis/RespReaderExtensions.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,34 @@ public string DebugReadTruncatedString(int maxChars)
4141
public RedisChannel ReadRedisChannel(RedisChannel.RedisChannelOptions options)
4242
=> new(reader.ReadByteArray(), options);
4343

44+
private bool TryGetFirst(out string first)
45+
{
46+
if (reader.IsNonNullAggregate && !reader.AggregateIsEmpty())
47+
{
48+
var clone = reader.Clone();
49+
if (clone.TryMoveNext())
50+
{
51+
unsafe
52+
{
53+
if (clone.IsScalar &&
54+
clone.TryParseScalar(&PhysicalConnection.PushKindMetadata.TryParse, out PhysicalConnection.PushKind kind))
55+
{
56+
first = kind.ToString();
57+
return true;
58+
}
59+
}
60+
61+
first = clone.GetOverview();
62+
return true;
63+
}
64+
}
65+
first = "";
66+
return false;
67+
}
68+
4469
public string GetOverview()
4570
{
71+
// return reader.BufferUtf8(); // <== for when you really can't grok what is happening
4672
if (reader.Prefix is RespPrefix.None)
4773
{
4874
var copy = reader;
@@ -54,6 +80,7 @@ public string GetOverview()
5480
return reader.Prefix switch
5581
{
5682
RespPrefix.SimpleString or RespPrefix.Integer or RespPrefix.SimpleError or RespPrefix.Double => $"{reader.Prefix}: {reader.ReadString()}",
83+
RespPrefix.Push when reader.TryGetFirst(out var first) => $"{reader.Prefix} ({first}): {reader.AggregateLength()} items",
5784
_ when reader.IsScalar => $"{reader.Prefix}: {reader.ScalarLength()} bytes, '{reader.DebugReadTruncatedString(16)}'",
5885
_ when reader.IsAggregate => $"{reader.Prefix}: {reader.AggregateLength()} items",
5986
_ => $"(unknown: {reader.Prefix})",

tests/StackExchange.Redis.Tests/InProcessTestServer.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,21 @@ protected override void OnOutOfBand(RedisClient client, TypedRedisValue message)
129129
base.OnOutOfBand(client, message);
130130
}
131131

132+
/*
133+
public override void OnFlush(RedisClient client, int messages, long bytes)
134+
{
135+
if (bytes >= 0)
136+
{
137+
_log?.WriteLine($"[{client}] flushed {messages} messages, {bytes} bytes");
138+
}
139+
else
140+
{
141+
_log?.WriteLine($"[{client}] flushed {messages} messages"); // bytes not available
142+
}
143+
base.OnFlush(client, messages, bytes);
144+
}
145+
*/
146+
132147
public override TypedRedisValue OnUnknownCommand(in RedisClient client, in RedisRequest request, ReadOnlySpan<byte> command)
133148
{
134149
_log?.WriteLine($"[{client}] unknown command: {Encoding.ASCII.GetString(command)}");

tests/StackExchange.Redis.Tests/PubSubTests.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,16 @@ public async Task TestBasicPubSub(string? channelPrefix, bool wildCard, string b
154154
Assert.Equal(0, count);
155155
}
156156

157+
[Fact]
158+
public async Task Ping()
159+
{
160+
await using var conn = ConnectFactory(shared: false);
161+
var pub = GetAnyPrimary(conn.DefaultClient);
162+
var sub = conn.GetSubscriber();
163+
await sub.SubscribeAsync(RedisChannel.Literal(Me()), (_, __) => { }); // to ensure we're in subscriber mode
164+
await PingAsync(pub, sub, 5).ForAwait();
165+
}
166+
157167
[Fact]
158168
public async Task TestBasicPubSubFireAndForget()
159169
{
@@ -313,6 +323,7 @@ public async Task TestMassivePublishWithWithoutFlush_Local()
313323
public async Task TestMassivePublishWithWithoutFlush_Remote()
314324
{
315325
Skip.UnlessLongRunning();
326+
SkipIfWouldUseInProcessServer();
316327
await using var conn = Create(configuration: TestConfig.Current.RemoteServerAndPort);
317328

318329
var sub = conn.GetSubscriber();
@@ -436,6 +447,7 @@ await sub.SubscribeAsync(channel, (_, val) =>
436447
[Fact]
437448
public async Task PubSubGetAllCorrectOrder()
438449
{
450+
SkipIfWouldUseInProcessServer();
439451
await using (var conn = Create(configuration: TestConfig.Current.RemoteServerAndPort, syncTimeout: 20000, log: Writer))
440452
{
441453
var sub = conn.GetSubscriber();
@@ -506,6 +518,7 @@ async Task RunLoop()
506518
[Fact]
507519
public async Task PubSubGetAllCorrectOrder_OnMessage_Sync()
508520
{
521+
SkipIfWouldUseInProcessServer();
509522
await using (var conn = Create(configuration: TestConfig.Current.RemoteServerAndPort, syncTimeout: 20000, log: Writer))
510523
{
511524
var sub = conn.GetSubscriber();
@@ -572,6 +585,7 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Sync()
572585
[Fact]
573586
public async Task PubSubGetAllCorrectOrder_OnMessage_Async()
574587
{
588+
SkipIfWouldUseInProcessServer();
575589
await using (var conn = Create(configuration: TestConfig.Current.RemoteServerAndPort, syncTimeout: 20000, log: Writer))
576590
{
577591
var sub = conn.GetSubscriber();

0 commit comments

Comments
 (0)