Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Elastic.Documentation/Diagnostics/DiagnosticsChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public sealed class DiagnosticsChannel : IDisposable
{
private readonly Channel<Diagnostic> _channel;
private readonly CancellationTokenSource _ctxSource;

public ChannelReader<Diagnostic> Reader => _channel.Reader;

public Cancel CancellationToken => _ctxSource.Token;
Expand All @@ -33,6 +34,8 @@ public void TryComplete(Exception? exception = null)

public ValueTask<bool> WaitToWrite(Cancel ctx) => _channel.Writer.WaitToWriteAsync(ctx);

// Unbounded channel: TryWrite only fails if the writer is completed, which
// means a producer raced past StopAsync/DisposeAsync.
public void Write(Diagnostic diagnostic)
{
var written = _channel.Writer.TryWrite(diagnostic);
Expand Down
56 changes: 41 additions & 15 deletions src/Elastic.Documentation/Diagnostics/DiagnosticsCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// See the LICENSE file in the project root for more information

using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO.Abstractions;

namespace Elastic.Documentation.Diagnostics;
Expand All @@ -21,6 +20,13 @@ public class DiagnosticsCollector(IReadOnlyCollection<IDiagnosticsOutput> output
public int Hints => _hints;

private Task? _started;
// True once the background reader has passed its initial WaitToWrite setup
// and is about to enter the read loop. _started becoming non-null is not
// enough: Task.Run short-circuits to a canceled Task when given an
// already-canceled token, never running the delegate. Setting the flag
// only after WaitToWrite means we know the reader will actually drain
// the channel before we let writes accumulate in it.
private volatile bool _readerStarted;

public HashSet<string> OffendingFiles { get; } = [];

Expand All @@ -30,6 +36,8 @@ public class DiagnosticsCollector(IReadOnlyCollection<IDiagnosticsOutput> output

public bool NoHints { get; set; }

public bool IsStarted => _readerStarted;

public virtual DiagnosticsCollector StartAsync(Cancel ctx)
{
_ = EnsureStarted(ctx);
Expand All @@ -45,6 +53,7 @@ private Task EnsureStarted(Cancel cancellationToken)
_started = Task.Run(async () =>
{
_ = await Channel.WaitToWrite(cancellationToken);
_readerStarted = true;
while (!Channel.CancellationToken.IsCancellationRequested)
{
try
Expand All @@ -61,18 +70,18 @@ private Task EnsureStarted(Cancel cancellationToken)
Drain();
}, cancellationToken);
return _started;
}

void Drain()
private void Drain()
{
while (Channel.Reader.TryRead(out var item))
{
while (Channel.Reader.TryRead(out var item))
{
if (item.Severity == Severity.Hint && NoHints)
continue;
HandleItem(item);
_ = OffendingFiles.Add(item.File);
foreach (var output in outputs)
output.Write(item);
}
if (item.Severity == Severity.Hint && NoHints)
continue;
HandleItem(item);
_ = OffendingFiles.Add(item.File);
foreach (var output in outputs)
output.Write(item);
}
}

Expand All @@ -91,17 +100,34 @@ protected virtual void HandleItem(Diagnostic diagnostic) { }
public virtual async Task StopAsync(Cancel cancellationToken)
{
Channel.TryComplete();
if (_started is not null)
await _started;
await Channel.Reader.Completion;
// No reader was ever scheduled, so Write() gated everything out and the
// channel is empty — nothing to await, nothing to drain.
if (!_readerStarted)
return;

try
{
await _started!;
}
catch (OperationCanceledException)
{
// Reader was canceled before its final Drain(); mop up below.
}
// Defensive: if the reader exited early via cancellation, items may
// still be queued. Drain them synchronously so they're not lost.
Drain();
}

public void EmitCrossLink(string link) => CrossLinks.Add(link);

public virtual void Write(Diagnostic diagnostic)
{
IncrementSeverityCount(diagnostic);
Channel.Write(diagnostic);
// Severity counters are always accurate; the channel only matters if a
// reader will actually drain it. Skip the channel write otherwise so
// items don't accumulate and StopAsync has nothing to wait for.
if (_readerStarted)
Channel.Write(diagnostic);
}

public void Emit(Severity severity, string file, string message) =>
Expand Down
10 changes: 10 additions & 0 deletions src/Elastic.Documentation/Diagnostics/IDiagnosticsCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public interface IDiagnosticsCollector : IAsyncDisposable
HashSet<string> OffendingFiles { get; }
ConcurrentDictionary<string, bool> InUseSubstitutionKeys { get; }

/// True once the background reader is actively draining the channel.
bool IsStarted { get; }

Task StartAsync(Cancel cancellationToken);
Task StopAsync(Cancel cancellationToken);

Expand Down Expand Up @@ -51,6 +54,13 @@ public interface IDiagnosticsCollector : IAsyncDisposable

async Task WaitForDrain()
{
if (!IsStarted)
{
throw new InvalidOperationException(
"WaitForDrain called on a collector that was never started; no reader is draining the channel. " +
"Call StartAsync first or dispose the collector to drain synchronously.");
}

var start = DateTime.UtcNow;
while (Channel.Reader.TryPeek(out _))
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using AwesomeAssertions;
using Elastic.Documentation.Diagnostics;

namespace Elastic.Changelog.Tests.Changelogs;

public class DiagnosticsCollectorDisposeTests
{
private sealed class RecordingOutput : IDiagnosticsOutput
{
public List<Diagnostic> Items { get; } = [];
public void Write(Diagnostic diagnostic) => Items.Add(diagnostic);
}

private static async Task ShouldComplete(Task task, TimeSpan timeout, string because)
{
using var cts = new CancellationTokenSource(timeout);
var completed = await Task.WhenAny(task, Task.Delay(Timeout.Infinite, cts.Token));
completed.Should().BeSameAs(task, because);
await task;
}

// Regression: the changelog-scrubber lambda used a DiagnosticsCollector without calling
// StartAsync. Emitting a diagnostic and then disposing deadlocked on
// Channel.Reader.Completion because nothing was draining the channel,
// causing the lambda to hit its 180s timeout.
[Fact]
public async Task DisposeAsync_WithoutStartAsyncAfterEmit_DoesNotHang()
{
var output = new RecordingOutput();
var collector = new DiagnosticsCollector([output]);
collector.EmitWarning("file.yaml", "test warning that nobody is reading");

await ShouldComplete(collector.DisposeAsync().AsTask(), TimeSpan.FromSeconds(5),
"DisposeAsync must not deadlock when StartAsync was never called");

collector.Warnings.Should().Be(1, "severity counters update regardless of reader state");
collector.IsStarted.Should().BeFalse();
collector.OffendingFiles.Should().BeEmpty("writes are gated when no reader will drain them");
output.Items.Should().BeEmpty();
}

[Fact]
public async Task StopAsync_WithoutStartAsyncAfterEmit_DoesNotHang()
{
var output = new RecordingOutput();
var collector = new DiagnosticsCollector([output]);
collector.EmitError("file.yaml", "test error that nobody is reading");

await ShouldComplete(collector.StopAsync(CancellationToken.None), TimeSpan.FromSeconds(5),
"StopAsync must not deadlock when StartAsync was never called");

collector.Errors.Should().Be(1);
collector.IsStarted.Should().BeFalse();
output.Items.Should().BeEmpty();
}

[Fact]
public async Task DisposeAsync_WithoutStartAsyncAndNoEmissions_DoesNotHang()
{
var collector = new DiagnosticsCollector([]);

await ShouldComplete(collector.DisposeAsync().AsTask(), TimeSpan.FromSeconds(5),
"Instantiate-and-dispose with no emissions must be a no-op");

collector.IsStarted.Should().BeFalse();
collector.Warnings.Should().Be(0);
collector.Errors.Should().Be(0);
}

[Fact]
public async Task WaitForDrain_WithoutStartAsync_ThrowsImmediately()
{
var collector = new DiagnosticsCollector([]);
collector.EmitWarning(string.Empty, "queued");

Func<Task> act = () => ((IDiagnosticsCollector)collector).WaitForDrain();
_ = await act.Should().ThrowAsync<InvalidOperationException>();
}
}
Loading