Skip to content

Commit befe691

Browse files
committed
WIP
1 parent 7012510 commit befe691

7 files changed

Lines changed: 89 additions & 309 deletions

File tree

src/RESPite/Buffers/CycleBuffer.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ public void DiscardCommitted(long count)
149149
private void DiscardCommittedSlow(long count)
150150
{
151151
DebugCounters.OnDiscardPartial(count);
152+
DebugAssertValid();
152153
#if DEBUG
153154
var originalLength = GetCommittedLength();
154155
var originalCount = count;
@@ -262,7 +263,6 @@ private void DebugAssertValid()
262263

263264
public long GetCommittedLength()
264265
{
265-
DebugAssertValid();
266266
if (ReferenceEquals(startSegment, endSegment))
267267
{
268268
return endSegmentCommitted;
@@ -427,6 +427,10 @@ public Memory<byte> GetUncommittedMemory(int hint = 0)
427427
return MemoryMarshal.AsMemory(GetNextSegment().Memory);
428428
}
429429

430+
/// <summary>
431+
/// This is the available unused buffer space, commonly used as the IO read-buffer to avoid
432+
/// additional buffer-copy operations.
433+
/// </summary>
430434
public int UncommittedAvailable
431435
{
432436
get

src/StackExchange.Redis/ChannelMessageQueue.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ private async Task OnMessageAsyncImpl()
279279
try
280280
{
281281
var task = handler?.Invoke(next);
282-
if (task != null && task.Status != TaskStatus.RanToCompletion) await task.ForAwait();
282+
if (task != null && !task.IsCompletedSuccessfully) await task.ForAwait();
283283
}
284284
catch { } // matches MessageCompletable
285285
}

src/StackExchange.Redis/PhysicalConnection.Read.cs

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,21 @@ internal sealed partial class PhysicalConnection
2222
private volatile ReadStatus _readStatus = ReadStatus.NotStarted;
2323
internal ReadStatus GetReadStatus() => _readStatus;
2424

25-
internal void StartReading() => ReadAllAsync(Stream.Null).RedisFireAndForget();
25+
internal void StartReading() => ReadAllAsync().RedisFireAndForget();
2626

27-
private async Task ReadAllAsync(Stream tail)
27+
private async Task ReadAllAsync()
2828
{
29+
var tail = _ioStream ?? Stream.Null;
2930
_readStatus = ReadStatus.Init;
3031
RespScanState state = default;
31-
var readBuffer = CycleBuffer.Create();
32+
_readBuffer = CycleBuffer.Create();
3233
try
3334
{
3435
int read;
3536
do
3637
{
3738
_readStatus = ReadStatus.ReadAsync;
38-
var buffer = readBuffer.GetUncommittedMemory();
39+
var buffer = _readBuffer.GetUncommittedMemory();
3940
var pending = tail.ReadAsync(buffer, CancellationToken.None);
4041
#if DEBUG
4142
bool inline = pending.IsCompleted;
@@ -49,11 +50,12 @@ private async Task ReadAllAsync(Stream tail)
4950
_readStatus = ReadStatus.TryParseResult;
5051
}
5152
// another formatter glitch
52-
while (CommitAndParseFrames(ref state, ref readBuffer, read));
53+
while (CommitAndParseFrames(ref state, read));
54+
5355
_readStatus = ReadStatus.ProcessBufferComplete;
5456

5557
// Volatile.Write(ref _readStatus, ReaderCompleted);
56-
readBuffer.Release(); // clean exit, we can recycle
58+
_readBuffer.Release(); // clean exit, we can recycle
5759
_readStatus = ReadStatus.RanToCompletion;
5860
RecordConnectionFailed(ConnectionFailureType.SocketClosed);
5961
}
@@ -67,37 +69,56 @@ private async Task ReadAllAsync(Stream tail)
6769
_readStatus = ReadStatus.Faulted;
6870
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
6971
}
72+
finally
73+
{
74+
_readBuffer = default; // wipe, however we exited
75+
}
7076
}
7177

7278
private static byte[]? SharedNoLease;
7379

74-
private bool CommitAndParseFrames(ref RespScanState state, ref CycleBuffer readBuffer, int bytesRead)
80+
private CycleBuffer _readBuffer;
81+
private long GetReadCommittedLength()
82+
{
83+
try
84+
{
85+
var len = _readBuffer.GetCommittedLength();
86+
return len < 0 ? -1 : len;
87+
}
88+
catch
89+
{
90+
return -1;
91+
}
92+
}
93+
94+
private bool CommitAndParseFrames(ref RespScanState state, int bytesRead)
7595
{
7696
if (bytesRead <= 0)
7797
{
7898
return false;
7999
}
80100

101+
totalBytesReceived += bytesRead;
81102
#if PARSE_DETAIL
82103
string src = $"parse {bytesRead}";
83104
try
84105
#endif
85106
{
86-
Debug.Assert(readBuffer.GetCommittedLength() >= 0, "multi-segment running-indices are corrupt");
107+
Debug.Assert(_readBuffer.GetCommittedLength() >= 0, "multi-segment running-indices are corrupt");
87108
#if PARSE_DETAIL
88109
src += $" ({readBuffer.GetCommittedLength()}+{bytesRead}-{state.TotalBytes})";
89110
#endif
90111
Debug.Assert(
91-
bytesRead <= readBuffer.UncommittedAvailable,
92-
$"Insufficient bytes in {nameof(CommitAndParseFrames)}; got {bytesRead}, Available={readBuffer.UncommittedAvailable}");
93-
readBuffer.Commit(bytesRead);
112+
bytesRead <= _readBuffer.UncommittedAvailable,
113+
$"Insufficient bytes in {nameof(CommitAndParseFrames)}; got {bytesRead}, Available={_readBuffer.UncommittedAvailable}");
114+
_readBuffer.Commit(bytesRead);
94115
#if PARSE_DETAIL
95116
src += $",total {readBuffer.GetCommittedLength()}";
96117
#endif
97118
var scanner = RespFrameScanner.Default;
98119

99120
OperationStatus status = OperationStatus.NeedMoreData;
100-
if (readBuffer.TryGetCommitted(out var fullSpan))
121+
if (_readBuffer.TryGetCommitted(out var fullSpan))
101122
{
102123
int fullyConsumed = 0;
103124
var toParse = fullSpan.Slice((int)state.TotalBytes); // skip what we've already parsed
@@ -138,11 +159,11 @@ state is
138159
status = OperationStatus.NeedMoreData;
139160
}
140161

141-
readBuffer.DiscardCommitted(fullyConsumed);
162+
_readBuffer.DiscardCommitted(fullyConsumed);
142163
}
143164
else // the same thing again, but this time with multi-segment sequence
144165
{
145-
var fullSequence = readBuffer.GetAllCommitted();
166+
var fullSequence = _readBuffer.GetAllCommitted();
146167
Debug.Assert(
147168
fullSequence is { IsEmpty: false, IsSingleSegment: false },
148169
"non-trivial sequence expected");
@@ -184,7 +205,7 @@ state is
184205
status = OperationStatus.NeedMoreData;
185206
}
186207

187-
readBuffer.DiscardCommitted(fullyConsumed);
208+
_readBuffer.DiscardCommitted(fullyConsumed);
188209
}
189210

190211
if (status != OperationStatus.NeedMoreData)

0 commit comments

Comments
 (0)