Skip to content

Commit 54a66c6

Browse files
committed
WIP v3 merge attempt
1 parent bf7f846 commit 54a66c6

164 files changed

Lines changed: 12207 additions & 6011 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

StackExchange.Redis.sln.DotSettings

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,5 @@
2626
<s:Boolean x:Key="/Default/UserDictionary/Words/=xreadgroup/@EntryIndexedValue">True</s:Boolean>
2727
<s:Boolean x:Key="/Default/UserDictionary/Words/=xrevrange/@EntryIndexedValue">True</s:Boolean>
2828
<s:Boolean x:Key="/Default/UserDictionary/Words/=zcard/@EntryIndexedValue">True</s:Boolean>
29-
<s:Boolean x:Key="/Default/UserDictionary/Words/=zscan/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
29+
<s:Boolean x:Key="/Default/UserDictionary/Words/=zscan/@EntryIndexedValue">True</s:Boolean>
30+
<s:Boolean x:Key="/Default/UserDictionary/Words/=zset/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>

docs/exp/SER004.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# RESPite
2+
3+
RESPite is an experimental library that provides high-performance low-level RESP (Redis, etc) parsing and serialization.
4+
It is used as the IO core for StackExchange.Redis v3+. You should not (yet) use it directly unless you have a very
5+
good reason to do so.
6+
7+
```xml
8+
<NoWarn>$(NoWarn);SER004</NoWarn>
9+
```
10+
11+
or more granularly / locally in C#:
12+
13+
``` c#
14+
#pragma warning disable SER004
15+
```

docs/exp/SER005.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Unit Testing
2+
3+
Unit testing is great! Yay, do more of that!
4+
5+
This type is provided for external unit testing, in particular by people using modules or server features
6+
not directly implemented by SE.Redis - for example to verify messsage parsing or formatting without
7+
talking to a RESP server.
8+
9+
These types are considered slightly more... *mercurial*. We encourage you to use them, but *occasionally*
10+
(not just for fun) you might need to update your test code if we tweak something. This should not impact
11+
"real" library usage.
12+
13+
```xml
14+
<NoWarn>$(NoWarn);SER005</NoWarn>
15+
```
16+
17+
or more granularly / locally in C#:
18+
19+
``` c#
20+
#pragma warning disable SER005
21+
```
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
#nullable enable
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
#nullable enable

src/RESPite/Shared/AsciiHash.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Buffers.Binary;
33
using System.Diagnostics;
44
using System.Diagnostics.CodeAnalysis;

src/StackExchange.Redis/APITypes/LatencyHistoryEntry.cs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using RESPite.Messages;
23

34
namespace StackExchange.Redis;
45

@@ -11,18 +12,14 @@ public readonly struct LatencyHistoryEntry
1112

1213
private sealed class Processor : ArrayResultProcessor<LatencyHistoryEntry>
1314
{
14-
protected override bool TryParse(in RawResult raw, out LatencyHistoryEntry parsed)
15+
protected override bool TryParse(ref RespReader reader, out LatencyHistoryEntry parsed)
1516
{
16-
if (raw.Resp2TypeArray == ResultType.Array)
17+
if (reader.IsAggregate
18+
&& reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var timestamp)
19+
&& reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var duration))
1720
{
18-
var items = raw.GetItems();
19-
if (items.Length >= 2
20-
&& items[0].TryGetInt64(out var timestamp)
21-
&& items[1].TryGetInt64(out var duration))
22-
{
23-
parsed = new LatencyHistoryEntry(timestamp, duration);
24-
return true;
25-
}
21+
parsed = new LatencyHistoryEntry(timestamp, duration);
22+
return true;
2623
}
2724
parsed = default;
2825
return false;

src/StackExchange.Redis/APITypes/LatencyLatestEntry.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using RESPite.Messages;
23

34
namespace StackExchange.Redis;
45

@@ -11,17 +12,17 @@ public readonly struct LatencyLatestEntry
1112

