Skip to content

Commit 381d59b

Browse files
Final tweaks before release.
1 parent 8a62364 commit 381d59b

File tree

5 files changed

+162
-29
lines changed

5 files changed

+162
-29
lines changed

Open.ChannelExtensions.ComparisonTests/Open.ChannelExtensions.ComparisonTests.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
<PropertyGroup>
44
<OutputType>Exe</OutputType>
5-
<TargetFramework>net5.0</TargetFramework>
5+
<TargetFramework>net6.0</TargetFramework>
66
</PropertyGroup>
77

88
<ItemGroup>

Open.ChannelExtensions.Tests/BatchTests.cs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,4 +190,84 @@ public static async Task ForceBatchTest2()
190190
await Task.Delay(500);
191191
}));
192192
}
193+
194+
[Fact]
195+
public static async Task TimeoutTest0()
196+
{
197+
var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
198+
BatchingChannelReader<int> reader = c.Reader.Batch(10).WithTimeout(500);
199+
var complete = false;
200+
_ = Task.Run(async () =>
201+
{
202+
for (var i = 0; i < 5; i++)
203+
{
204+
c.Writer.TryWrite(i);
205+
}
206+
207+
await Task.Delay(1000);
208+
complete = true;
209+
c.Writer.Complete();
210+
});
211+
212+
using var tokenSource = new CancellationTokenSource(6000);
213+
Assert.Equal(1, await reader.ReadAllAsync(tokenSource.Token, async (batch, i) =>
214+
{
215+
switch (i)
216+
{
217+
case 0:
218+
Assert.Equal(5, batch.Count);
219+
Assert.False(complete);
220+
break;
221+
222+
default:
223+
throw new Exception("Shouldn't arrive here.");
224+
}
225+
await Task.Delay(100);
226+
}));
227+
}
228+
229+
230+
[Fact]
231+
public static async Task TimeoutTest1()
232+
{
233+
var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
234+
BatchingChannelReader<int> reader = c.Reader.Batch(10).WithTimeout(500);
235+
_ = Task.Run(async () =>
236+
{
237+
for(var i = 0;i<15;i++)
238+
{
239+
c.Writer.TryWrite(i);
240+
}
241+
242+
await Task.Delay(1000);
243+
244+
for (var i = 0; i < 15; i++)
245+
{
246+
c.Writer.TryWrite(i);
247+
}
248+
c.Writer.Complete();
249+
});
250+
251+
using var tokenSource = new CancellationTokenSource(6000);
252+
Assert.Equal(4, await reader.ReadAllAsync(tokenSource.Token, async (batch, i) =>
253+
{
254+
switch (i)
255+
{
256+
case 0:
257+
case 2:
258+
Assert.Equal(10, batch.Count);
259+
break;
260+
case 1:
261+
case 3:
262+
Assert.Equal(5, batch.Count);
263+
break;
264+
265+
default:
266+
throw new Exception("Shouldn't arrive here.");
267+
}
268+
await Task.Delay(100);
269+
}));
270+
}
271+
193272
}
273+

Open.ChannelExtensions/BatchingChannelReader.cs

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,12 @@ protected override bool TryPipeItems(bool flush)
9090
if (Buffer is null || Buffer.Reader.Completion.IsCompleted)
9191
return false;
9292

