Skip to content

Commit 0464e10

Browse files
cotti and cursoragent authored
Prevent deadlock on DiagnosticsCollector (#3348)
* Prevent deadlock on DiagnosticsCollector

  - Gate Channel.Write on a _readerStarted flag set inside the reader delegate, so writes are dropped when no reader will drain them (per Mpdreamz).
  - _readerStarted, not _started != null, drives IsStarted; Task.Run with an already-canceled token leaves _started non-null without ever running the delegate (per CodeRabbit).
  - StopAsync no longer needs a sync-drain branch — gating keeps the channel empty when no reader started. Keep a defensive Drain after awaiting the reader Task to mop up if it exited early via cancellation.
  - Drop the default IsStarted => true on IDiagnosticsCollector and require implementers to declare it (per reakaleek + Mpdreamz).
  - Add a regression test for instantiate-and-dispose with no emissions (per Mpdreamz).

  Co-authored-by: Cursor <cursoragent@cursor.com>

* Set _readerStarted after WaitToWrite to require execution proof

  Per review: only flip the flag once the reader has cleared its initial setup, so it strictly means "the delegate executed past the first await". A canceled token or a Task.Run short-circuit leaves the flag false.

  Co-authored-by: Cursor <cursoragent@cursor.com>

* Fix StartAsync/Write race that dropped diagnostics

  The Write gate (`if (_readerStarted) Channel.Write(...)`) had a scheduling race: between StartAsync returning and the Task.Run delegate hitting `await Channel.WaitToWrite`, any emitted diagnostic was silently dropped. This broke the F# authoring suite, which synchronously schedules the reader and emits diagnostics before the reader had a chance to flip the flag.

  - Write unconditionally — UnboundedChannel never blocks, and items in an undrained channel are GC'd with the collector.
  - StopAsync now gates on `_started is null` (was StartAsync ever called?) rather than `_readerStarted` (is the reader executing?). Once StartAsync was called with a non-canceled token, awaiting `_started` is always meaningful: it completes when the reader exits after draining the channel.
  - IsStarted still backed by `_readerStarted` for accurate introspection (execution proof, per earlier review feedback).
  - Drop the leftover //TODO in DiagnosticsChannel.Write.

  Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 20b4b96 commit 0464e10

4 files changed

Lines changed: 134 additions & 21 deletions

File tree

src/Elastic.Documentation/Diagnostics/DiagnosticsChannel.cs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public sealed class DiagnosticsChannel : IDisposable
1010
{
1111
private readonly Channel<Diagnostic> _channel;
1212
private readonly CancellationTokenSource _ctxSource;
13+
1314
public ChannelReader<Diagnostic> Reader => _channel.Reader;
1415

1516
public Cancel CancellationToken => _ctxSource.Token;
@@ -33,14 +34,10 @@ public void TryComplete(Exception? exception = null)
3334

3435
public ValueTask<bool> WaitToWrite(Cancel ctx) => _channel.Writer.WaitToWriteAsync(ctx);
3536

36-
public void Write(Diagnostic diagnostic)
37-
{
38-
var written = _channel.Writer.TryWrite(diagnostic);
39-
if (!written)
40-
{
41-
//TODO
42-
}
43-
}
37+
// Unbounded channel: TryWrite only fails if the writer is completed, which
38+
// means a producer raced past StopAsync/DisposeAsync. Drop silently —
39+
// diagnostics must never throw back into the caller.
40+
public void Write(Diagnostic diagnostic) => _ = _channel.Writer.TryWrite(diagnostic);
4441

4542
public void Dispose() => _ctxSource.Dispose();
4643
}

src/Elastic.Documentation/Diagnostics/DiagnosticsCollector.cs

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
// See the LICENSE file in the project root for more information
44

55
using System.Collections.Concurrent;
6-
using System.Diagnostics;
76
using System.IO.Abstractions;
87

98
namespace Elastic.Documentation.Diagnostics;
@@ -21,6 +20,12 @@ public class DiagnosticsCollector(IReadOnlyCollection<IDiagnosticsOutput> output
2120
public int Hints => _hints;
2221

2322
private Task? _started;
23+
// True once the background reader delegate has actually begun executing.
24+
// _started becoming non-null is not enough: Task.Run short-circuits to a
25+
// canceled Task when given an already-canceled token, and the delegate
26+
// never runs. This flag only backs IsStarted; StopAsync gates on
27+
// _started being null (was StartAsync ever called?), not on this flag.
28+
private volatile bool _readerStarted;
2429

2530
public HashSet<string> OffendingFiles { get; } = [];
2631

@@ -30,6 +35,8 @@ public class DiagnosticsCollector(IReadOnlyCollection<IDiagnosticsOutput> output
3035

3136
public bool NoHints { get; set; }
3237

38+
public bool IsStarted => _readerStarted;
39+
3340
public virtual DiagnosticsCollector StartAsync(Cancel ctx)
3441
{
3542
_ = EnsureStarted(ctx);
@@ -45,6 +52,7 @@ private Task EnsureStarted(Cancel cancellationToken)
4552
_started = Task.Run(async () =>
4653
{
4754
_ = await Channel.WaitToWrite(cancellationToken);
55+
_readerStarted = true;
4856
while (!Channel.CancellationToken.IsCancellationRequested)
4957
{
5058
try
@@ -61,18 +69,18 @@ private Task EnsureStarted(Cancel cancellationToken)
6169
Drain();
6270
}, cancellationToken);
6371
return _started;
72+
}
6473

65-
void Drain()
74+
private void Drain()
75+
{
76+
while (Channel.Reader.TryRead(out var item))
6677
{
67-
while (Channel.Reader.TryRead(out var item))
68-
{
69-
if (item.Severity == Severity.Hint && NoHints)
70-
continue;
71-
HandleItem(item);
72-
_ = OffendingFiles.Add(item.File);
73-
foreach (var output in outputs)
74-
output.Write(item);
75-
}
78+
if (item.Severity == Severity.Hint && NoHints)
79+
continue;
80+
HandleItem(item);
81+
_ = OffendingFiles.Add(item.File);
82+
foreach (var output in outputs)
83+
output.Write(item);
7684
}
7785
}
7886

@@ -91,9 +99,24 @@ protected virtual void HandleItem(Diagnostic diagnostic) { }
9199
public virtual async Task StopAsync(Cancel cancellationToken)
92100
{
93101
Channel.TryComplete();
94-
if (_started is not null)
102+
// StartAsync was never called. Items may sit in the channel but
103+
// nobody is coming to drain them — awaiting Channel.Reader.Completion
104+
// here would deadlock. Returning is the correct behaviour for
105+
// fire-and-forget collectors (the channel dies with the instance).
106+
if (_started is null)
107+
return;
108+
109+
try
110+
{
95111
await _started;
96-
await Channel.Reader.Completion;
112+
}
113+
catch (OperationCanceledException)
114+
{
115+
// Reader was canceled before its final Drain(); mop up below.
116+
}
117+
// Defensive: if the reader exited early via cancellation, items may
118+
// still be queued. Drain them synchronously so they're not lost.
119+
Drain();
97120
}
98121

99122
public void EmitCrossLink(string link) => CrossLinks.Add(link);

src/Elastic.Documentation/Diagnostics/IDiagnosticsCollector.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ public interface IDiagnosticsCollector : IAsyncDisposable
2020
HashSet<string> OffendingFiles { get; }
2121
ConcurrentDictionary<string, bool> InUseSubstitutionKeys { get; }
2222

23+
/// True once the background reader is actively draining the channel.
24+
bool IsStarted { get; }
25+
2326
Task StartAsync(Cancel cancellationToken);
2427
Task StopAsync(Cancel cancellationToken);
2528

@@ -51,6 +54,13 @@ public interface IDiagnosticsCollector : IAsyncDisposable
5154

5255
async Task WaitForDrain()
5356
{
57+
if (!IsStarted)
58+
{
59+
throw new InvalidOperationException(
60+
"WaitForDrain called on a collector that was never started; no reader is draining the channel. " +
61+
"Call StartAsync first or dispose the collector to drain synchronously.");
62+
}
63+
5464
var start = DateTime.UtcNow;
5565
while (Channel.Reader.TryPeek(out _))
5666
{
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using AwesomeAssertions;
6+
using Elastic.Documentation.Diagnostics;
7+
8+
namespace Elastic.Changelog.Tests.Changelogs;
9+
10+
public class DiagnosticsCollectorDisposeTests
11+
{
12+
private sealed class RecordingOutput : IDiagnosticsOutput
13+
{
14+
public List<Diagnostic> Items { get; } = [];
15+
public void Write(Diagnostic diagnostic) => Items.Add(diagnostic);
16+
}
17+
18+
private static async Task ShouldComplete(Task task, TimeSpan timeout, string because)
19+
{
20+
using var cts = new CancellationTokenSource(timeout);
21+
var completed = await Task.WhenAny(task, Task.Delay(Timeout.Infinite, cts.Token));
22+
completed.Should().BeSameAs(task, because);
23+
await task;
24+
}
25+
26+
// Regression: the changelog-scrubber lambda used a DiagnosticsCollector without calling
27+
// StartAsync. Emitting a diagnostic and then disposing deadlocked on
28+
// Channel.Reader.Completion because nothing was draining the channel,
29+
// causing the lambda to hit its 180s timeout.
30+
[Fact]
31+
public async Task DisposeAsync_WithoutStartAsyncAfterEmit_DoesNotHang()
32+
{
33+
var output = new RecordingOutput();
34+
var collector = new DiagnosticsCollector([output]);
35+
collector.EmitWarning("file.yaml", "test warning that nobody is reading");
36+
37+
await ShouldComplete(collector.DisposeAsync().AsTask(), TimeSpan.FromSeconds(5),
38+
"DisposeAsync must not deadlock when StartAsync was never called");
39+
40+
collector.Warnings.Should().Be(1, "severity counters update regardless of reader state");
41+
collector.IsStarted.Should().BeFalse();
42+
collector.OffendingFiles.Should().BeEmpty("OffendingFiles is only populated by the background reader");
43+
output.Items.Should().BeEmpty("IDiagnosticsOutput sinks are only invoked by the background reader");
44+
}
45+
46+
[Fact]
47+
public async Task StopAsync_WithoutStartAsyncAfterEmit_DoesNotHang()
48+
{
49+
var output = new RecordingOutput();
50+
var collector = new DiagnosticsCollector([output]);
51+
collector.EmitError("file.yaml", "test error that nobody is reading");
52+
53+
await ShouldComplete(collector.StopAsync(CancellationToken.None), TimeSpan.FromSeconds(5),
54+
"StopAsync must not deadlock when StartAsync was never called");
55+
56+
collector.Errors.Should().Be(1);
57+
collector.IsStarted.Should().BeFalse();
58+
output.Items.Should().BeEmpty();
59+
}
60+
61+
[Fact]
62+
public async Task DisposeAsync_WithoutStartAsyncAndNoEmissions_DoesNotHang()
63+
{
64+
var collector = new DiagnosticsCollector([]);
65+
66+
await ShouldComplete(collector.DisposeAsync().AsTask(), TimeSpan.FromSeconds(5),
67+
"Instantiate-and-dispose with no emissions must be a no-op");
68+
69+
collector.IsStarted.Should().BeFalse();
70+
collector.Warnings.Should().Be(0);
71+
collector.Errors.Should().Be(0);
72+
}
73+
74+
[Fact]
75+
public async Task WaitForDrain_WithoutStartAsync_ThrowsImmediately()
76+
{
77+
var collector = new DiagnosticsCollector([]);
78+
collector.EmitWarning(string.Empty, "queued");
79+
80+
Func<Task> act = () => ((IDiagnosticsCollector)collector).WaitForDrain();
81+
_ = await act.Should().ThrowAsync<InvalidOperationException>();
82+
}
83+
}

0 commit comments

Comments
 (0)