diff --git a/src/Elastic.Documentation/Diagnostics/DiagnosticsChannel.cs b/src/Elastic.Documentation/Diagnostics/DiagnosticsChannel.cs index d26bb57ec..e1b2b0c3a 100644 --- a/src/Elastic.Documentation/Diagnostics/DiagnosticsChannel.cs +++ b/src/Elastic.Documentation/Diagnostics/DiagnosticsChannel.cs @@ -10,6 +10,7 @@ public sealed class DiagnosticsChannel : IDisposable { private readonly Channel _channel; private readonly CancellationTokenSource _ctxSource; + public ChannelReader Reader => _channel.Reader; public Cancel CancellationToken => _ctxSource.Token; @@ -33,14 +34,10 @@ public void TryComplete(Exception? exception = null) public ValueTask WaitToWrite(Cancel ctx) => _channel.Writer.WaitToWriteAsync(ctx); - public void Write(Diagnostic diagnostic) - { - var written = _channel.Writer.TryWrite(diagnostic); - if (!written) - { - //TODO - } - } + // Unbounded channel: TryWrite only fails if the writer is completed, which + // means a producer raced past StopAsync/DisposeAsync. Drop silently — + // diagnostics must never throw back into the caller. + public void Write(Diagnostic diagnostic) => _ = _channel.Writer.TryWrite(diagnostic); public void Dispose() => _ctxSource.Dispose(); } diff --git a/src/Elastic.Documentation/Diagnostics/DiagnosticsCollector.cs b/src/Elastic.Documentation/Diagnostics/DiagnosticsCollector.cs index 156fc94e6..4c8377b26 100644 --- a/src/Elastic.Documentation/Diagnostics/DiagnosticsCollector.cs +++ b/src/Elastic.Documentation/Diagnostics/DiagnosticsCollector.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information using System.Collections.Concurrent; -using System.Diagnostics; using System.IO.Abstractions; namespace Elastic.Documentation.Diagnostics; @@ -21,6 +20,12 @@ public class DiagnosticsCollector(IReadOnlyCollection output public int Hints => _hints; private Task? _started; + // True once the background reader delegate has actually begun executing. + // _started becoming non-null is not enough: Task.Run short-circuits to a + // canceled Task when given an already-canceled token, and the delegate + // never runs. StopAsync uses this to decide whether awaiting _started is + // meaningful or whether the channel is guaranteed to have no drainer. + private volatile bool _readerStarted; public HashSet OffendingFiles { get; } = []; @@ -30,6 +35,8 @@ public class DiagnosticsCollector(IReadOnlyCollection output public bool NoHints { get; set; } + public bool IsStarted => _readerStarted; + public virtual DiagnosticsCollector StartAsync(Cancel ctx) { _ = EnsureStarted(ctx); @@ -45,6 +52,7 @@ private Task EnsureStarted(Cancel cancellationToken) _started = Task.Run(async () => { _ = await Channel.WaitToWrite(cancellationToken); + _readerStarted = true; while (!Channel.CancellationToken.IsCancellationRequested) { try @@ -61,18 +69,18 @@ private Task EnsureStarted(Cancel cancellationToken) Drain(); }, cancellationToken); return _started; + } - void Drain() + private void Drain() + { + while (Channel.Reader.TryRead(out var item)) { - while (Channel.Reader.TryRead(out var item)) - { - if (item.Severity == Severity.Hint && NoHints) - continue; - HandleItem(item); - _ = OffendingFiles.Add(item.File); - foreach (var output in outputs) - output.Write(item); - } + if (item.Severity == Severity.Hint && NoHints) + continue; + HandleItem(item); + _ = OffendingFiles.Add(item.File); + foreach (var output in outputs) + output.Write(item); } } @@ -91,9 +99,24 @@ protected virtual void HandleItem(Diagnostic diagnostic) { } public virtual async Task StopAsync(Cancel cancellationToken) { Channel.TryComplete(); - if (_started is not null) + // StartAsync was never called. Items may sit in the channel but + // nobody is coming to drain them — awaiting Channel.Reader.Completion + // here would deadlock. Returning is the correct behaviour for + // fire-and-forget collectors (the channel dies with the instance). + if (_started is null) + return; + + try + { await _started; - await Channel.Reader.Completion; + } + catch (OperationCanceledException) + { + // Reader was canceled before its final Drain(); mop up below. + } + // Defensive: if the reader exited early via cancellation, items may + // still be queued. Drain them synchronously so they're not lost. + Drain(); } public void EmitCrossLink(string link) => CrossLinks.Add(link); diff --git a/src/Elastic.Documentation/Diagnostics/IDiagnosticsCollector.cs b/src/Elastic.Documentation/Diagnostics/IDiagnosticsCollector.cs index 7a8203fa9..bdd4adfe1 100644 --- a/src/Elastic.Documentation/Diagnostics/IDiagnosticsCollector.cs +++ b/src/Elastic.Documentation/Diagnostics/IDiagnosticsCollector.cs @@ -20,6 +20,9 @@ public interface IDiagnosticsCollector : IAsyncDisposable HashSet OffendingFiles { get; } ConcurrentDictionary InUseSubstitutionKeys { get; } + /// True once the background reader is actively draining the channel. + bool IsStarted { get; } + Task StartAsync(Cancel cancellationToken); Task StopAsync(Cancel cancellationToken); @@ -51,6 +54,13 @@ public interface IDiagnosticsCollector : IAsyncDisposable async Task WaitForDrain() { + if (!IsStarted) + { + throw new InvalidOperationException( + "WaitForDrain called on a collector that was never started; no reader is draining the channel. " + + "Call StartAsync first or dispose the collector to drain synchronously."); + } + var start = DateTime.UtcNow; while (Channel.Reader.TryPeek(out _)) { diff --git a/tests/Elastic.Changelog.Tests/Changelogs/DiagnosticsCollectorDisposeTests.cs b/tests/Elastic.Changelog.Tests/Changelogs/DiagnosticsCollectorDisposeTests.cs new file mode 100644 index 000000000..f5649b25a --- /dev/null +++ b/tests/Elastic.Changelog.Tests/Changelogs/DiagnosticsCollectorDisposeTests.cs @@ -0,0 +1,83 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using AwesomeAssertions; +using Elastic.Documentation.Diagnostics; + +namespace Elastic.Changelog.Tests.Changelogs; + +public class DiagnosticsCollectorDisposeTests +{ + private sealed class RecordingOutput : IDiagnosticsOutput + { + public List Items { get; } = []; + public void Write(Diagnostic diagnostic) => Items.Add(diagnostic); + } + + private static async Task ShouldComplete(Task task, TimeSpan timeout, string because) + { + using var cts = new CancellationTokenSource(timeout); + var completed = await Task.WhenAny(task, Task.Delay(Timeout.Infinite, cts.Token)); + completed.Should().BeSameAs(task, because); + await task; + } + + // Regression: the changelog-scrubber lambda used a DiagnosticsCollector without calling + // StartAsync. Emitting a diagnostic and then disposing deadlocked on + // Channel.Reader.Completion because nothing was draining the channel, + // causing the lambda to hit its 180s timeout. + [Fact] + public async Task DisposeAsync_WithoutStartAsyncAfterEmit_DoesNotHang() + { + var output = new RecordingOutput(); + var collector = new DiagnosticsCollector([output]); + collector.EmitWarning("file.yaml", "test warning that nobody is reading"); + + await ShouldComplete(collector.DisposeAsync().AsTask(), TimeSpan.FromSeconds(5), + "DisposeAsync must not deadlock when StartAsync was never called"); + + collector.Warnings.Should().Be(1, "severity counters update regardless of reader state"); + collector.IsStarted.Should().BeFalse(); + collector.OffendingFiles.Should().BeEmpty("OffendingFiles is only populated by the background reader"); + output.Items.Should().BeEmpty("IDiagnosticsOutput sinks are only invoked by the background reader"); + } + + [Fact] + public async Task StopAsync_WithoutStartAsyncAfterEmit_DoesNotHang() + { + var output = new RecordingOutput(); + var collector = new DiagnosticsCollector([output]); + collector.EmitError("file.yaml", "test error that nobody is reading"); + + await ShouldComplete(collector.StopAsync(CancellationToken.None), TimeSpan.FromSeconds(5), + "StopAsync must not deadlock when StartAsync was never called"); + + collector.Errors.Should().Be(1); + collector.IsStarted.Should().BeFalse(); + output.Items.Should().BeEmpty(); + } + + [Fact] + public async Task DisposeAsync_WithoutStartAsyncAndNoEmissions_DoesNotHang() + { + var collector = new DiagnosticsCollector([]); + + await ShouldComplete(collector.DisposeAsync().AsTask(), TimeSpan.FromSeconds(5), + "Instantiate-and-dispose with no emissions must be a no-op"); + + collector.IsStarted.Should().BeFalse(); + collector.Warnings.Should().Be(0); + collector.Errors.Should().Be(0); + } + + [Fact] + public async Task WaitForDrain_WithoutStartAsync_ThrowsImmediately() + { + var collector = new DiagnosticsCollector([]); + collector.EmitWarning(string.Empty, "queued"); + + Func act = () => ((IDiagnosticsCollector)collector).WaitForDrain(); + _ = await act.Should().ThrowAsync(); + } +}