93-
var batched = false;
9493
lock (Buffer)
9594
{
9695
if (Buffer.Reader.Completion.IsCompleted) return false;
9796

97+
var batched = false;
98+
var newBatch = false;
9899
List<T>? c = _batch;
99100
ChannelReader<T>? source = Source;
100101
if (source is null || source.Completion.IsCompleted)
@@ -106,35 +107,54 @@ protected override bool TryPipeItems(bool flush)
106107

107108
while (source.TryRead(out T? item))
108109
{
109-
if (c is null) _batch = c = new List<T>(_batchSize) { item };
110-
else c.Add(item);
110+
if (c is not null) c.Add(item);
111+
else
112+
{
113+
newBatch = true; // a new batch could start but not be emmited.
114+
_batch = c = new List<T>(_batchSize) { item };
115+
}
111116

112117
Debug.Assert(c.Count <= _batchSize);
113-
if (c.Count != _batchSize) continue; // should never be greater.
118+
var full = c.Count == _batchSize;
119+
while (!full && source.TryRead(out item))
120+
{
121+
c.Add(item);
122+
full = c.Count == _batchSize;
123+
}
114124

115-
_batch = null; // _batch should always have at least 1 item in it.
116-
batched = Buffer.Writer.TryWrite(c);
117-
Debug.Assert(batched);
118-
c = null;
125+
if (!full) break;
126+
127+
Emit(ref c);
119128
}
120129

121130
if (!flush || c is null)
122131
goto finalizeTimer;
123132

124-
flushBatch:
133+
flushBatch:
125134

126135
c.TrimExcess();
127-
_batch = null;
136+
Emit(ref c);
128137

129-
batched = Buffer.Writer.TryWrite(c);
130-
Debug.Assert(batched);
131-
132-
finalizeTimer:
133-
134-
var ok = _timer?.Change(_batch is null ? Timeout.Infinite : _timeout, 0);
135-
Debug.Assert(ok ?? true);
138+
finalizeTimer:
139+
140+
// Are we adding to the existing batch (active timeout) or did we create a new one?
141+
if (newBatch && _batch is not null)
142+
{
143+
var ok = _timer?.Change(_timeout, 0);
144+
Debug.Assert(ok ?? true);
145+
}
136146

137147
return batched;
148+
149+
void Emit(ref List<T>? c)
150+
{
151+
_batch = null;
152+
newBatch = false;
153+
if (!batched) _timer?.Change(Timeout.Infinite, 0); // Since we're emmitting one, let's ensure the timeout is cancelled.
154+
batched = Buffer!.Writer.TryWrite(c!);
155+
Debug.Assert(batched);
156+
c = null;
157+
}
138158
}
139159
}
140160

Open.ChannelExtensions/Documentation.xml

Lines changed: 35 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,27 @@
66
<EnableNETAnalyzers>true</EnableNETAnalyzers>
77
<Nullable>enable</Nullable>
88
<Authors>electricessence</Authors>
9-
<Description>A set of extensions for optimizing/simplifying System.Threading.Channels usage.
9+
<Description>
10+
A set of extensions for optimizing/simplifying System.Threading.Channels usage.
1011

11-
Includes:
12-
ReadUntilCancelled, ReadAll, ReadAllConcurrently, WriteAll, WriteAllConcurrently, and Pipe operations.
12+
Includes:
13+
ReadUntilCancelled, ReadAll, ReadAllConcurrently, WriteAll, WriteAllConcurrently, and Pipe operations.
1314

14-
Part of the "Open" set of libraries.</Description>
15+
Part of the "Open" set of libraries.
16+
</Description>
1517
<PackageTags>channel;channels;channel reader;channel writer;threading;tasks;extensions;async</PackageTags>
1618
<Copyright>© electricessence (Oren F.) All rights reserved.</Copyright>
1719
<PackageProjectUrl>https://github.com/Open-NET-Libraries/Open.ChannelExtensions</PackageProjectUrl>
1820
<RepositoryUrl>https://github.com/Open-NET-Libraries/Open.ChannelExtensions</RepositoryUrl>
1921
<RepositoryType>git</RepositoryType>
2022
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
21-
<Version>5.0.0</Version>
22-
<PackageReleaseNotes>Updated to System.Threading.Channels and set verson to match.</PackageReleaseNotes>
23+
<Version>5.1.0</Version>
24+
<PackageReleaseNotes>Added .WithTimeout(milliseconds) method for BatchChannelReader.</PackageReleaseNotes>
2325
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2426
<PublishRepositoryUrl>true</PublishRepositoryUrl>
2527
<IncludeSymbols>true</IncludeSymbols>
2628
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
27-
<PackageIcon>logo.png</PackageIcon>
29+
<PackageIcon>logo.png</PackageIcon>
2830
</PropertyGroup>
2931

3032
<ItemGroup>

0 commit comments

Comments
 (0)