Skip to content

Commit 07291eb

Browse files
committed
Merged PR 58375: [SignalR] Extra backpressure timeout
#### AI description (iteration 1) #### PR Classification This PR is a feature enhancement that improves SignalR’s message buffering by adding an extra backpressure cancellation timeout. #### PR Summary The changes strengthen SignalR's handling of backpressure conditions by introducing a 5-second timeout for message buffering operations and updating the related tests to verify cancellation behavior. - **`src/SignalR/server/SignalR/test/Internal/MessageBufferTests.cs`**: Added tests for both default cancellation and explicit cancellation during backpressure, and refined test naming and timeout assertions. - **`src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs`**: Introduced a new backpressure timeout test scenario and updated the simulation of timeout conditions. - **`src/SignalR/common/Shared/MessageBuffer.cs`**: Implemented the extra 5-second cancellation timeout using a TimeProvider and linked cancellation tokens to improve backpressure handling. <!-- GitOpsUserAgent=GitOps.Apps.Server.pullrequestcopilot -->
1 parent fdb16a3 commit 07291eb

4 files changed

Lines changed: 124 additions & 30 deletions

File tree

src/SignalR/common/Shared/MessageBuffer.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ internal sealed class MessageBuffer : IDisposable
3333

3434
#if NET8_0_OR_GREATER
3535
private readonly PeriodicTimer _timer;
36+
private readonly TimeProvider _timeProvider;
3637
#else
3738
private readonly TimerAwaitable _timer = new(AckRate, AckRate);
3839
#endif
@@ -68,8 +69,8 @@ public MessageBuffer(ConnectionContext connection, IHubProtocol protocol, long b
6869
public MessageBuffer(ConnectionContext connection, IHubProtocol protocol, long bufferLimit, ILogger logger, TimeProvider timeProvider)
6970
{
7071
#if NET8_0_OR_GREATER
71-
timeProvider ??= TimeProvider.System;
72-
_timer = new(AckRate, timeProvider);
72+
_timeProvider = timeProvider;
73+
_timer = new(AckRate, _timeProvider);
7374
#endif
7475

7576
_buffer = new LinkedBuffer();
@@ -132,14 +133,17 @@ public ValueTask<FlushResult> WriteAsync(HubMessage hubMessage, CancellationToke
132133

133134
private async ValueTask<FlushResult> WriteAsyncCore(Type hubMessageType, ReadOnlyMemory<byte> messageBytes, CancellationToken cancellationToken)
134135
{
135-
// If backpressure is being observed a cancelable token is needed to make sure we can break out of waiting when the connection is closed
136-
Debug.Assert(cancellationToken.CanBeCanceled);
137-
138136
// TODO: Add backpressure based on message count
139137
if (_bufferedByteCount > _bufferLimit)
140138
{
139+
#if NET
140+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5), _timeProvider);
141+
#else
142+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
143+
#endif
144+
using var _ = CancellationTokenUtils.CreateLinkedToken(cts.Token, cancellationToken, out var linkedToken);
141145
// primitive backpressure if buffer is full
142-
while (await _waitForAck.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
146+
while (await _waitForAck.Reader.WaitToReadAsync(linkedToken).ConfigureAwait(false))
143147
{
144148
if (_waitForAck.Reader.TryRead(out var count) && count < _bufferLimit)
145149
{

src/SignalR/server/Core/src/HubConnectionContext.cs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -274,19 +274,14 @@ private ValueTask<FlushResult> WriteCore(HubMessage message, CancellationToken c
274274
static async ValueTask<FlushResult> WriteAsync(MessageBuffer messageBuffer, HubConnectionContext hubConnectionContext,
275275
HubMessage message, CancellationToken cancellationToken)
276276
{
277-
CancellationTokenSource? cts = null;
278277
var connectionToken = hubConnectionContext.ConnectionAborted;
279278
if (message is CloseMessage)
280279
{
281280
// If it's a CloseMessage, we might already have triggered the ConnectionAborted token
282-
// We would like to successfully send the CloseMessage for graceful close which means we can't use the ConnectionAborted token,
283-
// but we need to make sure we don't get blocked by backpressure or anything, so we use a short-lived token.
284-
cts = new CancellationTokenSource(TimeSpan.FromSeconds(5), hubConnectionContext._timeProvider);
285-
connectionToken = cts.Token;
281+
// We would like to successfully send the CloseMessage for graceful close which means we can't use the ConnectionAborted token.
282+
connectionToken = CancellationToken.None;
286283
}
287284

288-
using var __ = cts;
289-
290285
// MessageBuffer can wait on things other than the PipeWriter (which is canceled by other means)
291286
// So we need to make sure the cancellation token passed to it is also canceled when the connection is aborted
292287
using var _ = CancellationTokenUtils.CreateLinkedToken(connectionToken, cancellationToken, out var linkedToken);

src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5272,11 +5272,13 @@ public enum CloseScenario
52725272
{
52735273
PingTimeout,
52745274
Abort,
5275+
BackpressureTimeout,
52755276
}
52765277

52775278
[Theory]
52785279
[InlineData(CloseScenario.PingTimeout)]
52795280
[InlineData(CloseScenario.Abort)]
5281+
[InlineData(CloseScenario.BackpressureTimeout)]
52805282
public async Task StatefulReconnectWithMessageBufferBackpressureIsCancelable(CloseScenario scenario)
52815283
{
52825284
using (StartVerifiableLog(write => write.EventId.Name == "FailedWritingMessage"))
@@ -5307,29 +5309,34 @@ public async Task StatefulReconnectWithMessageBufferBackpressureIsCancelable(Clo
53075309

53085310
await client2.SendHubMessageAsync(new InvocationMessage(nameof(MethodHub.BroadcastMethod), [new string('a', 100)]));
53095311

5312+
Assert.IsType<InvocationMessage>(await client2.ReadAsync().DefaultTimeout());
5313+
5314+
await client2.SendHubMessageAsync(new InvocationMessage(nameof(MethodHub.BroadcastMethod), [new string('a', 100)]));
5315+
53105316
switch (scenario)
53115317
{
53125318
case CloseScenario.PingTimeout:
5313-
{
5314-
// We go over the 100 ms timeout interval multiple times
5315-
for (var i = 0; i < 3; i++)
53165319
{
5317-
timeProvider.Advance(timeout + TimeSpan.FromMilliseconds(1));
5318-
client1.TickHeartbeat();
5320+
// We go over the 100 ms timeout interval multiple times
5321+
for (var i = 0; i < 3; i++)
5322+
{
5323+
timeProvider.Advance(timeout + TimeSpan.FromMilliseconds(1));
5324+
client1.TickHeartbeat();
5325+
}
5326+
break;
53195327
}
5320-
break;
5321-
}
53225328
case CloseScenario.Abort:
5323-
{
5324-
client1.Connection.Abort();
5325-
break;
5326-
}
5329+
{
5330+
client1.Connection.Abort();
5331+
break;
5332+
}
5333+
case CloseScenario.BackpressureTimeout:
5334+
{
5335+
timeProvider.Advance(TimeSpan.FromSeconds(5) + TimeSpan.FromMilliseconds(1));
5336+
break;
5337+
}
53275338
}
53285339

5329-
Assert.IsType<InvocationMessage>(await client2.ReadAsync().DefaultTimeout());
5330-
5331-
await client2.SendHubMessageAsync(new InvocationMessage(nameof(MethodHub.BroadcastMethod), [new string('a', 100)]));
5332-
53335340
// This one might not be blocked on client1 if the server sends to client2 first during Broadcast
53345341
Assert.IsType<InvocationMessage>(await client2.ReadAsync().DefaultTimeout());
53355342

src/SignalR/server/SignalR/test/Internal/MessageBufferTests.cs

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ public async Task SendingAckMessageDelayedDuringResend()
563563
}
564564

565565
[Fact]
566-
public async Task BackpressureWriteMessageCanBeCanceled()
566+
public async Task PipeBackpressureWriteMessageCanBeCanceled()
567567
{
568568
var cts = new CancellationTokenSource();
569569
var protocol = new JsonHubProtocol();
@@ -581,7 +581,7 @@ public async Task BackpressureWriteMessageCanBeCanceled()
581581

582582
cts.Cancel();
583583

584-
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await writeTask);
584+
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await writeTask).DefaultTimeout();
585585

586586
DuplexPipe.UpdateConnectionPair(ref pipes, connection, pipeOptions);
587587
var resendTask = messageBuffer.ResendAsync(pipes.Transport.Output);
@@ -607,6 +607,94 @@ public async Task BackpressureWriteMessageCanBeCanceled()
607607

608608
await resendTask;
609609
}
610+
611+
[Fact]
612+
public async Task BufferedBackpressureWriteMessageDefaultCancellation()
613+
{
614+
var cts = new CancellationTokenSource();
615+
var protocol = new JsonHubProtocol();
616+
var connection = new TestConnectionContext();
617+
var pipeOptions = new PipeOptions(pauseWriterThreshold: 100, resumeWriterThreshold: 50);
618+
var pipes = DuplexPipe.CreateConnectionPair(new PipeOptions(), pipeOptions);
619+
connection.Transport = pipes.Transport;
620+
var timeProvider = new FakeTimeProvider();
621+
using var messageBuffer = new MessageBuffer(connection, protocol, bufferLimit: 50, NullLogger.Instance, timeProvider);
622+
623+
await messageBuffer.WriteAsync(new SerializedHubMessage(new InvocationMessage("t", new object[] { new byte[40] })), cts.Token);
624+
625+
// Write will hit pipe backpressure
626+
var writeTask = messageBuffer.WriteAsync(new SerializedHubMessage(new InvocationMessage("t", new object[] { new byte[40] })), cts.Token);
627+
Assert.False(writeTask.IsCompleted);
628+
629+
timeProvider.Advance(TimeSpan.FromSeconds(5) + TimeSpan.FromMilliseconds(1));
630+
631+
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await writeTask).DefaultTimeout();
632+
633+
DuplexPipe.UpdateConnectionPair(ref pipes, connection, pipeOptions);
634+
var resendTask = messageBuffer.ResendAsync(pipes.Transport.Output);
635+
636+
var res = await pipes.Application.Input.ReadAsync();
637+
var buffer = res.Buffer;
638+
Assert.True(protocol.TryParseMessage(ref buffer, new TestBinder(), out var message));
639+
Assert.IsType<SequenceMessage>(message);
640+
641+
pipes.Application.Input.AdvanceTo(buffer.Start);
642+
643+
res = await pipes.Application.Input.ReadAsync();
644+
buffer = res.Buffer;
645+
Assert.True(protocol.TryParseMessage(ref buffer, new TestBinder(), out message));
646+
Assert.IsType<InvocationMessage>(message);
647+
648+
pipes.Application.Input.AdvanceTo(buffer.Start);
649+
650+
Assert.False(pipes.Application.Input.TryRead(out res));
651+
652+
await resendTask;
653+
}
654+
655+
[Fact]
656+
public async Task BufferedBackpressureWriteMessageCanBeCanceled()
657+
{
658+
var cts = new CancellationTokenSource();
659+
var protocol = new JsonHubProtocol();
660+
var connection = new TestConnectionContext();
661+
var pipeOptions = new PipeOptions(pauseWriterThreshold: 100, resumeWriterThreshold: 50);
662+
var pipes = DuplexPipe.CreateConnectionPair(new PipeOptions(), pipeOptions);
663+
connection.Transport = pipes.Transport;
664+
var timeProvider = new FakeTimeProvider();
665+
using var messageBuffer = new MessageBuffer(connection, protocol, bufferLimit: 50, NullLogger.Instance, timeProvider);
666+
667+
await messageBuffer.WriteAsync(new SerializedHubMessage(new InvocationMessage("t", new object[] { new byte[40] })), cts.Token);
668+
669+
// Write will hit pipe backpressure
670+
var writeTask = messageBuffer.WriteAsync(new SerializedHubMessage(new InvocationMessage("t", new object[] { new byte[40] })), cts.Token);
671+
Assert.False(writeTask.IsCompleted);
672+
673+
cts.Cancel();
674+
675+
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await writeTask).DefaultTimeout();
676+
677+
DuplexPipe.UpdateConnectionPair(ref pipes, connection, pipeOptions);
678+
var resendTask = messageBuffer.ResendAsync(pipes.Transport.Output);
679+
680+
var res = await pipes.Application.Input.ReadAsync();
681+
var buffer = res.Buffer;
682+
Assert.True(protocol.TryParseMessage(ref buffer, new TestBinder(), out var message));
683+
Assert.IsType<SequenceMessage>(message);
684+
685+
pipes.Application.Input.AdvanceTo(buffer.Start);
686+
687+
res = await pipes.Application.Input.ReadAsync();
688+
buffer = res.Buffer;
689+
Assert.True(protocol.TryParseMessage(ref buffer, new TestBinder(), out message));
690+
Assert.IsType<InvocationMessage>(message);
691+
692+
pipes.Application.Input.AdvanceTo(buffer.Start);
693+
694+
Assert.False(pipes.Application.Input.TryRead(out res));
695+
696+
await resendTask;
697+
}
610698
}
611699

612700
internal sealed class TestConnectionContext : ConnectionContext

0 commit comments

Comments
 (0)