Skip to content

Commit 52df251

Browse files
Copilotpetesramek
andcommitted
feat: add IAsyncEnumerable decode/encode and zero-allocation IDuplexPipe support
Co-authored-by: petesramek <2333452+petesramek@users.noreply.github.com> Agent-Logs-Url: https://github.com/petesramek/polyline-algorithm-csharp/sessions/8bac762b-8608-43c2-9c46-e9485c2acf63
1 parent a71090d commit 52df251

16 files changed

Lines changed: 1303 additions & 4 deletions

src/PolylineAlgorithm/Abstraction/AbstractPolylineDecoder.cs

Lines changed: 136 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,13 @@ namespace PolylineAlgorithm.Abstraction;
1111
using PolylineAlgorithm.Internal.Logging;
1212
using PolylineAlgorithm.Properties;
1313
using System;
14+
using System.Buffers;
15+
using System.Collections.Generic;
1416
using System.Diagnostics.CodeAnalysis;
17+
using System.IO.Pipelines;
18+
using System.Runtime.CompilerServices;
19+
using System.Threading;
20+
using System.Threading.Tasks;
1521

1622
/// <summary>
1723
/// Decodes encoded polyline strings into sequences of geographic coordinates.
@@ -20,7 +26,7 @@ namespace PolylineAlgorithm.Abstraction;
2026
/// <remarks>
2127
/// This abstract class provides a base implementation for decoding polylines, allowing subclasses to define how to handle specific polyline formats.
2228
/// </remarks>
23-
public abstract class AbstractPolylineDecoder<TPolyline, TCoordinate> : IPolylineDecoder<TPolyline, TCoordinate> {
29+
public abstract class AbstractPolylineDecoder<TPolyline, TCoordinate> : IPolylineDecoder<TPolyline, TCoordinate>, IAsyncPolylineDecoder<TPolyline, TCoordinate>, IPolylinePipeDecoder<TCoordinate> {
2430
/// <summary>
2531
/// Initializes a new instance of the <see cref="AbstractPolylineDecoder{TPolyline, TCoordinate}"/> class with default encoding options.
2632
/// </summary>
@@ -138,6 +144,135 @@ static void ValidateEmptySequence(ILogger<AbstractPolylineDecoder<TPolyline, TCo
138144
/// </returns>
139145
protected abstract ReadOnlyMemory<char> GetReadOnlyMemory(TPolyline polyline);
140146

147+
/// <summary>
148+
/// Asynchronously decodes the specified encoded polyline into a sequence of geographic coordinates by
149+
/// iterating the synchronous <see cref="Decode"/> implementation and checking the cancellation token
150+
/// between each yielded coordinate.
151+
/// </summary>
152+
/// <param name="polyline">
153+
/// The <typeparamref name="TPolyline"/> instance containing the encoded polyline string to decode.
154+
/// </param>
155+
/// <param name="cancellationToken">
156+
/// A <see cref="CancellationToken"/> to observe while iterating.
157+
/// </param>
158+
/// <returns>
159+
/// An <see cref="IAsyncEnumerable{T}"/> of <typeparamref name="TCoordinate"/> representing the decoded
160+
/// latitude and longitude pairs.
161+
/// </returns>
162+
public async IAsyncEnumerable<TCoordinate> DecodeAsync(
163+
TPolyline polyline,
164+
[EnumeratorCancellation] CancellationToken cancellationToken) {
165+
166+
foreach (TCoordinate coordinate in Decode(polyline)) {
167+
cancellationToken.ThrowIfCancellationRequested();
168+
yield return coordinate;
169+
}
170+
171+
await Task.CompletedTask.ConfigureAwait(false);
172+
}
173+
174+
/// <summary>
175+
/// Asynchronously decodes encoded polyline bytes read from <paramref name="reader"/> into a sequence of
176+
/// geographic coordinates with zero intermediate allocations.
177+
/// </summary>
178+
/// <remarks>
179+
/// The method processes the pipe in chunks using <see cref="SequenceReader{T}"/> to handle multi-segment
180+
/// <see cref="ReadOnlySequence{T}"/> buffers transparently. The pipe reader is not completed by this method.
181+
/// </remarks>
182+
/// <param name="reader">
183+
/// The <see cref="PipeReader"/> from which the encoded polyline bytes are consumed.
184+
/// </param>
185+
/// <param name="cancellationToken">
186+
/// A <see cref="CancellationToken"/> to observe while waiting for data from the pipe.
187+
/// </param>
188+
/// <returns>
189+
/// An <see cref="IAsyncEnumerable{T}"/> of <typeparamref name="TCoordinate"/> representing the decoded
190+
/// latitude and longitude pairs.
191+
/// </returns>
192+
/// <exception cref="ArgumentNullException">
193+
/// Thrown when <paramref name="reader"/> is <see langword="null"/>.
194+
/// </exception>
195+
public async IAsyncEnumerable<TCoordinate> DecodeAsync(
196+
PipeReader reader,
197+
[EnumeratorCancellation] CancellationToken cancellationToken) {
198+
199+
if (reader is null) {
200+
throw new ArgumentNullException(nameof(reader));
201+
}
202+
203+
int latitude = 0;
204+
int longitude = 0;
205+
bool firstRead = true;
206+
207+
while (true) {
208+
ReadResult result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
209+
ReadOnlySequence<byte> buffer = result.Buffer;
210+
211+
if (firstRead && buffer.IsEmpty && result.IsCompleted) {
212+
throw new ArgumentException(
213+
string.Format(ExceptionMessageResource.PolylineCannotBeShorterThanExceptionMessage, 0),
214+
nameof(reader));
215+
}
216+
217+
firstRead = false;
218+
219+
// Process the buffer synchronously so that the SequenceReader<byte> (a ref struct) never lives
220+
// across a yield boundary.
221+
var decoded = new List<TCoordinate>();
222+
(SequencePosition consumed, latitude, longitude) = ProcessPipeBuffer(buffer, latitude, longitude, decoded);
223+
224+
foreach (TCoordinate coordinate in decoded) {
225+
yield return coordinate;
226+
}
227+
228+
// Tell the pipe how far we have consumed and examined.
229+
reader.AdvanceTo(consumed, buffer.End);
230+
231+
if (result.IsCompleted) {
232+
break;
233+
}
234+
}
235+
}
236+
237+
/// <summary>
238+
/// Synchronously processes a <see cref="ReadOnlySequence{T}"/> pipe buffer, decoding as many complete
239+
/// coordinate pairs as possible and returning the updated variance state and the consumed position.
240+
/// <see cref="System.Buffers.SequenceReader{T}"/> is used here because this method is not an async iterator
241+
/// and therefore the ref-struct constraint does not apply.
242+
/// </summary>
243+
private (SequencePosition consumed, int latitude, int longitude) ProcessPipeBuffer(
244+
ReadOnlySequence<byte> buffer,
245+
int latitude,
246+
int longitude,
247+
List<TCoordinate> results) {
248+
249+
var sequenceReader = new SequenceReader<byte>(buffer);
250+
SequencePosition consumed = buffer.Start;
251+
252+
while (!sequenceReader.End) {
253+
// Save state before attempting to decode a coordinate pair so we can roll back if only
254+
// part of the pair is available in the current buffer.
255+
SequencePosition pairStart = sequenceReader.Position;
256+
int savedLatitude = latitude;
257+
int savedLongitude = longitude;
258+
259+
if (!PolylineEncoding.TryReadValue(ref latitude, ref sequenceReader)
260+
|| !PolylineEncoding.TryReadValue(ref longitude, ref sequenceReader)) {
261+
262+
latitude = savedLatitude;
263+
longitude = savedLongitude;
264+
break;
265+
}
266+
267+
consumed = sequenceReader.Position;
268+
results.Add(CreateCoordinate(
269+
PolylineEncoding.Denormalize(latitude, CoordinateValueType.Latitude),
270+
PolylineEncoding.Denormalize(longitude, CoordinateValueType.Longitude)));
271+
}
272+
273+
return (consumed, latitude, longitude);
274+
}
275+
141276
/// <summary>
142277
/// Creates a coordinate instance from the given latitude and longitude values.
143278
/// </summary>

src/PolylineAlgorithm/Abstraction/AbstractPolylineEncoder.cs

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ namespace PolylineAlgorithm.Abstraction;
1414
using System.Collections;
1515
using System.Collections.Generic;
1616
using System.Diagnostics;
17+
using System.IO.Pipelines;
18+
using System.Runtime.CompilerServices;
19+
using System.Threading;
20+
using System.Threading.Tasks;
1721

1822
/// <summary>
1923
/// Provides functionality to encode a collection of geographic coordinates into an encoded polyline string.
@@ -22,7 +26,7 @@ namespace PolylineAlgorithm.Abstraction;
2226
/// <remarks>
2327
/// This abstract class serves as a base for specific polyline encoders, allowing customization of the encoding process.
2428
/// </remarks>
25-
public abstract class AbstractPolylineEncoder<TCoordinate, TPolyline> : IPolylineEncoder<TCoordinate, TPolyline> {
29+
public abstract class AbstractPolylineEncoder<TCoordinate, TPolyline> : IPolylineEncoder<TCoordinate, TPolyline>, IAsyncPolylineEncoder<TCoordinate, TPolyline>, IPolylinePipeEncoder<TCoordinate> {
2630
/// <summary>
2731
/// Initializes a new instance of the <see cref="AbstractPolylineEncoder{TCoordinate, TPolyline}"/> class with default encoding options.
2832
/// </summary>
@@ -217,5 +221,90 @@ static void ValidateBuffer(ILogger<AbstractPolylineDecoder<TPolyline, TCoordinat
217221
/// </returns>
218222

219223
protected abstract double GetLatitude(TCoordinate current);
224+
225+
/// <summary>
226+
/// Asynchronously encodes a sequence of geographic coordinates into an encoded polyline by collecting all
227+
/// coordinates from the <see cref="IAsyncEnumerable{T}"/> and then encoding them synchronously.
228+
/// </summary>
229+
/// <param name="coordinates">
230+
/// The asynchronous collection of <typeparamref name="TCoordinate"/> instances to encode.
231+
/// </param>
232+
/// <param name="cancellationToken">
233+
/// A <see cref="CancellationToken"/> to observe while collecting coordinates.
234+
/// </param>
235+
/// <returns>
236+
/// A <see cref="ValueTask{TResult}"/> containing the encoded <typeparamref name="TPolyline"/>.
237+
/// </returns>
238+
/// <exception cref="ArgumentNullException">
239+
/// Thrown when <paramref name="coordinates"/> is <see langword="null"/>.
240+
/// </exception>
241+
public async ValueTask<TPolyline> EncodeAsync(
242+
IAsyncEnumerable<TCoordinate> coordinates,
243+
CancellationToken cancellationToken) {
244+
245+
if (coordinates is null) {
246+
throw new ArgumentNullException(nameof(coordinates));
247+
}
248+
249+
var list = new List<TCoordinate>();
250+
251+
await foreach (TCoordinate coordinate in coordinates.WithCancellation(cancellationToken).ConfigureAwait(false)) {
252+
list.Add(coordinate);
253+
}
254+
255+
return Encode(list);
256+
}
257+
258+
/// <summary>
259+
/// Asynchronously encodes a sequence of geographic coordinates and writes the encoded polyline bytes directly
260+
/// into <paramref name="writer"/> with zero intermediate allocations.
261+
/// </summary>
262+
/// <remarks>
263+
/// Each coordinate pair is encoded directly into the <see cref="PipeWriter"/>'s buffer using
264+
/// <see cref="PolylineEncoding.WriteValue(int, System.Buffers.IBufferWriter{byte})"/>,
265+
/// avoiding intermediate string or character-array allocations. The writer is flushed periodically
266+
/// but is not completed by this method.
267+
/// </remarks>
268+
/// <param name="coordinates">
269+
/// The asynchronous collection of <typeparamref name="TCoordinate"/> instances to encode.
270+
/// </param>
271+
/// <param name="writer">
272+
/// The <see cref="PipeWriter"/> to which the encoded bytes are written.
273+
/// </param>
274+
/// <param name="cancellationToken">
275+
/// A <see cref="CancellationToken"/> to observe while iterating coordinates.
276+
/// </param>
277+
/// <returns>
278+
/// A <see cref="ValueTask"/> representing the asynchronous encode-and-write operation.
279+
/// </returns>
280+
/// <exception cref="ArgumentNullException">
281+
/// Thrown when <paramref name="coordinates"/> or <paramref name="writer"/> is <see langword="null"/>.
282+
/// </exception>
283+
public async ValueTask EncodeAsync(
284+
IAsyncEnumerable<TCoordinate> coordinates,
285+
PipeWriter writer,
286+
CancellationToken cancellationToken) {
287+
288+
if (coordinates is null) {
289+
throw new ArgumentNullException(nameof(coordinates));
290+
}
291+
292+
if (writer is null) {
293+
throw new ArgumentNullException(nameof(writer));
294+
}
295+
296+
CoordinateVariance variance = new();
297+
298+
await foreach (TCoordinate coordinate in coordinates.WithCancellation(cancellationToken).ConfigureAwait(false)) {
299+
variance.Next(
300+
PolylineEncoding.Normalize(GetLatitude(coordinate), CoordinateValueType.Latitude),
301+
PolylineEncoding.Normalize(GetLongitude(coordinate), CoordinateValueType.Longitude));
302+
303+
PolylineEncoding.WriteValue(variance.Latitude, writer);
304+
PolylineEncoding.WriteValue(variance.Longitude, writer);
305+
}
306+
307+
await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
308+
}
220309
}
221310

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
//
2+
// Copyright © Pete Sramek. All rights reserved.
3+
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
4+
//
5+
6+
namespace PolylineAlgorithm.Abstraction;
7+
8+
using System.Collections.Generic;
9+
using System.Threading;
10+
11+
/// <summary>
12+
/// Defines a contract for asynchronously decoding an encoded polyline into a sequence of geographic coordinates.
13+
/// </summary>
14+
public interface IAsyncPolylineDecoder<TPolyline, TCoordinate> {
15+
/// <summary>
16+
/// Asynchronously decodes the specified encoded polyline into a sequence of geographic coordinates.
17+
/// </summary>
18+
/// <param name="polyline">
19+
/// The <typeparamref name="TPolyline"/> instance containing the encoded polyline string to decode.
20+
/// </param>
21+
/// <param name="cancellationToken">
22+
/// A <see cref="CancellationToken"/> to observe while waiting for the task to complete.
23+
/// </param>
24+
/// <returns>
25+
/// An <see cref="IAsyncEnumerable{T}"/> of <typeparamref name="TCoordinate"/> representing the decoded
26+
/// latitude and longitude pairs, streamed asynchronously.
27+
/// </returns>
28+
IAsyncEnumerable<TCoordinate> DecodeAsync(TPolyline polyline, CancellationToken cancellationToken);
29+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
//
2+
// Copyright © Pete Sramek. All rights reserved.
3+
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
4+
//
5+
6+
namespace PolylineAlgorithm.Abstraction;
7+
8+
using System.Collections.Generic;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
12+
/// <summary>
13+
/// Defines a contract for asynchronously encoding a sequence of geographic coordinates into an encoded polyline.
14+
/// </summary>
15+
public interface IAsyncPolylineEncoder<TCoordinate, TPolyline> {
16+
/// <summary>
17+
/// Asynchronously encodes a sequence of geographic coordinates into an encoded polyline representation.
18+
/// </summary>
19+
/// <param name="coordinates">
20+
/// The asynchronous collection of <typeparamref name="TCoordinate"/> instances to encode into a polyline.
21+
/// </param>
22+
/// <param name="cancellationToken">
23+
/// A <see cref="CancellationToken"/> to observe while waiting for the task to complete.
24+
/// </param>
25+
/// <returns>
26+
/// A <see cref="ValueTask{TResult}"/> that represents the asynchronous operation, containing a
27+
/// <typeparamref name="TPolyline"/> with the encoded polyline string that represents the input coordinates.
28+
/// </returns>
29+
ValueTask<TPolyline> EncodeAsync(IAsyncEnumerable<TCoordinate> coordinates, CancellationToken cancellationToken);
30+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
//
2+
// Copyright © Pete Sramek. All rights reserved.
3+
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
4+
//
5+
6+
namespace PolylineAlgorithm.Abstraction;
7+
8+
using System.Collections.Generic;
9+
using System.IO.Pipelines;
10+
using System.Threading;
11+
12+
/// <summary>
13+
/// Defines a contract for zero-allocation decoding of an encoded polyline streamed from a <see cref="PipeReader"/>
14+
/// into a sequence of geographic coordinates.
15+
/// </summary>
16+
/// <remarks>
17+
/// Implementations operate directly on pipe buffers to avoid intermediate string or character-array allocations,
18+
/// making this interface well-suited for high-throughput or memory-constrained scenarios where the encoded
19+
/// polyline arrives over a network stream or another <see cref="System.IO.Pipelines.IDuplexPipe"/> source.
20+
/// </remarks>
21+
public interface IPolylinePipeDecoder<TCoordinate> {
22+
/// <summary>
23+
/// Asynchronously decodes encoded polyline bytes read from <paramref name="reader"/> into a sequence of
24+
/// geographic coordinates, operating with zero intermediate allocations.
25+
/// </summary>
26+
/// <param name="reader">
27+
/// The <see cref="PipeReader"/> from which the encoded polyline bytes are consumed.
28+
/// The reader is not completed by this method; the caller is responsible for its lifetime.
29+
/// </param>
30+
/// <param name="cancellationToken">
31+
/// A <see cref="CancellationToken"/> to observe while waiting for the task to complete.
32+
/// </param>
33+
/// <returns>
34+
/// An <see cref="IAsyncEnumerable{T}"/> of <typeparamref name="TCoordinate"/> representing the decoded
35+
/// latitude and longitude pairs, streamed asynchronously as they become available from the pipe.
36+
/// </returns>
37+
IAsyncEnumerable<TCoordinate> DecodeAsync(PipeReader reader, CancellationToken cancellationToken);
38+
}

0 commit comments

Comments
 (0)