-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Expand file tree
/
Copy pathRespReader.cs
More file actions
2061 lines (1850 loc) · 78.9 KB
/
RespReader.cs
File metadata and controls
2061 lines (1850 loc) · 78.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
using System.Buffers;
using System.Buffers.Text;
using System.ComponentModel;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Runtime.CompilerServices;
using System.Text;
using RESPite.Internal;
#if NETCOREAPP3_0_OR_GREATER
using System.Runtime.Intrinsics;
using System.Runtime.Intrinsics.X86;
#endif
#pragma warning disable IDE0079 // Remove unnecessary suppression
#pragma warning disable CS0282 // There is no defined ordering between fields in multiple declarations of partial struct
#pragma warning restore IDE0079 // Remove unnecessary suppression
namespace RESPite.Messages;
/// <summary>
/// Provides low level RESP parsing functionality.
/// </summary>
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
public ref partial struct RespReader
{
/// <summary>
/// Bit flags describing the shape of the element the reader is currently positioned on.
/// Several flags may be combined (for example, the null handling in <c>Delta</c> tests for scalar and aggregate together).
/// </summary>
[Flags]
private enum RespFlags : byte
{
None = 0, // no flags set
IsScalar = 1 << 0, // simple strings, bulk strings, etc
IsAggregate = 1 << 1, // arrays, maps, sets, etc
IsNull = 1 << 2, // explicit null RESP types, or bulk-strings/aggregates with length -1
IsInlineScalar = 1 << 3, // a non-null scalar, i.e. with payload+CrLf
IsAttribute = 1 << 4, // is metadata for following elements
IsStreaming = 1 << 5, // unknown length
IsError = 1 << 6, // an explicit error reported inside the protocol
}
// relates to the element we're currently reading
private RespFlags _flags;
private RespPrefix _prefix;
private int _length; // for null: 0; for scalars: the length of the payload; for aggregates: the child count
// the current buffer that we're observing
private int _bufferIndex; // after TryRead, this should be positioned immediately before the actual data
// the position in a multi-segment payload
private long _positionBase; // total data we've already moved past in *previous* buffers
private ReadOnlySequenceSegment<byte>? _tail; // the next tail node
private long _remainingTailLength; // how much more can we consume from the tail?
/// <summary>
/// The number of bytes still available to the reader: the unread remainder of the current buffer
/// plus whatever remains in the tail segments.
/// </summary>
public long ProtocolBytesRemaining => TotalAvailable;
// unread bytes in the current buffer only
private readonly int CurrentAvailable => CurrentLength - _bufferIndex;
// unread bytes in the current buffer plus the remaining tail
private readonly long TotalAvailable => CurrentAvailable + _remainingTailLength;
// The partial members below are implemented in another part of this partial struct; their exact
// semantics are not visible here. NOTE(review): the names suggest unchecked access to the current
// buffer - confirm against the implementing declarations before relying on that.
private partial void UnsafeTrimCurrentBy(int count);
private readonly partial ref byte UnsafeCurrent { get; }
private readonly partial int CurrentLength { get; }
private partial void SetCurrent(ReadOnlySpan<byte> value);
// reinterprets the byte at UnsafeCurrent as a RESP prefix, without any validation
private RespPrefix UnsafePeekPrefix() => (RespPrefix)UnsafeCurrent;
private readonly partial ReadOnlySpan<byte> UnsafePastPrefix();
private readonly partial ReadOnlySpan<byte> CurrentSpan();
/// <summary>
/// Attempts to expose the scalar payload as one contiguous span, without copying.
/// </summary>
/// <param name="value">On success, the payload bytes; empty when the element is a null scalar or when the call fails.</param>
/// <returns><c>True</c> if this is an inline scalar whose payload lies entirely within the current buffer,
/// or if this is a null scalar (in which case <paramref name="value"/> is empty); otherwise <c>False</c>.</returns>
/// <remarks>If a scalar reports <c>False</c>, <see cref="ScalarChunks"/> can be used to iterate the entire payload.</remarks>
public readonly bool TryGetSpan(out ReadOnlySpan<byte> value)
{
    // anything that is not a fully-buffered inline scalar has no single span to offer
    if (!IsInlineScalar || CurrentAvailable < _length)
    {
        value = default;
        return IsNullScalar; // a null scalar still "succeeds", with an empty payload
    }

    value = CurrentSpan().Slice(0, _length);
    return true;
}
/// <summary>
/// The position immediately after the current element: bytes skipped in earlier buffers, plus the
/// offset into the current buffer, plus the body of the current inline scalar (if any).
/// </summary>
public readonly long BytesConsumed
{
    get => _positionBase + _bufferIndex + TrailingLength;
}
/// <summary>
/// For inline scalars: the payload length plus the two-byte terminator; otherwise zero.
/// </summary>
private readonly int TrailingLength => IsInlineScalar ? _length + 2 : 0;
/// <summary>
/// The RESP kind of the current element.
/// </summary>
public readonly RespPrefix Prefix
{
    get => _prefix;
}
/// <summary>
/// The payload length of this scalar element (for streaming scalars, the combined length of all chunks).
/// </summary>
/// <exception cref="OverflowException">If the combined length does not fit in an <see cref="int"/>.</exception>
public readonly int ScalarLength()
{
    if (IsInlineScalar) return _length;
    if (IsNullScalar) return 0;
    // non-inline: sum the chunks, refusing to silently truncate
    return checked((int)ScalarLengthSlow());
}
/// <summary>
/// Indicates whether this scalar value has a zero-length payload; null scalars are reported as empty.
/// </summary>
public readonly bool ScalarIsEmpty()
{
    if (IsInlineScalar) return _length == 0;
    if (IsNullScalar) return true;
    // otherwise: empty only if the chunk iterator has nothing to yield
    return !ScalarChunks().MoveNext();
}
/// <summary>
/// Indicates whether this aggregate value has no child elements.
/// </summary>
public readonly bool AggregateIsEmpty()
{
    return AggregateLengthIs(0);
}
/// <summary>
/// The payload length of this scalar element as a 64-bit value (for streaming scalars, the combined length of all chunks).
/// </summary>
public readonly long ScalarLongLength()
{
    if (IsInlineScalar) return _length;
    if (IsNullScalar) return 0;
    return ScalarLengthSlow();
}
/// <summary>
/// Indicates whether the payload length of this scalar element is exactly the specified <paramref name="count"/> value.
/// </summary>
/// <param name="count">The length to compare against; null scalars compare equal to zero.</param>
public readonly bool ScalarLengthIs(int count)
{
    if (IsInlineScalar) return _length == count;
    if (IsNullScalar) return count == 0;
    return ScalarLengthIsSlow(count);
}
// Sums the length of every chunk of the current scalar; used when the length is not known up front.
private readonly long ScalarLengthSlow()
{
    DemandScalar();

    long total = 0;
    for (var chunks = ScalarChunks(); chunks.MoveNext();)
    {
        total += chunks.CurrentLength;
    }
    return total;
}
// Chunk-walking comparison of the scalar length against an expected value.
private readonly bool ScalarLengthIsSlow(int expected)
{
    DemandScalar();

    int total = 0;
    // stop as soon as the running total exceeds the expected value: the answer is already "no"
    for (var chunks = ScalarChunks(); total <= expected && chunks.MoveNext();)
    {
        total += chunks.CurrentLength;
    }
    return total == expected;
}
/// <summary>
/// The number of child elements associated with an aggregate.
/// </summary>
/// <remarks>For <see cref="RespPrefix.Map"/>
/// and <see cref="RespPrefix.Attribute"/> aggregates, this is <b>twice</b> the value reported in the RESP protocol,
/// i.e. a map of the form <c>%2\r\n...</c> will report <c>4</c> as the length.</remarks>
/// <remarks>Note that if the data could be streaming (<see cref="IsStreaming"/>), it may be preferable to use
/// the <see cref="AggregateChildren"/> API, using the <see cref="RespReader.AggregateEnumerator.MovePast(out RespReader)"/> API to update the outer reader.</remarks>
public readonly int AggregateLength()
{
    const RespFlags mask = RespFlags.IsAggregate | RespFlags.IsStreaming;

    // a non-streaming aggregate already knows its child count; everything else takes the slow path
    return (_flags & mask) == RespFlags.IsAggregate ? _length : AggregateLengthSlow();
}
/// <summary>
/// Indicates whether the number of child elements associated with an aggregate is exactly the specified <paramref name="count"/> value.
/// </summary>
/// <param name="count">The child count to compare against.</param>
/// <remarks>For <see cref="RespPrefix.Map"/>
/// and <see cref="RespPrefix.Attribute"/> aggregates, this is <b>twice</b> the value reported in the RESP protocol,
/// i.e. a map of the form <c>%2\r\n...</c> will report <c>4</c> as the length.</remarks>
public readonly bool AggregateLengthIs(int count)
{
    const RespFlags mask = RespFlags.IsAggregate | RespFlags.IsStreaming;

    if ((_flags & mask) == RespFlags.IsAggregate)
    {
        return _length == count; // fixed-size aggregate: direct comparison
    }
    return AggregateLengthIsSlow(count);
}
/// <summary>
/// A callback that produces a <typeparamref name="T"/> from a reader; used by <c>FillAll</c>
/// to project child elements into a target span.
/// </summary>
/// <typeparam name="T">The type of value produced.</typeparam>
/// <param name="value">The reader to project from.</param>
public delegate T Projection<out T>(ref RespReader value);
/// <summary>
/// A callback that produces a <typeparamref name="TResult"/> from a reader, with access to caller-supplied state.
/// </summary>
/// <typeparam name="TState">The type of the caller-supplied state (may be a ref struct on .NET 9 or later).</typeparam>
/// <typeparam name="TResult">The type of value produced.</typeparam>
/// <param name="state">The caller-supplied state, passed by reference.</param>
/// <param name="value">The reader to project from.</param>
public delegate TResult Projection<TState, out TResult>(ref TState state, ref RespReader value)
#if NET9_0_OR_GREATER
where TState : allows ref struct
#endif
;
/// <summary>
/// Asserts that the current element is not null, then hands <paramref name="target"/> and
/// <paramref name="projection"/> to the child enumerator's <c>FillAll</c>.
/// </summary>
/// <typeparam name="T">The type of the projected values.</typeparam>
/// <param name="target">The span that receives the projected values.</param>
/// <param name="projection">The callback used to project values.</param>
// NOTE(review): how a size mismatch between target and the child count is handled is decided by
// the enumerator's FillAll, which is not visible here - confirm before documenting it.
public void FillAll<T>(scoped Span<T> target, Projection<T> projection)
{
DemandNotNull();
AggregateChildren().FillAll(target, projection);
}
/// <summary>
/// Asserts that the current element is not null, then hands <paramref name="target"/>,
/// <paramref name="state"/> and <paramref name="projection"/> to the child enumerator's <c>FillAll</c>.
/// </summary>
/// <typeparam name="TState">The type of the caller-supplied state.</typeparam>
/// <typeparam name="TResult">The type of the projected values.</typeparam>
/// <param name="target">The span that receives the projected values.</param>
/// <param name="state">Caller-supplied state, forwarded by reference.</param>
/// <param name="projection">The callback used to project values.</param>
// NOTE(review): how a size mismatch between target and the child count is handled is decided by
// the enumerator's FillAll, which is not visible here - confirm before documenting it.
public void FillAll<TState, TResult>(scoped Span<TResult> target, ref TState state, Projection<TState, TResult> projection)
{
DemandNotNull();
AggregateChildren().FillAll(target, ref state, projection);
}
// Counts the children of a streaming aggregate by walking a cloned reader up to the stream
// terminator; the current reader is not advanced.
private readonly int AggregateLengthSlow()
{
    const RespFlags mask = RespFlags.IsAggregate | RespFlags.IsStreaming;

    var shape = _flags & mask;
    if (shape == RespFlags.IsAggregate)
    {
        return _length; // fixed-size aggregate: nothing to count
    }
    if (shape != mask)
    {
        DemandAggregate(); // we expect this to throw
    }

    var cursor = Clone();
    for (int children = 0; ; children++)
    {
        if (!cursor.TryReadNextSkipAttributes(skipStreamTerminator: false)) ThrowEof();
        if (cursor.Prefix == RespPrefix.StreamTerminator)
        {
            return children;
        }
        // step over the whole child (including anything nested) before counting it
        cursor.SkipChildren();
    }
}
// Compares the child count of a (possibly streaming) aggregate against an expected value by walking
// a cloned reader; the current reader is not advanced.
private readonly bool AggregateLengthIsSlow(int expected)
{
    const RespFlags mask = RespFlags.IsAggregate | RespFlags.IsStreaming;

    var shape = _flags & mask;
    if (shape == RespFlags.IsAggregate)
    {
        return _length == expected;
    }
    if (shape != mask)
    {
        DemandAggregate(); // we expect this to throw
    }

    var cursor = Clone();
    int seen = 0;
    // short-circuit once we have seen more children than expected: the answer is already "no"
    for (; seen <= expected; seen++)
    {
        if (!cursor.TryReadNextSkipAttributes(skipStreamTerminator: false)) ThrowEof();
        if (cursor.Prefix == RespPrefix.StreamTerminator)
        {
            break;
        }
        cursor.SkipChildren();
    }
    return seen == expected;
}
/// <summary>
/// Indicates whether this is a scalar value, i.e. with a potential payload body.
/// </summary>
public readonly bool IsScalar => (_flags & RespFlags.IsScalar) != RespFlags.None;
// a non-null scalar whose payload (plus terminator) follows directly
internal readonly bool IsInlineScalar => (_flags & RespFlags.IsInlineScalar) != RespFlags.None;
// a scalar that is also flagged as null
internal readonly bool IsNullScalar
{
    get
    {
        const RespFlags required = RespFlags.IsScalar | RespFlags.IsNull;
        return (_flags & required) == required;
    }
}
/// <summary>
/// Indicates whether this is an aggregate value, i.e. represents a collection of sub-values.
/// </summary>
public readonly bool IsAggregate => (_flags & RespFlags.IsAggregate) != RespFlags.None;
// an aggregate that is not flagged as null
internal readonly bool IsNonNullAggregate
{
    get
    {
        const RespFlags mask = RespFlags.IsAggregate | RespFlags.IsNull;
        return (_flags & mask) == RespFlags.IsAggregate;
    }
}
/// <summary>
/// Indicates whether this is a null value; this could be an explicit <see cref="RespPrefix.Null"/>,
/// or a scalar or aggregate with a negative reported length.
/// </summary>
public readonly bool IsNull => (_flags & RespFlags.IsNull) != RespFlags.None;
/// <summary>
/// Indicates whether this is an attribute value, i.e. metadata relating to later element data.
/// </summary>
public readonly bool IsAttribute => (_flags & RespFlags.IsAttribute) != RespFlags.None;
/// <summary>
/// Indicates whether this represents streaming content, where the <see cref="ScalarLength"/> or <see cref="AggregateLength"/> is not known in advance.
/// </summary>
public readonly bool IsStreaming => (_flags & RespFlags.IsStreaming) != RespFlags.None;
/// <summary>
/// Equivalent to both <see cref="IsStreaming"/> and <see cref="IsScalar"/>.
/// </summary>
internal readonly bool IsStreamingScalar
{
    get
    {
        const RespFlags required = RespFlags.IsScalar | RespFlags.IsStreaming;
        return (_flags & required) == required;
    }
}
/// <summary>
/// Indicates errors reported inside the protocol.
/// </summary>
public readonly bool IsError => (_flags & RespFlags.IsError) != RespFlags.None;
/// <summary>
/// Gets the effective change (in terms of how many RESP nodes we expect to see) from consuming this element.
/// Scalars (and nulls, which carry both the scalar and aggregate flags) report <c>-1</c> because there is one
/// less node to read. Non-streaming aggregates report <c>_length - 1</c>: one node consumed, with the child
/// elements still to come. Non-streaming attributes report their child count, since the attribute itself does
/// not count as a consumed node but its children must still be read. Every other combination - including
/// anything flagged as streaming - reports <c>0</c>.
/// </summary>
/// <remarks>This does not account for being nested inside a streaming aggregate; the caller must deal with that manually.</remarks>
// NOTE(review): earlier documentation stated that the final terminator for streaming data reports -1;
// whether it does depends on the flags assigned to the terminator during parsing, which is not visible here.
internal int Delta()
{
    const RespFlags mask =
        RespFlags.IsScalar | RespFlags.IsAggregate | RespFlags.IsStreaming | RespFlags.IsAttribute;

    switch (_flags & mask)
    {
        case RespFlags.IsScalar | RespFlags.IsAggregate: // null has this
        case RespFlags.IsScalar:
            return -1;
        case RespFlags.IsAggregate:
            return _length - 1;
        case RespFlags.IsAggregate | RespFlags.IsAttribute:
            return _length;
        default:
            return 0;
    }
}
/// <summary>
/// Assert that this is the final element in the current payload; any in-progress streaming scalar
/// is read to completion first.
/// </summary>
/// <exception cref="InvalidOperationException">If additional elements are available.</exception>
public void DemandEnd()
{
#pragma warning disable CS0618 // avoid TryReadNext unless you know what you're doing
    // finish off any streaming scalar we are part-way through
    while (IsStreamingScalar)
    {
        if (!TryReadNext()) ThrowEof();
    }

    bool foundMore = TryReadNext();
#pragma warning restore CS0618
    if (foundMore)
    {
        ThrowUnexpected(Prefix);
    }

    static void ThrowUnexpected(RespPrefix found) =>
        throw new InvalidOperationException($"Expected end of payload, but found {found}");
}
// Reads forward to the next element that is not attribute metadata (attributes are skipped along with
// their children), optionally also stepping over stream terminators. Returns false when data runs out.
private bool TryReadNextSkipAttributes(bool skipStreamTerminator)
{
#pragma warning disable CS0618 // avoid TryReadNext unless you know what you're doing
    while (TryReadNext())
    {
        if (IsAttribute)
        {
            SkipChildren();
            continue;
        }

        if (skipStreamTerminator & Prefix is RespPrefix.StreamTerminator)
        {
            continue; // skip terminator
        }

        return true;
    }
#pragma warning restore CS0618
    return false;
}
// Reads forward to the next non-attribute element, handing each attribute encountered to the supplied
// attribute reader, and optionally stepping over stream terminators. Returns false when data runs out.
private bool TryReadNextProcessAttributes<T>(RespAttributeReader<T> respAttributeReader, ref T attributes, bool skipStreamTerminator)
{
#pragma warning disable CS0618 // avoid TryReadNext unless you know what you're doing
    while (TryReadNext())
    {
        if (IsAttribute)
        {
            respAttributeReader.Read(ref this, ref attributes);
            continue;
        }

        if (skipStreamTerminator & Prefix is RespPrefix.StreamTerminator)
        {
            continue; // skip terminator
        }

        return true;
    }
#pragma warning restore CS0618
    return false;
}
/// <summary>
/// Move to the next content element, skipping attribute metadata and stream terminators, and throwing
/// if the element found is a RESP error.
/// </summary>
/// <returns><c>true</c> if an element was found; <c>false</c> if the data is exhausted.</returns>
/// <exception cref="EndOfStreamException">If the data is exhausted before a streaming scalar is exhausted.</exception>
/// <exception cref="RespException">If the data contains an explicit error element.</exception>
public bool TryMoveNext()
{
    // close out the current streaming scalar
    while (IsStreamingScalar)
    {
        if (!TryReadNextSkipAttributes(skipStreamTerminator: false)) ThrowEof();
    }

    if (!TryReadNextSkipAttributes(skipStreamTerminator: true))
    {
        return false;
    }

    if (IsError) ThrowError();
    return true;
}
/// <summary>
/// Move to the next content element, skipping attribute metadata and stream terminators; RESP error
/// elements cause a throw only when <paramref name="checkError"/> is set.
/// </summary>
/// <param name="checkError">Whether to check and throw for error messages.</param>
/// <returns><c>true</c> if an element was found; <c>false</c> if the data is exhausted.</returns>
/// <exception cref="EndOfStreamException">If the data is exhausted before a streaming scalar is exhausted.</exception>
/// <exception cref="RespException">If <paramref name="checkError"/> is set and the data contains an explicit error element.</exception>
public bool TryMoveNext(bool checkError)
{
    // close out the current streaming scalar
    while (IsStreamingScalar)
    {
        if (!TryReadNextSkipAttributes(skipStreamTerminator: false)) ThrowEof();
    }

    if (!TryReadNextSkipAttributes(skipStreamTerminator: true))
    {
        return false;
    }

    if (checkError && IsError) ThrowError();
    return true;
}
/// <summary>
/// Move to the next content element, passing any attribute metadata found on the way to
/// <paramref name="respAttributeReader"/>, and throwing if the element found is a RESP error.
/// </summary>
/// <param name="respAttributeReader">Parser for attribute data preceding the data.</param>
/// <param name="attributes">The state for attributes encountered.</param>
/// <returns><c>true</c> if an element was found; <c>false</c> if the data is exhausted.</returns>
/// <exception cref="EndOfStreamException">If the data is exhausted before a streaming scalar is exhausted.</exception>
/// <exception cref="RespException">If the data contains an explicit error element.</exception>
/// <typeparam name="T">The type of data represented by this reader.</typeparam>
public bool TryMoveNext<T>(RespAttributeReader<T> respAttributeReader, ref T attributes)
{
    // close out the current streaming scalar; attributes inside it are skipped, not parsed
    while (IsStreamingScalar)
    {
        if (!TryReadNextSkipAttributes(skipStreamTerminator: false)) ThrowEof();
    }

    if (!TryReadNextProcessAttributes(respAttributeReader, ref attributes, skipStreamTerminator: true))
    {
        return false;
    }

    if (IsError) ThrowError();
    return true;
}
/// <summary>
/// Move to the next content element and, if one is found, assert that it is of the expected type;
/// this skips attribute metadata and throws for RESP error messages.
/// </summary>
/// <param name="prefix">The expected data type.</param>
/// <returns><c>true</c> if an element was found; <c>false</c> if the data is exhausted.</returns>
/// <exception cref="EndOfStreamException">If the data is exhausted before a streaming scalar is exhausted.</exception>
/// <exception cref="RespException">If the data contains an explicit error element.</exception>
/// <exception cref="InvalidOperationException">If the data is not of the expected type.</exception>
public bool TryMoveNext(RespPrefix prefix)
{
    if (!TryMoveNext())
    {
        return false;
    }

    Demand(prefix);
    return true;
}
/// <summary>
/// Move to the next content element, which must exist; this skips attribute metadata and throws for
/// RESP error messages.
/// </summary>
/// <exception cref="EndOfStreamException">If the data is exhausted before content is found.</exception>
/// <exception cref="RespException">If the data contains an explicit error element.</exception>
public void MoveNext()
{
    if (TryMoveNext())
    {
        return;
    }
    ThrowEof();
}
/// <summary>
/// Move to the next content element, which must exist, passing any attribute metadata found on the way
/// to <paramref name="respAttributeReader"/>; RESP error messages cause a throw.
/// </summary>
/// <param name="respAttributeReader">Parser for attribute data preceding the data.</param>
/// <param name="attributes">The state for attributes encountered.</param>
/// <exception cref="EndOfStreamException">If the data is exhausted before content is found.</exception>
/// <exception cref="RespException">If the data contains an explicit error element.</exception>
/// <typeparam name="T">The type of data represented by this reader.</typeparam>
public void MoveNext<T>(RespAttributeReader<T> respAttributeReader, ref T attributes)
{
    if (TryMoveNext(respAttributeReader, ref attributes))
    {
        return;
    }
    ThrowEof();
}
// When positioned on a streaming scalar, advances to its next continuation chunk (skipping attributes).
// Returns true if that chunk carries payload, false if it is zero-length or if we are not on a
// streaming scalar at all.
private bool MoveNextStreamingScalar()
{
    if (!IsStreamingScalar)
    {
        return false;
    }

#pragma warning disable CS0618 // avoid TryReadNext unless you know what you're doing
    while (TryReadNext())
#pragma warning restore CS0618
    {
        if (IsAttribute)
        {
            SkipChildren();
            continue;
        }

        if (Prefix != RespPrefix.StreamContinuation)
        {
            ThrowProtocolFailure("Streaming continuation expected");
        }
        return _length > 0;
    }

    ThrowEof(); // we should have found something!
    return false;
}
/// <summary>
/// Move to the next content element (<see cref="MoveNext()"/>) and assert that it is a scalar (<see cref="DemandScalar"/>).
/// </summary>
/// <exception cref="EndOfStreamException">If the data is exhausted before content is found.</exception>
/// <exception cref="RespException">If the data contains an explicit error element.</exception>
/// <exception cref="InvalidOperationException">If the data is not a scalar type.</exception>
public void MoveNextScalar()
{
// advance first; the type assertion applies to the element we land on
MoveNext();
DemandScalar();
}
/// <summary>
/// Move to the next content element (<see cref="MoveNext()"/>) and assert that it is an aggregate (<see cref="DemandAggregate"/>).
/// </summary>
/// <exception cref="EndOfStreamException">If the data is exhausted before content is found.</exception>
/// <exception cref="RespException">If the data contains an explicit error element.</exception>
/// <exception cref="InvalidOperationException">If the data is not an aggregate type.</exception>
public void MoveNextAggregate()
{
// advance first; the type assertion applies to the element we land on
MoveNext();
DemandAggregate();
}
/// <summary>
/// Move to the next content element (<see cref="MoveNext{T}(RespAttributeReader{T}, ref T)"/>), passing attribute
/// metadata to <paramref name="respAttributeReader"/>, and assert that it is of the type specified
/// in <paramref name="prefix"/>.
/// </summary>
/// <param name="prefix">The expected data type.</param>
/// <param name="respAttributeReader">Parser for attribute data preceding the data.</param>
/// <param name="attributes">The state for attributes encountered.</param>
/// <exception cref="EndOfStreamException">If the data is exhausted before content is found.</exception>
/// <exception cref="RespException">If the data contains an explicit error element.</exception>
/// <exception cref="InvalidOperationException">If the data is not of the expected type.</exception>
/// <typeparam name="T">The type of data represented by this reader.</typeparam>
public void MoveNext<T>(RespPrefix prefix, RespAttributeReader<T> respAttributeReader, ref T attributes)
{
MoveNext(respAttributeReader, ref attributes);
Demand(prefix);
}
/// <summary>
/// Move to the next content element (<see cref="MoveNext()"/>) and assert that it is of the type specified
/// in <paramref name="prefix"/>.
/// </summary>
/// <param name="prefix">The expected data type.</param>
/// <exception cref="EndOfStreamException">If the data is exhausted before content is found.</exception>
/// <exception cref="RespException">If the data contains an explicit error element.</exception>
/// <exception cref="InvalidOperationException">If the data is not of the expected type.</exception>
public void MoveNext(RespPrefix prefix)
{
MoveNext();
Demand(prefix);
}
// Asserts that the current element has the expected prefix; throws InvalidOperationException otherwise.
internal void Demand(RespPrefix prefix)
{
    var found = Prefix;
    if (found == prefix)
    {
        return;
    }
    Fail(prefix, found);

    // kept as a separate local function so the throw stays out of the caller's main path
    static void Fail(RespPrefix expected, RespPrefix actual) =>
        throw new InvalidOperationException($"Expected {expected} element, but found {actual}.");
}
// Throws a RespException whose message is the current element read as a string.
// The null-forgiving operator asserts the string is non-null - presumably safe because callers only
// invoke this when IsError is set; confirm against the parser.
private readonly void ThrowError() => throw new RespException(ReadString()!);
/// <summary>
/// Skip all sub elements of the current node; this includes both aggregate children and scalar streaming elements.
/// </summary>
/// <exception cref="EndOfStreamException">If the data is exhausted before the sub elements have been skipped.</exception>
public void SkipChildren()
{
    const RespFlags mask = RespFlags.IsScalar | RespFlags.IsAggregate | RespFlags.IsStreaming;

    var shape = _flags & mask;
    if (shape == RespFlags.None)
    {
        return; // no current element
    }

    if (shape == RespFlags.IsScalar)
    {
        // simple non-streaming scalar: nothing complex to do
        MovePastCurrent();
        return;
    }

    // something more complex: re-use the frame scanner logic to seek past the noise
    // (this way, we avoid recursion etc)
    RespScanState state = new(in this);
    if (!state.TryRead(ref this, out _)) ThrowEof();
}
/// <summary>
/// Reads the current element as a string value, discarding any verbatim-string format marker.
/// </summary>
/// <returns>The decoded string; <c>null</c> if the element is null.</returns>
public readonly string? ReadString() => ReadString(out _);
/// <summary>
/// Reads the current element as a UTF-8 string value, also reporting the format marker of verbatim strings.
/// </summary>
/// <param name="prefix">For a <see cref="RespPrefix.VerbatimString"/> whose fourth byte is <c>:</c>, the
/// three-character format marker (for example <c>txt</c> or <c>mkd</c>); otherwise an empty string.</param>
/// <returns>The decoded string (without any format marker); <c>null</c> if the payload is empty and the
/// element is null.</returns>
public readonly string? ReadString(out string prefix)
{
    byte[] pooled = [];
    try
    {
        var payload = Buffer(ref pooled, stackalloc byte[256]);
        prefix = "";

        if (payload.IsEmpty)
        {
            return IsNull ? null : "";
        }

        bool hasFormatMarker = Prefix == RespPrefix.VerbatimString
            && payload.Length >= 4 && payload[3] == ':';
        if (hasFormatMarker)
        {
            // "the first three bytes provide information about the format of the following string,
            // which can be txt for plain text, or mkd for markdown. The fourth byte is always :.
            // Then the real string follows."
            var marker = RespConstants.UnsafeCpuUInt32(payload);

            // the two well-known markers are matched as a single integer compare, avoiding a string allocation
            prefix = marker == PrefixTxt ? "txt"
                : marker == PrefixMkd ? "mkd"
                : RespConstants.UTF8.GetString(payload.Slice(0, 3));

            payload = payload.Slice(4);
        }

        return RespConstants.UTF8.GetString(payload);
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(pooled);
    }
}
// The four-byte verbatim-string markers "txt:" and "mkd:", pre-read as 32-bit integers so that
// ReadString can recognise them with a single integer comparison.
private static readonly uint
PrefixTxt = RespConstants.UnsafeCpuUInt32("txt:"u8),
PrefixMkd = RespConstants.UnsafeCpuUInt32("mkd:"u8);
/// <summary>
/// Reads the current element as a newly allocated byte array.
/// </summary>
/// <returns>A copy of the payload; an empty array for an empty non-null payload; <c>null</c> if the
/// payload is empty and the element is null.</returns>
public readonly byte[]? ReadByteArray()
{
    byte[] pooled = [];
    try
    {
        var payload = Buffer(ref pooled, stackalloc byte[256]);
        if (!payload.IsEmpty)
        {
            return payload.ToArray();
        }
        return IsNull ? null : [];
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(pooled);
    }
}
/// <summary>
/// Reads the current element using a general purpose byte parser.
/// </summary>
/// <typeparam name="T">The type of data being parsed.</typeparam>
/// <param name="parser">The callback that receives the buffered payload bytes.</param>
/// <returns>The value produced by <paramref name="parser"/>.</returns>
internal readonly T ParseBytes<T>(Parser<byte, T> parser)
{
    byte[] pooled = [];
    try
    {
        // buffer inside the try (as the sibling readers do) so that an array rented during
        // buffering is still returned to the pool if buffering fails part-way through
        var span = Buffer(ref pooled, stackalloc byte[256]);
        return parser(span);
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(pooled);
    }
}
/// <summary>
/// Reads the current element using a general purpose byte parser that also receives caller-supplied state.
/// </summary>
/// <typeparam name="T">The type of data being parsed.</typeparam>
/// <typeparam name="TState">State required by the parser.</typeparam>
/// <param name="parser">The callback that receives the buffered payload bytes and <paramref name="state"/>.</param>
/// <param name="state">The state to forward to <paramref name="parser"/>.</param>
/// <returns>The value produced by <paramref name="parser"/>.</returns>
internal readonly T ParseBytes<T, TState>(Parser<byte, TState, T> parser, TState? state)
{
    byte[] pooled = [];
    try
    {
        // buffer inside the try (as the sibling readers do) so that an array rented during
        // buffering is still returned to the pool if buffering fails part-way through
        var span = Buffer(ref pooled, stackalloc byte[256]);

        // forward the caller's state; previously 'default' was passed and the argument was silently ignored
        return parser(span, state);
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(pooled);
    }
}
/// <summary>
/// Tries to read the current scalar element using a function-pointer parser.
/// </summary>
/// <typeparam name="T">The type of data being parsed.</typeparam>
/// <param name="parser">The parser function.</param>
/// <param name="value">The parsed value if successful.</param>
/// <returns>The result reported by <paramref name="parser"/>.</returns>
public readonly unsafe bool TryParseScalar<T>(
    delegate* managed<ReadOnlySpan<byte>, out T, bool> parser, out T value)
{
    // Fast path: parse straight from the underlying buffer when the payload is contiguous
    if (TryGetSpan(out var span))
    {
        return parser(span, out value);
    }

    // otherwise the payload has to be stitched together first
    return TryParseSlow(parser, out value);
}
// Slow path for the function-pointer TryParseScalar: copies the payload into a contiguous buffer
// (stack first, pooled array if larger) and parses from that.
[MethodImpl(MethodImplOptions.NoInlining)]
private readonly unsafe bool TryParseSlow<T>(
    delegate* managed<ReadOnlySpan<byte>, out T, bool> parser,
    out T value)
{
    byte[] pooled = [];
    try
    {
        return parser(Buffer(ref pooled, stackalloc byte[256]), out value);
    }
    finally
    {
        // hand back whatever Buffer may have rented
        ArrayPool<byte>.Shared.Return(pooled);
    }
}
/// <summary>
/// Tries to read the current scalar element using a delegate parser callback.
/// </summary>
/// <typeparam name="T">The type of data being parsed.</typeparam>
/// <param name="parser">The parser callback.</param>
/// <param name="value">The parsed value if successful.</param>
/// <returns><c>true</c> if parsing succeeded; otherwise, <c>false</c>.</returns>
#pragma warning disable RS0016, RS0027 // public API
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#if DEBUG
[Obsolete("Please prefer the function-pointer API for library-internal use.")]
#endif
public readonly bool TryParseScalar<T>(ScalarParser<byte, T> parser, out T value)
#pragma warning restore RS0016, RS0027 // public API
{
    // Fast path: parse straight from the underlying buffer when the payload is contiguous
    if (TryGetSpan(out var span))
    {
        return parser(span, out value);
    }

    // otherwise the payload has to be stitched together first
    return TryParseSlow(parser, out value);
}
// Slow path for the delegate TryParseScalar: copies the payload into a contiguous buffer
// (stack first, pooled array if larger) and parses from that.
[MethodImpl(MethodImplOptions.NoInlining)]
private readonly bool TryParseSlow<T>(ScalarParser<byte, T> parser, out T value)
{
    byte[] pooled = [];
    try
    {
        return parser(Buffer(ref pooled, stackalloc byte[256]), out value);
    }
    finally
    {
        // hand back whatever Buffer may have rented
        ArrayPool<byte>.Shared.Return(pooled);
    }
}
/// <summary>
/// Buffers the current scalar value into the provided target span, never renting from the pool.
/// </summary>
/// <param name="target">The target span to buffer data into.</param>
/// <returns>
/// When the payload is available as a single span (see <see cref="TryGetSpan"/>), that span is returned
/// directly and <paramref name="target"/> is not written to. Otherwise the payload is copied into
/// <paramref name="target"/>: if it fits, the filled slice of <paramref name="target"/> is returned;
/// if it does not, <paramref name="target"/> is returned filled with the first <c>target.Length</c> bytes
/// (the remaining data is not buffered).
/// </returns>
/// <remarks>
/// Copying is only needed for non-contiguous data (e.g., streaming scalars or data spanning multiple
/// buffer segments).
/// </remarks>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal readonly ReadOnlySpan<byte> Buffer(Span<byte> target)
{
    if (!TryGetSpan(out var simple))
    {
        // usePool is false, so the 'pooled' reference is never touched by BufferSlow
#if NET6_0_OR_GREATER
        return BufferSlow(ref Unsafe.NullRef<byte[]>(), target, usePool: false);
#else
        byte[] pooled = [];
        return BufferSlow(ref pooled, target, usePool: false);
#endif
    }

    return simple;
}
// Buffers the current scalar value, renting from the shared array pool when 'target' is too small.
// When renting happens, 'pooled' is replaced with the rented array (and the previous value is returned
// to the pool); the caller is responsible for returning 'pooled' once finished with the result.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal readonly ReadOnlySpan<byte> Buffer(scoped ref byte[] pooled, Span<byte> target = default)
{
    if (TryGetSpan(out var simple))
    {
        return simple; // contiguous already: no copy needed
    }

    return BufferSlow(ref pooled, target, true);
}
// Copies the chunks of the current scalar into one contiguous buffer.
//  - pooled:  the caller's current pooled array; only touched when usePool is true, in which case it is
//             returned to the shared pool and replaced whenever a bigger array has to be rented.
//  - target:  the initial destination (typically stack memory).
//  - usePool: when false, never rent; if the data does not fit, fill 'target' and return all of it.
// Returns the filled portion of whichever buffer ended up holding the data.
[MethodImpl(MethodImplOptions.NoInlining)]
private readonly ReadOnlySpan<byte> BufferSlow(scoped ref byte[] pooled, Span<byte> target, bool usePool)
{
    DemandScalar();

    if (IsInlineScalar && usePool)
    {
        // grow to the correct size in advance, if needed
        var length = ScalarLength();
        if (length > target.Length)
        {
            var bigger = ArrayPool<byte>.Shared.Rent(length);
            ArrayPool<byte>.Shared.Return(pooled);
            target = pooled = bigger;
        }
    }

    var iterator = ScalarChunks();
    ReadOnlySpan<byte> current;
    int offset = 0;
    while (iterator.MoveNext())
    {
        // will the current chunk fit?
        current = iterator.Current;
        if (current.TryCopyTo(target.Slice(offset)))
        {
            // fits into the current buffer
            offset += current.Length;
        }
        else if (!usePool)
        {
            // rent disallowed; fill what we can
            var available = target.Slice(offset);
            current.Slice(0, available.Length).CopyTo(available);
            return target; // we filled it
        }
        else
        {
            // rent a bigger buffer, copy and recycle
            var bigger = ArrayPool<byte>.Shared.Rent(offset + current.Length);
            if (offset != 0)
            {
                target.Slice(0, offset).CopyTo(bigger);
            }

            ArrayPool<byte>.Shared.Return(pooled);
            target = pooled = bigger;
            current.CopyTo(target.Slice(offset));

            // account for the chunk just copied; without this the next chunk would overwrite it
            // and the returned slice would omit it
            offset += current.Length;
        }
    }

    return target.Slice(0, offset);
}
/// <summary>
/// Reads the current element using a general purpose text parser: the payload is decoded from UTF-8
/// to characters, which are then handed to <paramref name="parser"/>.
/// </summary>
/// <typeparam name="T">The type of data being parsed.</typeparam>
/// <param name="parser">The callback that receives the decoded characters.</param>
/// <returns>The value produced by <paramref name="parser"/>.</returns>
internal readonly T ParseChars<T>(Parser<char, T> parser)
{
    byte[] rentedBytes = [];
    char[] rentedChars = [];
    try
    {
        var utf8 = Buffer(ref rentedBytes, stackalloc byte[128]);

        // decode on the stack when the worst-case character count is small; otherwise rent
        var capacity = RespConstants.UTF8.GetMaxCharCount(utf8.Length);
        Span<char> scratch = capacity <= 128 ? stackalloc char[128] : (rentedChars = ArrayPool<char>.Shared.Rent(capacity));

        int written = RespConstants.UTF8.GetChars(utf8, scratch);
        return parser(scratch.Slice(0, written));
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(rentedBytes);
        ArrayPool<char>.Shared.Return(rentedChars);
    }
}
/// <summary>
/// Reads the current element by decoding it as UTF-8 and passing the characters, along with caller-supplied state,
/// to a general purpose character parser.
/// </summary>
/// <typeparam name="T">The type of data being parsed.</typeparam>
/// <typeparam name="TState">State required by the parser.</typeparam>
/// <param name="parser">The callback that converts the decoded characters into a <typeparamref name="T"/>.</param>
/// <param name="state">The state passed through to <paramref name="parser"/>.</param>
/// <returns>The value produced by <paramref name="parser"/>.</returns>
internal readonly T ParseChars<T, TState>(Parser<char, TState, T> parser, TState? state)
{
    // size of the stack scratch buffers tried before falling back to the array pool
    const int StackSize = 128;

    byte[] byteLease = [];
    char[] charLease = [];
    try
    {
        ReadOnlySpan<byte> utf8 = Buffer(ref byteLease, stackalloc byte[StackSize]);
        int charCapacity = RespConstants.UTF8.GetMaxCharCount(utf8.Length);
        Span<char> decoded = charCapacity <= StackSize
            ? stackalloc char[StackSize]
            : (charLease = ArrayPool<char>.Shared.Rent(charCapacity));
        int written = RespConstants.UTF8.GetChars(utf8, decoded);
        return parser(decoded.Slice(0, written), state);
    }
    finally
    {
        // returned unconditionally, even if still the initial empty arrays
        ArrayPool<byte>.Shared.Return(byteLease);
        ArrayPool<char>.Shared.Return(charLease);
    }
}
#if NET7_0_OR_GREATER
/// <summary>
/// Reads the current element by decoding it as UTF-8 and parsing the characters via <see cref="ISpanParsable{TSelf}"/>.
/// </summary>
/// <typeparam name="T">The type of data being parsed.</typeparam>
/// <param name="formatProvider">The format provider to parse with; <see cref="CultureInfo.InvariantCulture"/> is used when <c>null</c>.</param>
/// <returns>The parsed value.</returns>
#pragma warning disable RS0016, RS0027 // back-compat overload
public readonly T ParseChars<T>(IFormatProvider? formatProvider = null) where T : ISpanParsable<T>
#pragma warning restore RS0016, RS0027 // back-compat overload
{
    // size of the stack scratch buffers tried before falling back to the array pool
    const int StackSize = 128;

    IFormatProvider provider = formatProvider ?? CultureInfo.InvariantCulture;
    byte[] byteLease = [];
    char[] charLease = [];
    try
    {
        ReadOnlySpan<byte> utf8 = Buffer(ref byteLease, stackalloc byte[StackSize]);
        int charCapacity = RespConstants.UTF8.GetMaxCharCount(utf8.Length);
        Span<char> decoded = charCapacity <= StackSize
            ? stackalloc char[StackSize]
            : (charLease = ArrayPool<char>.Shared.Rent(charCapacity));
        int written = RespConstants.UTF8.GetChars(utf8, decoded);
        return T.Parse(decoded.Slice(0, written), provider);
    }
    finally
    {
        // returned unconditionally, even if still the initial empty arrays
        ArrayPool<byte>.Shared.Return(byteLease);
        ArrayPool<char>.Shared.Return(charLease);
    }
}
#endif
#if NET8_0_OR_GREATER
/// <summary>
/// Reads the current element by parsing its raw UTF-8 bytes via <see cref="IUtf8SpanParsable{TSelf}"/>.
/// </summary>
/// <typeparam name="T">The type of data being parsed.</typeparam>
/// <param name="formatProvider">The format provider to parse with; <see cref="CultureInfo.InvariantCulture"/> is used when <c>null</c>.</param>
/// <returns>The parsed value.</returns>
#pragma warning disable RS0016, RS0027 // back-compat overload
public readonly T ParseBytes<T>(IFormatProvider? formatProvider = null) where T : IUtf8SpanParsable<T>
#pragma warning restore RS0016, RS0027 // back-compat overload
{
    byte[] lease = [];
    try
    {
        // try a small stack buffer first; Buffer leases from the pool when that is too small
        return T.Parse(
            Buffer(ref lease, stackalloc byte[128]),
            formatProvider ?? CultureInfo.InvariantCulture);
    }
    finally
    {
        // returned unconditionally, even if still the initial empty array
        ArrayPool<byte>.Shared.Return(lease);
    }
}
#endif
/// <summary>
/// General purpose parsing callback that also receives caller-supplied state.
/// </summary>
/// <typeparam name="TSource">The type of source data being parsed.</typeparam>
/// <typeparam name="TState">State required by the parser.</typeparam>
/// <typeparam name="TValue">The output type of data being parsed.</typeparam>
/// <param name="value">The source data to parse; declared <c>scoped</c>, so it must not be retained beyond the call.</param>
/// <param name="state">The state supplied by the caller, passed through unchanged.</param>
/// <returns>The parsed value.</returns>
// is this needed?
internal delegate TValue Parser<TSource, TState, TValue>(scoped ReadOnlySpan<TSource> value, TState? state);
/// <summary>
/// General purpose parsing callback.
/// </summary>
/// <typeparam name="TSource">The type of source data being parsed.</typeparam>
/// <typeparam name="TValue">The output type of data being parsed.</typeparam>
/// <param name="value">The source data to parse; declared <c>scoped</c>, so it must not be retained beyond the call.</param>
/// <returns>The parsed value.</returns>
// is this needed?
internal delegate TValue Parser<TSource, TValue>(scoped ReadOnlySpan<TSource> value);
/// <summary>
/// Scalar parsing callback that returns a boolean indicating success.
/// </summary>
/// <typeparam name="TSource">The type of source data being parsed.</typeparam>
/// <typeparam name="TValue">The output type of data being parsed.</typeparam>
/// <param name="value">The source data to parse; declared <c>scoped</c>, so it must not be retained beyond the call.</param>
/// <param name="result">Receives the parsed value.</param>
/// <returns><c>true</c> if parsing succeeded; otherwise <c>false</c>.</returns>
public delegate bool ScalarParser<TSource, TValue>(scoped ReadOnlySpan<TSource> value, out TValue result);
/// <summary>
/// Initializes a new instance of the <see cref="RespReader"/> struct over a single contiguous span.
/// </summary>
/// <param name="value">The raw contents to parse with this instance.</param>
public RespReader(ReadOnlySpan<byte> value)
{
// start with no current element: zero length, no flags, no prefix
_length = 0;
_flags = RespFlags.None;
_prefix = RespPrefix.None;
// NOTE(review): SetCurrent is defined elsewhere; presumably it installs 'value' as the active buffer - confirm
SetCurrent(value);
// no position base and no tail: the span is the only input
// NOTE(review): assumes _tail refers to follow-on data used by other constructors - confirm
_remainingTailLength = _positionBase = 0;
_tail = null;
}