1213
private sealed class Processor : ArrayResultProcessor<LatencyLatestEntry>
1314
{
14-
protected override bool TryParse(in RawResult raw, out LatencyLatestEntry parsed)
15+
protected override bool TryParse(ref RespReader reader, out LatencyLatestEntry parsed)
1516
{
16-
if (raw.Resp2TypeArray == ResultType.Array)
17+
if (reader.IsAggregate && reader.TryMoveNext() && reader.IsScalar)
1718
{
18-
var items = raw.GetItems();
19-
if (items.Length >= 4
20-
&& items[1].TryGetInt64(out var timestamp)
21-
&& items[2].TryGetInt64(out var duration)
22-
&& items[3].TryGetInt64(out var maxDuration))
19+
var eventName = reader.ReadString()!;
20+
21+
if (reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var timestamp)
22+
&& reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var duration)
23+
&& reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var maxDuration))
2324
{
24-
parsed = new LatencyLatestEntry(items[0].GetString()!, timestamp, duration, maxDuration);
25+
parsed = new LatencyLatestEntry(eventName, timestamp, duration, maxDuration);
2526
return true;
2627
}
2728
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
using System;
2+
using System.IO;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using System.Threading.Tasks.Sources;
6+
7+
namespace StackExchange.Redis;
8+
9+
internal sealed class BufferedAsyncStreamWriter : CycleBufferStreamWriter, IValueTaskSource
10+
{
11+
private ManualResetValueTaskSourceCore<bool> _readerTask;
12+
13+
public BufferedAsyncStreamWriter(Stream target, CancellationToken cancellationToken = default)
14+
: base(target, cancellationToken)
15+
{
16+
WriteComplete = Task.Run(CopyOutAsync, cancellationToken);
17+
_readerTask.RunContinuationsAsynchronously = true; // we never want the flusher to take over the copying
18+
}
19+
20+
public override Task WriteComplete { get; }
21+
22+
private async Task CopyOutAsync()
23+
{
24+
try
25+
{
26+
while (true)
27+
{
28+
ValueTask pending = AwaitWake();
29+
if (!pending.IsCompleted)
30+
{
31+
lock (this)
32+
{
33+
// double-checked marking inactive
34+
if (!pending.IsCompleted) OnWriterInactive(); // update state flags
35+
}
36+
}
37+
// await activation and check status;
38+
await pending.ConfigureAwait(false);
39+
40+
StateFlags stateFlags;
41+
while (true)
42+
{
43+
ReadOnlyMemory<byte> memory;
44+
lock (this)
45+
{
46+
stateFlags = State;
47+
var minBytes = (stateFlags & StateFlags.Flush) == 0 ? -1 : 1;
48+
if (!GetFirstChunkInsideLock(minBytes, out memory))
49+
{
50+
// out of data; remove flush flag and wait for more work
51+
stateFlags &= ~StateFlags.Flush;
52+
break;
53+
}
54+
}
55+
56+
if (IsFaulted) ThrowCompleteOrFaulted(); // this is cheap to check ongoing
57+
if (!memory.IsEmpty)
58+
{
59+
OnWritten(memory.Length);
60+
OnDebugBufferLog(memory);
61+
62+
await Target.WriteAsync(memory, CancellationToken).ConfigureAwait(false);
63+
}
64+
65+
lock (this)
66+
{
67+
DiscardCommitted(memory.Length);
68+
}
69+
}
70+
await Target.FlushAsync(CancellationToken).ConfigureAwait(false);
71+
72+
if ((stateFlags & StateFlags.Closed) != 0) break;
73+
}
74+
75+
// recycle on clean exit (only), since we know the buffers aren't being used
76+
lock (this)
77+
{
78+
ReleaseBuffer();
79+
}
80+
}
81+
catch (Exception ex)
82+
{
83+
Complete(ex);
84+
}
85+
// note we do *not* close the stream here - we have to settle for flushing; Close is explicit
86+
}
87+
88+
private ValueTask AwaitWake()
89+
{
90+
lock (this) // guard all transitions
91+
{
92+
return new(this, _readerTask.Version);
93+
}
94+
}
95+
96+
void IValueTaskSource.GetResult(short token)
97+
{
98+
lock (this) // guard all transitions
99+
{
100+
_readerTask.GetResult(token); // may throw, note
101+
_readerTask.Reset();
102+
}
103+
}
104+
105+
ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
106+
{
107+
lock (this) // guard all transitions
108+
{
109+
return _readerTask.GetStatus(token);
110+
}
111+
}
112+
113+
void IValueTaskSource.OnCompleted(
114+
Action<object?> continuation,
115+
object? state,
116+
short token,
117+
ValueTaskSourceOnCompletedFlags flags)
118+
{
119+
lock (this) // guard all transitions
120+
{
121+
_readerTask.OnCompleted(continuation, state, token, flags);
122+
}
123+
}
124+
125+
protected override void OnWakeReader()
126+
{
127+
lock (this) // guard all transitions
128+
{
129+
_readerTask.SetResult(true);
130+
}
131+
}
132+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
using System;
2+
using System.IO;
3+
using System.IO.Pipelines;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace StackExchange.Redis;
8+
9+
internal sealed class PipeStreamWriter : BufferedStreamWriter
10+
{
11+
private readonly PipeWriter _writer;
12+
13+
public PipeStreamWriter(Stream target, CancellationToken cancellationToken = default)
14+
: base(target, cancellationToken)
15+
{
16+
var pipe = new Pipe();
17+
WriteComplete = pipe.Reader.CopyToAsync(Target, cancellationToken);
18+
_writer = pipe.Writer;
19+
}
20+
21+
public override Task WriteComplete { get; }
22+
23+
private long _nonFlushed;
24+
public override void Advance(int count)
25+
{
26+
_nonFlushed += count;
27+
_writer.Advance(count);
28+
}
29+
30+
public override void Flush()
31+
{
32+
var tmp = _nonFlushed;
33+
_nonFlushed = 0;
34+
OnWritten(tmp);
35+
var pending = _writer.FlushAsync();
36+
if (pending.IsCompleted)
37+
{
38+
pending.GetAwaiter().GetResult();
39+
}
40+
else
41+
{
42+
// this is bad, but: this type is a temporary kludge while I fix a bug;
43+
// this only happens during back-pressure events, which should be rare
44+
pending.AsTask().Wait(CancellationToken);
45+
}
46+
}
47+
48+
public override Memory<byte> GetMemory(int sizeHint = 0) => _writer.GetMemory(sizeHint);
49+
50+
public override Span<byte> GetSpan(int sizeHint = 0) => _writer.GetSpan(sizeHint);
51+
52+
public override void Complete(Exception? exception = null) => _writer.Complete(exception);
53+
}

0 commit comments

Comments
 (0)