Skip to content

Commit 8a62364

Browse files
Previous test pass after adding timeout code.
1 parent ee55bd0 commit 8a62364

File tree

5 files changed

+109
-48
lines changed

5 files changed

+109
-48
lines changed
Lines changed: 80 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Diagnostics;
34
using System.Diagnostics.Contracts;
45
using System.Threading;
56
using System.Threading.Channels;
@@ -20,7 +21,12 @@ public class BatchingChannelReader<T> : BufferingChannelReader<T, List<T>>
2021
/// Constructs a BatchingChannelReader.
2122
/// Use the .Batch extension instead of constructing this directly.
2223
/// </summary>
23-
public BatchingChannelReader(ChannelReader<T> source, int batchSize, bool singleReader, bool syncCont = false) : base(source, singleReader, syncCont)
24+
public BatchingChannelReader(
25+
ChannelReader<T> source,
26+
int batchSize,
27+
bool singleReader,
28+
bool syncCont = false)
29+
: base(source, singleReader, syncCont)
2430
{
2531
if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Must be at least 1.");
2632
Contract.EndContractBlock();
@@ -32,31 +38,59 @@ public BatchingChannelReader(ChannelReader<T> source, int batchSize, bool single
3238
/// If no full batch is waiting, will force buffering any batch that has at least one item.
3339
/// Returns true if anything was added to the buffer.
3440
/// </summary>
35-
public bool ForceBatch()
41+
public bool ForceBatch() => TryPipeItems(true);
42+
43+
long _timeout = -1;
44+
Timer? _timer;
45+
46+
/// <summary>
47+
/// Specifies a timeout by which a batch will be emmited there is at least one item but has been waiting
48+
/// for longer than the timeout value.
49+
/// </summary>
50+
/// <param name="millisecondsTimeout">
51+
/// The timeout value where after a batch is forced.<br/>
52+
/// A value of zero or less cancels/clears any timeout.
53+
/// </param>
54+
/// <returns>The current reader.</returns>
55+
public BatchingChannelReader<T> WithTimeout(long millisecondsTimeout)
3656
{
37-
if (Buffer is null || Buffer.Reader.Completion.IsCompleted) return false;
38-
if (TryPipeItems()) return true;
39-
if (_batch is null) return false;
57+
_timeout = millisecondsTimeout <= 0 ? Timeout.Infinite : millisecondsTimeout;
4058

41-
lock (Buffer)
59+
if (Buffer is null || Buffer.Reader.Completion.IsCompleted)
60+
return this;
61+
62+
if (_timeout == Timeout.Infinite)
4263
{
43-
if (Buffer.Reader.Completion.IsCompleted) return false;
44-
if (TryPipeItems()) return true;
45-
List<T>? c = _batch;
46-
if (c is null || Buffer.Reader.Completion.IsCompleted)
47-
return false;
48-
c.TrimExcess();
49-
_batch = null;
50-
return Buffer.Writer.TryWrite(c); // Should always be true at this point.
64+
Interlocked.Exchange(ref _timer, null)?.Dispose();
65+
return this;
5166
}
67+
68+
LazyInitializer.EnsureInitialized(ref _timer,
69+
() => new Timer(obj => ForceBatch()));
70+
71+
return this;
5272
}
5373

74+
/// <param name="timeout">
75+
/// The timeout value where after a batch is forced.<br/>
76+
/// A value of zero or less cancels/clears any timeout.<br/>
77+
/// Note: Values are converted to milliseconds.
78+
/// </param>
79+
/// <inheritdoc cref="WithTimeout(long)"/>
80+
public BatchingChannelReader<T> WithTimeout(TimeSpan timeout)
81+
=> WithTimeout(TimeSpan.FromMilliseconds(timeout.TotalMilliseconds));
82+
5483
/// <inheritdoc />
55-
protected override bool TryPipeItems()
84+
protected override void OnBeforeFinalFlush()
85+
=> Interlocked.Exchange(ref _timer, null)?.Dispose();
86+
87+
/// <inheritdoc />
88+
protected override bool TryPipeItems(bool flush)
5689
{
5790
if (Buffer is null || Buffer.Reader.Completion.IsCompleted)
5891
return false;
5992

93+
var batched = false;
6094
lock (Buffer)
6195
{
6296
if (Buffer.Reader.Completion.IsCompleted) return false;
@@ -65,37 +99,50 @@ protected override bool TryPipeItems()
6599
ChannelReader<T>? source = Source;
66100
if (source is null || source.Completion.IsCompleted)
67101
{
68-
// All finished, release the last batch to the buffer.
102+
// All finished, if necessary, release the last batch to the buffer.
69103
if (c is null) return false;
70-
71-
c.TrimExcess();
72-
_batch = null;
73-
74-
Buffer.Writer.TryWrite(c);
75-
return true;
104+
goto flushBatch;
76105
}
77106

78107
while (source.TryRead(out T? item))
79108
{
80109
if (c is null) _batch = c = new List<T>(_batchSize) { item };
81110
else c.Add(item);
82111

83-
if (c.Count == _batchSize)
84-
{
85-
_batch = null;
86-
Buffer.Writer.TryWrite(c);
87-
return true;
88-
}
112+
Debug.Assert(c.Count <= _batchSize);
113+
if (c.Count != _batchSize) continue; // should never be greater.
114+
115+
_batch = null; // _batch should always have at least 1 item in it.
116+
batched = Buffer.Writer.TryWrite(c);
117+
Debug.Assert(batched);
118+
c = null;
89119
}
90120

91-
return false;
121+
if (!flush || c is null)
122+
goto finalizeTimer;
123+
124+
flushBatch:
125+
126+
c.TrimExcess();
127+
_batch = null;
128+
129+
batched = Buffer.Writer.TryWrite(c);
130+
Debug.Assert(batched);
131+
132+
finalizeTimer:
133+
134+
var ok = _timer?.Change(_batch is null ? Timeout.Infinite : _timeout, 0);
135+
Debug.Assert(ok ?? true);
136+
137+
return batched;
92138
}
93139
}
94140

95141
/// <inheritdoc />
96-
protected override async ValueTask<bool> WaitToReadAsyncCore(ValueTask<bool> bufferWait, CancellationToken cancellationToken)
142+
protected override async ValueTask<bool> WaitToReadAsyncCore(
143+
ValueTask<bool> bufferWait,
144+
CancellationToken cancellationToken)
97145
{
98-
99146
ChannelReader<T>? source = Source;
100147
if (source is null) return await bufferWait.ConfigureAwait(false);
101148

@@ -108,7 +155,7 @@ protected override async ValueTask<bool> WaitToReadAsyncCore(ValueTask<bool> buf
108155
if (b.IsCompleted) return await b.ConfigureAwait(false);
109156

110157
ValueTask<bool> s = source.WaitToReadAsync(token);
111-
if (s.IsCompleted && !b.IsCompleted) TryPipeItems();
158+
if (s.IsCompleted && !b.IsCompleted) TryPipeItems(false);
112159

113160
if (b.IsCompleted)
114161
{
@@ -123,7 +170,7 @@ protected override async ValueTask<bool> WaitToReadAsyncCore(ValueTask<bool> buf
123170
return await b.ConfigureAwait(false);
124171
}
125172

126-
TryPipeItems();
173+
TryPipeItems(false);
127174
goto start;
128175
}
129176
}

Open.ChannelExtensions/BufferingChannelReader.cs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,11 @@ protected BufferingChannelReader(ChannelReader<TIn> source, bool singleReader, b
4444

4545
source.Completion.ContinueWith(t =>
4646
{
47+
OnBeforeFinalFlush();
4748
// Need to be sure writing is done before we continue...
4849
lock (Buffer)
4950
{
50-
while (TryPipeItems()) { }
51+
TryPipeItems(true);
5152
Buffer.Writer.Complete(t.Exception);
5253
}
5354

@@ -56,28 +57,38 @@ protected BufferingChannelReader(ChannelReader<TIn> source, bool singleReader, b
5657
}
5758
}
5859

60+
/// <summary>
61+
/// Called before the last items are flushed to the buffer.
62+
/// </summary>
63+
protected virtual void OnBeforeFinalFlush()
64+
{ }
65+
5966
private readonly Task _completion;
6067
/// <inheritdoc />
6168
public override Task Completion => _completion;
6269

6370
/// <summary>
6471
/// The method that triggers adding entries to the buffer.
6572
/// </summary>
66-
/// <returns></returns>
67-
protected abstract bool TryPipeItems();
73+
/// <param name="flush">Signals that all items should be piped.</param>
74+
/// <returns>True if items were transferred.</returns>
75+
protected abstract bool TryPipeItems(bool flush);
6876

6977
/// <inheritdoc />
7078
public override bool TryRead(out TOut item)
7179
{
72-
if (Buffer is not null) do
80+
if (Buffer is not null)
81+
{
82+
do
7383
{
7484
if (Buffer.Reader.TryRead(out TOut? i))
7585
{
7686
item = i;
7787
return true;
7888
}
7989
}
80-
while (TryPipeItems());
90+
while (TryPipeItems(false));
91+
}
8192

8293
item = default!;
8394
return false;
@@ -113,7 +124,7 @@ protected virtual async ValueTask<bool> WaitToReadAsyncCore(ValueTask<bool> buff
113124
if (bufferWait.IsCompleted) return await bufferWait.ConfigureAwait(false);
114125

115126
ValueTask<bool> s = source.WaitToReadAsync(token);
116-
if (s.IsCompleted && !bufferWait.IsCompleted) TryPipeItems();
127+
if (s.IsCompleted && !bufferWait.IsCompleted) TryPipeItems(false);
117128

118129
if (bufferWait.IsCompleted)
119130
{
@@ -126,7 +137,7 @@ protected virtual async ValueTask<bool> WaitToReadAsyncCore(ValueTask<bool> buff
126137
tokenSource.Cancel();
127138
return await bufferWait.ConfigureAwait(false);
128139
}
129-
TryPipeItems();
140+
TryPipeItems(false);
130141

131142
goto start;
132143
}

Open.ChannelExtensions/Extensions.Batch.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,22 @@ namespace Open.ChannelExtensions;
55

66
public static partial class Extensions
77
{
8-
9-
108
/// <summary>
119
/// Batches results into the batch size provided with a max capacity of batches.
1210
/// </summary>
1311
/// <typeparam name="T">The output type of the source channel.</typeparam>
1412
/// <param name="source">The channel to read from.</param>
15-
/// <param name="batchSize">The maximum size of each batch.</param>
13+
/// <param name="batchSize">
14+
/// The maximum size of each batch.
15+
/// Note: setting this value sets the capacity of each batch (reserves memory).
16+
/// </param>
1617
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
1718
/// <param name="allowSynchronousContinuations">True can reduce the amount of scheduling and markedly improve performance, but may produce unexpected or even undesirable behavior.</param>
1819
/// <returns>A channel reader containing the batches.</returns>
19-
public static BatchingChannelReader<T> Batch<T>(this ChannelReader<T> source, int batchSize, bool singleReader = false, bool allowSynchronousContinuations = false)
20+
public static BatchingChannelReader<T> Batch<T>(
21+
this ChannelReader<T> source,
22+
int batchSize,
23+
bool singleReader = false,
24+
bool allowSynchronousContinuations = false)
2025
=> new(source ?? throw new ArgumentNullException(nameof(source)), batchSize, singleReader, allowSynchronousContinuations);
2126
}

Open.ChannelExtensions/Extensions.Join.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public JoiningChannelReader(ChannelReader<TList> source, bool singleReader) : ba
1414
{
1515
}
1616

17-
protected override bool TryPipeItems()
17+
protected override bool TryPipeItems(bool _)
1818
{
1919
ChannelReader<TList>? source = Source;
2020
if (source is null

Open.ChannelExtensions/Extensions.Pipe.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,7 @@ public static ChannelReader<T> PipeTo<T>(this ChannelReader<T> source,
6565
if (target is null) throw new ArgumentNullException(nameof(target));
6666
Contract.EndContractBlock();
6767

68-
#pragma warning disable CA2012 // Not awaited because it's run by the scheduler and no need to incur the .AsTask() call.
69-
_ = PipeTo(source, target.Writer, true, cancellationToken);
70-
#pragma warning restore CA2012
68+
Task.Run(()=>PipeTo(source, target.Writer, true, cancellationToken));
7169

7270
return target.Reader;
7371
}

0 commit comments

Comments
 (0)