Skip to content

Commit c64f0d9

Browse files
committed
start migrating processors
1 parent d09b8f6 commit c64f0d9

3 files changed

Lines changed: 131 additions & 63 deletions

File tree

src/StackExchange.Redis/RespReaderExtensions.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,26 @@ public RespPrefix GetFirstPrefix()
5353
}
5454
return prefix;
5555
}
56+
57+
public bool AggregateHasAtLeast(int count)
58+
{
59+
reader.DemandAggregate();
60+
if (reader.IsNull) return false;
61+
if (reader.IsStreaming) return CheckStreamingAggregateAtLeast(in reader, count);
62+
return reader.AggregateLength() >= count;
63+
64+
static bool CheckStreamingAggregateAtLeast(in RespReader reader, int count)
65+
{
66+
var iter = reader.AggregateChildren();
67+
object? attributes = null;
68+
while (count > 0 && iter.MoveNextRaw(null!, ref attributes))
69+
{
70+
count--;
71+
}
72+
73+
return count == 0;
74+
}
75+
}
5676
}
5777

5878
extension(ref RespReader reader)

src/StackExchange.Redis/ResultProcessor.cs

Lines changed: 57 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1312,20 +1312,20 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
13121312

13131313
private sealed class DoubleProcessor : ResultProcessor<double>
13141314
{
1315-
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
1315+
protected override bool SetResultCore(PhysicalConnection connection, Message message, ref RespReader reader)
13161316
{
1317-
switch (result.Resp2TypeBulkString)
1317+
switch (reader.Resp2PrefixBulkString)
13181318
{
1319-
case ResultType.Integer:
1320-
if (result.TryGetInt64(out long i64))
1319+
case RespPrefix.Integer:
1320+
if (reader.TryReadInt64(out long i64))
13211321
{
13221322
SetResult(message, i64);
13231323
return true;
13241324
}
13251325
break;
1326-
case ResultType.SimpleString:
1327-
case ResultType.BulkString:
1328-
if (result.TryGetDouble(out double val))
1326+
case RespPrefix.SimpleString:
1327+
case RespPrefix.BulkString:
1328+
if (reader.TryReadDouble(out double val))
13291329
{
13301330
SetResult(message, val);
13311331
return true;
@@ -1403,14 +1403,14 @@ private sealed class Int64DefaultValueProcessor : ResultProcessor<long>
14031403

14041404
public Int64DefaultValueProcessor(long defaultValue) => _defaultValue = defaultValue;
14051405

1406-
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
1406+
protected override bool SetResultCore(PhysicalConnection connection, Message message, ref RespReader reader)
14071407
{
1408-
if (result.IsNull)
1408+
if (reader.IsNull)
14091409
{
14101410
SetResult(message, _defaultValue);
14111411
return true;
14121412
}
1413-
if (result.Resp2TypeBulkString == ResultType.Integer && result.TryGetInt64(out var i64))
1413+
if (reader.Resp2PrefixBulkString == RespPrefix.Integer && reader.TryReadInt64(out var i64))
14141414
{
14151415
SetResult(message, i64);
14161416
return true;
@@ -1421,14 +1421,14 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
14211421

14221422
private class Int64Processor : ResultProcessor<long>
14231423
{
1424-
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
1424+
protected override bool SetResultCore(PhysicalConnection connection, Message message, ref RespReader reader)
14251425
{
1426-
switch (result.Resp2TypeBulkString)
1426+
switch (reader.Resp2PrefixBulkString)
14271427
{
1428-
case ResultType.Integer:
1429-
case ResultType.SimpleString:
1430-
case ResultType.BulkString:
1431-
if (result.TryGetInt64(out long i64))
1428+
case RespPrefix.Integer:
1429+
case RespPrefix.SimpleString:
1430+
case RespPrefix.BulkString:
1431+
if (reader.TryReadInt64(out long i64))
14321432
{
14331433
SetResult(message, i64);
14341434
return true;
@@ -1532,18 +1532,19 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
15321532

15331533
private sealed class PubSubNumSubProcessor : Int64Processor
15341534
{
1535-
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
1535+
protected override bool SetResultCore(PhysicalConnection connection, Message message, ref RespReader reader)
15361536
{
1537-
if (result.Resp2TypeArray == ResultType.Array)
1537+
var snapshot = reader;
1538+
if (reader.Resp2PrefixArray == RespPrefix.Array && reader.AggregateLength() == 2)
15381539
{
1539-
var arr = result.GetItems();
1540-
if (arr.Length == 2 && arr[1].TryGetInt64(out long val))
1540+
var agg = reader.AggregateChildren();
1541+
if (agg.MoveNext() && agg.MoveNext() && agg.Value.TryReadInt64(out long val))
15411542
{
15421543
SetResult(message, val);
15431544
return true;
15441545
}
15451546
}
1546-
return base.SetResultCore(connection, message, result);
1547+
return base.SetResultCore(connection, message, ref snapshot);
15471548
}
15481549
}
15491550

@@ -1563,19 +1564,19 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
15631564

15641565
private sealed class NullableDoubleProcessor : ResultProcessor<double?>
15651566
{
1566-
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
1567+
protected override bool SetResultCore(PhysicalConnection connection, Message message, ref RespReader reader)
15671568
{
1568-
switch (result.Resp2TypeBulkString)
1569+
switch (reader.Resp2PrefixBulkString)
15691570
{
1570-
case ResultType.Integer:
1571-
case ResultType.SimpleString:
1572-
case ResultType.BulkString:
1573-
if (result.IsNull)
1571+
case RespPrefix.Integer:
1572+
case RespPrefix.SimpleString:
1573+
case RespPrefix.BulkString:
1574+
if (reader.IsNull)
15741575
{
15751576
SetResult(message, null);
15761577
return true;
15771578
}
1578-
if (result.TryGetDouble(out double val))
1579+
if (reader.TryReadDouble(out double val))
15791580
{
15801581
SetResult(message, val);
15811582
return true;
@@ -1588,35 +1589,28 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
15881589

15891590
private sealed class NullableInt64Processor : ResultProcessor<long?>
15901591
{
1591-
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
1592+
protected override bool SetResultCore(PhysicalConnection connection, Message message, ref RespReader reader)
15921593
{
1593-
switch (result.Resp2TypeBulkString)
1594+
switch (reader.Resp2PrefixBulkString)
15941595
{
1595-
case ResultType.Integer:
1596-
case ResultType.SimpleString:
1597-
case ResultType.BulkString:
1598-
if (result.IsNull)
1596+
case RespPrefix.Integer:
1597+
case RespPrefix.SimpleString:
1598+
case RespPrefix.BulkString:
1599+
if (reader.IsNull)
15991600
{
16001601
SetResult(message, null);
16011602
return true;
16021603
}
1603-
if (result.TryGetInt64(out long i64))
1604+
if (reader.TryReadInt64(out long i64))
16041605
{
16051606
SetResult(message, i64);
16061607
return true;
16071608
}
16081609
break;
1609-
case ResultType.Array:
1610-
var items = result.GetItems();
1611-
if (items.Length == 1)
1612-
{ // treat an array of 1 like a single reply
1613-
if (items[0].TryGetInt64(out long value))
1614-
{
1615-
SetResult(message, value);
1616-
return true;
1617-
}
1618-
}
1619-
break;
1610+
case RespPrefix.Array when reader.TryReadNext() && reader.IsScalar && reader.TryReadInt64(out long value) && !reader.TryReadNext():
1611+
// treat an array of 1 like a single reply
1612+
SetResult(message, value);
1613+
return true;
16201614
}
16211615
return false;
16221616
}
@@ -1703,14 +1697,14 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
17031697

17041698
private sealed class RedisKeyProcessor : ResultProcessor<RedisKey>
17051699
{
1706-
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
1700+
protected override bool SetResultCore(PhysicalConnection connection, Message message, ref RespReader reader)
17071701
{
1708-
switch (result.Resp2TypeBulkString)
1702+
switch (reader.Resp2PrefixBulkString)
17091703
{
1710-
case ResultType.Integer:
1711-
case ResultType.SimpleString:
1712-
case ResultType.BulkString:
1713-
SetResult(message, result.AsRedisKey());
1704+
case RespPrefix.Integer:
1705+
case RespPrefix.SimpleString:
1706+
case RespPrefix.BulkString:
1707+
SetResult(message, reader.ReadByteArray());
17141708
return true;
17151709
}
17161710
return false;
@@ -1719,13 +1713,13 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
17191713

17201714
private sealed class RedisTypeProcessor : ResultProcessor<RedisType>
17211715
{
1722-
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
1716+
protected override bool SetResultCore(PhysicalConnection connection, Message message, ref RespReader reader)
17231717
{
1724-
switch (result.Resp2TypeBulkString)
1718+
switch (reader.Resp2PrefixBulkString)
17251719
{
1726-
case ResultType.SimpleString:
1727-
case ResultType.BulkString:
1728-
string s = result.GetString()!;
1720+
case RespPrefix.SimpleString:
1721+
case RespPrefix.BulkString:
1722+
string s = reader.ReadString()!;
17291723
RedisType value;
17301724
if (string.Equals(s, "zset", StringComparison.OrdinalIgnoreCase)) value = Redis.RedisType.SortedSet;
17311725
else if (!Enum.TryParse<RedisType>(s, true, out value)) value = global::StackExchange.Redis.RedisType.Unknown;
@@ -1977,14 +1971,14 @@ private static LCSMatchResult Parse(in RawResult result)
19771971

19781972
private sealed class RedisValueProcessor : ResultProcessor<RedisValue>
19791973
{
1980-
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
1974+
protected override bool SetResultCore(PhysicalConnection connection, Message message, ref RespReader reader)
19811975
{
1982-
switch (result.Resp2TypeBulkString)
1976+
switch (reader.Resp2PrefixBulkString)
19831977
{
1984-
case ResultType.Integer:
1985-
case ResultType.SimpleString:
1986-
case ResultType.BulkString:
1987-
SetResult(message, result.AsRedisValue());
1978+
case RespPrefix.Integer:
1979+
case RespPrefix.SimpleString:
1980+
case RespPrefix.BulkString:
1981+
SetResult(message, reader.ReadRedisValue());
19881982
return true;
19891983
}
19901984
return false;

tests/StackExchange.Redis.Tests/ResultProcessorUnitTests.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,18 @@ namespace StackExchange.Redis.Tests;
1010

1111
public class ResultProcessorUnitTests(ITestOutputHelper log)
1212
{
13+
private const string ATTRIB_FOO_BAR = "|1\r\n+foo\r\n+bar\r\n";
14+
1315
[Theory]
1416
[InlineData(":1\r\n", 1)]
1517
[InlineData("+1\r\n", 1)]
1618
[InlineData("$1\r\n1\r\n", 1)]
19+
[InlineData(",1\r\n", 1)]
20+
[InlineData(ATTRIB_FOO_BAR + ":1\r\n", 1)]
1721
[InlineData(":-42\r\n", -42)]
1822
[InlineData("+-42\r\n", -42)]
1923
[InlineData("$3\r\n-42\r\n", -42)]
24+
[InlineData(",-42\r\n", -42)]
2025
public void Int32(string resp, int value) => Assert.Equal(value, Execute(resp, ResultProcessor.Int32));
2126

2227
[Theory]
@@ -28,9 +33,12 @@ public class ResultProcessorUnitTests(ITestOutputHelper log)
2833
[InlineData(":1\r\n", 1)]
2934
[InlineData("+1\r\n", 1)]
3035
[InlineData("$1\r\n1\r\n", 1)]
36+
[InlineData(",1\r\n", 1)]
37+
[InlineData(ATTRIB_FOO_BAR + ":1\r\n", 1)]
3138
[InlineData(":-42\r\n", -42)]
3239
[InlineData("+-42\r\n", -42)]
3340
[InlineData("$3\r\n-42\r\n", -42)]
41+
[InlineData(",-42\r\n", -42)]
3442
public void Int64(string resp, long value) => Assert.Equal(value, Execute(resp, ResultProcessor.Int64));
3543

3644
[Theory]
@@ -43,8 +51,54 @@ public class ResultProcessorUnitTests(ITestOutputHelper log)
4351
[InlineData("*0\r\n", "")]
4452
[InlineData("*1\r\n+42\r\n", "42")]
4553
[InlineData("*2\r\n+42\r\n:78\r\n", "42,78")]
54+
[InlineData(ATTRIB_FOO_BAR + "*1\r\n+42\r\n", "42")]
4655
public void Int64Array(string resp, string? value) => Assert.Equal(value, Join(Execute(resp, ResultProcessor.Int64Array)));
4756

57+
[Theory]
58+
[InlineData(":42\r\n", 42.0)]
59+
[InlineData("+3.14\r\n", 3.14)]
60+
[InlineData("$4\r\n3.14\r\n", 3.14)]
61+
[InlineData(",3.14\r\n", 3.14)]
62+
[InlineData(ATTRIB_FOO_BAR + ",3.14\r\n", 3.14)]
63+
[InlineData(":-1\r\n", -1.0)]
64+
[InlineData("+inf\r\n", double.PositiveInfinity)]
65+
[InlineData(",inf\r\n", double.PositiveInfinity)]
66+
[InlineData("$4\r\n-inf\r\n", double.NegativeInfinity)]
67+
[InlineData(",-inf\r\n", double.NegativeInfinity)]
68+
[InlineData(",nan\r\n", double.NaN)]
69+
public void Double(string resp, double value) => Assert.Equal(value, Execute(resp, ResultProcessor.Double));
70+
71+
[Theory]
72+
[InlineData("_\r\n", null)]
73+
[InlineData("$-1\r\n", null)]
74+
[InlineData(":42\r\n", 42L)]
75+
[InlineData("+42\r\n", 42L)]
76+
[InlineData("$2\r\n42\r\n", 42L)]
77+
[InlineData(",42\r\n", 42L)]
78+
[InlineData(ATTRIB_FOO_BAR + ":42\r\n", 42L)]
79+
public void NullableInt64(string resp, long? value) => Assert.Equal(value, Execute(resp, ResultProcessor.NullableInt64));
80+
81+
[Theory]
82+
[InlineData("*1\r\n:99\r\n", 99L)]
83+
[InlineData(ATTRIB_FOO_BAR + "*1\r\n:99\r\n", 99L)]
84+
public void NullableInt64ArrayOfOne(string resp, long? value) => Assert.Equal(value, Execute(resp, ResultProcessor.NullableInt64));
85+
86+
[Theory]
87+
[InlineData("*-1\r\n")] // null array
88+
[InlineData("*0\r\n")] // empty array
89+
[InlineData("*2\r\n:1\r\n:2\r\n")] // two elements
90+
public void FailingNullableInt64ArrayOfNonOne(string resp) => ExecuteUnexpected(resp, ResultProcessor.NullableInt64);
91+
92+
[Theory]
93+
[InlineData("_\r\n", null)]
94+
[InlineData("$-1\r\n", null)]
95+
[InlineData(":42\r\n", 42.0)]
96+
[InlineData("+3.14\r\n", 3.14)]
97+
[InlineData("$4\r\n3.14\r\n", 3.14)]
98+
[InlineData(",3.14\r\n", 3.14)]
99+
[InlineData(ATTRIB_FOO_BAR + ",3.14\r\n", 3.14)]
100+
public void NullableDouble(string resp, double? value) => Assert.Equal(value, Execute(resp, ResultProcessor.NullableDouble));
101+
48102
[return: NotNullIfNotNull(nameof(array))]
49103
protected static string? Join<T>(T[]? array, string separator = ",")
50104
{

0 commit comments

Comments
 (0)