Skip to content

Commit b8bb39f

Browse files
Consistently use async in the web socket connection API (#686)
1 parent 58c2df2 commit b8bb39f

8 files changed

Lines changed: 133 additions & 87 deletions

File tree

Modules/Websockets/Handler/WebsocketConnection.cs

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -47,51 +47,47 @@ public sealed class WebsocketConnection : IWebSocketConnection, IWebsocketConnec
4747
#region Initialization
4848

4949
public WebsocketConnection(ISocket socket, IRequest request, List<string> supportedProtocols,
50-
Action<IWebsocketConnection>? onOpen,
51-
Action<IWebsocketConnection>? onClose,
52-
Action<IWebsocketConnection, string>? onMessage,
53-
Action<IWebsocketConnection, byte[]>? onBinary,
54-
Action<IWebsocketConnection, byte[]>? onPing,
55-
Action<IWebsocketConnection, byte[]>? onPong,
56-
Action<IWebsocketConnection, Exception>? onError)
50+
Func<IWebsocketConnection, Task>? onOpen,
51+
Func<IWebsocketConnection, Task>? onClose,
52+
Func<IWebsocketConnection, string, Task>? onMessage,
53+
Func<IWebsocketConnection, byte[], Task>? onBinary,
54+
Func<IWebsocketConnection, byte[], Task>? onPing,
55+
Func<IWebsocketConnection, byte[], Task>? onPong,
56+
Func<IWebsocketConnection, Exception, Task>? onError)
5757
{
5858
Socket = socket;
5959
Request = request;
6060

6161
SupportedProtocols = supportedProtocols;
6262

63-
OnOpen = (onOpen != null) ? () => onOpen(this) : () => { };
64-
OnClose = (onClose != null) ? () => onClose(this) : () => { };
65-
OnMessage = (onMessage != null) ? x => onMessage(this, x) : x => { };
66-
OnBinary = (onBinary != null) ? x => onBinary(this, x) : x => { };
67-
OnPing = (onPing != null) ? x => onPing(this, x) : x => SendPong(x);
68-
OnPong = (onPong != null) ? x => onPong(this, x) : x => { };
69-
OnError = (onError != null) ? x => onError(this, x) : x => { };
63+
OnOpen = (onOpen != null) ? () => WebsocketDispatcher.Schedule(() => onOpen(this)) : () => { };
64+
OnClose = (onClose != null) ? () => WebsocketDispatcher.Schedule(() => onClose(this)) : () => { };
65+
OnMessage = (onMessage != null) ? x => WebsocketDispatcher.Schedule(() => onMessage(this, x)) : x => { };
66+
OnBinary = (onBinary != null) ? x => WebsocketDispatcher.Schedule(() => onBinary(this, x)) : x => { };
67+
OnPing = (onPing != null) ? x => WebsocketDispatcher.Schedule(() => onPing(this, x)) : x => WebsocketDispatcher.Schedule(() => SendPongAsync(x));
68+
OnPong = (onPong != null) ? x => WebsocketDispatcher.Schedule(() => onPong(this, x)) : x => { };
69+
OnError = (onError != null) ? x => WebsocketDispatcher.Schedule(() => onError(this, x)) : x => { };
7070
}
7171

7272
#endregion
7373

7474
#region Functionality
7575

76-
public Task Send(string message)
77-
{
78-
return Send(message, GetHandler().FrameText);
79-
}
76+
public Task Send(string message) => SendAsync(message);
8077

81-
public Task Send(byte[] message)
82-
{
83-
return Send(message, GetHandler().FrameBinary);
84-
}
78+
public Task Send(byte[] message) => SendAsync(message);
8579

86-
public Task SendPing(byte[] message)
87-
{
88-
return Send(message, GetHandler().FramePing);
89-
}
80+
public Task SendPing(byte[] message) => SendPingAsync(message);
9081

91-
public Task SendPong(byte[] message)
92-
{
93-
return Send(message, GetHandler().FramePong);
94-
}
82+
public Task SendPong(byte[] message) => SendPongAsync(message);
83+
84+
public Task SendAsync(string message) => Send(message, GetHandler().FrameText);
85+
86+
public Task SendAsync(byte[] message) => Send(message, GetHandler().FrameBinary);
87+
88+
public Task SendPingAsync(byte[] message) => Send(message, GetHandler().FramePing);
89+
90+
public Task SendPongAsync(byte[] message) => Send(message, GetHandler().FramePong);
9591

9692
private Task Send<T>(T message, Func<T, byte[]> createFrame)
9793
{
@@ -109,6 +105,7 @@ private Task Send<T>(T message, Func<T, byte[]> createFrame)
109105
}
110106

111107
var bytes = createFrame(message);
108+
112109
return SendBytes(bytes);
113110
}
114111

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
using Fleck;
2+
3+
namespace GenHTTP.Modules.Websockets.Handler;
4+
5+
/// <summary>
6+
/// Flecks uses synchronous callbacks for integration of user
7+
/// logic, but provides asynchronous methods for interacting
8+
/// with the websocket connection, which is counter-intuitive
9+
/// and leads to errors. GenHTTP´s integration forces asynchronous
10+
/// callbacks to be supplied by the user which requires us to dispatch
11+
/// them on the synchronous callbacks invoked by Fleck.
12+
/// </summary>
13+
public static class WebsocketDispatcher
14+
{
15+
16+
/// <summary>
17+
/// Schedules the given piece of work on a background
18+
/// thread and logs any error using the regular Fleck
19+
/// logging mechanism.
20+
/// </summary>
21+
/// <param name="work">The actual piece of work to be executed</param>
22+
public static void Schedule(Func<Task> work)
23+
{
24+
_ = Task.Run(async () =>
25+
{
26+
try
27+
{
28+
await work().ConfigureAwait(false);
29+
}
30+
catch (Exception e)
31+
{
32+
FleckLog.Error("Failed to run asynchronous event handler.", e);
33+
}
34+
});
35+
}
36+
37+
}

Modules/Websockets/Handler/WebsocketHandler.cs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,19 @@ public sealed class WebsocketHandler : IHandler
1010

1111
#region Get-/Setters
1212

13-
public Action<IWebsocketConnection>? OnOpen { get; }
13+
public Func<IWebsocketConnection, Task>? OnOpen { get; }
1414

15-
public Action<IWebsocketConnection>? OnClose { get; }
15+
public Func<IWebsocketConnection, Task>? OnClose { get; }
1616

17-
public Action<IWebsocketConnection, string>? OnMessage { get; }
17+
public Func<IWebsocketConnection, string, Task>? OnMessage { get; }
1818

19-
public Action<IWebsocketConnection, byte[]>? OnBinary { get; }
19+
public Func<IWebsocketConnection, byte[], Task>? OnBinary { get; }
2020

21-
public Action<IWebsocketConnection, byte[]>? OnPing { get; }
21+
public Func<IWebsocketConnection, byte[], Task>? OnPing { get; }
2222

23-
public Action<IWebsocketConnection, byte[]>? OnPong { get; }
23+
public Func<IWebsocketConnection, byte[], Task>? OnPong { get; }
2424

25-
public Action<IWebsocketConnection, Exception>? OnError { get; }
25+
public Func<IWebsocketConnection, Exception, Task>? OnError { get; }
2626

2727
public List<string> SupportedProtocols { get; }
2828

@@ -31,13 +31,13 @@ public sealed class WebsocketHandler : IHandler
3131
#region Initialization
3232

3333
public WebsocketHandler(List<string> supportedProtocols,
34-
Action<IWebsocketConnection>? onOpen,
35-
Action<IWebsocketConnection>? onClose,
36-
Action<IWebsocketConnection, string>? onMessage,
37-
Action<IWebsocketConnection, byte[]>? onBinary,
38-
Action<IWebsocketConnection, byte[]>? onPing,
39-
Action<IWebsocketConnection, byte[]>? onPong,
40-
Action<IWebsocketConnection, Exception>? onError)
34+
Func<IWebsocketConnection, Task>? onOpen,
35+
Func<IWebsocketConnection, Task>? onClose,
36+
Func<IWebsocketConnection, string, Task>? onMessage,
37+
Func<IWebsocketConnection, byte[], Task>? onBinary,
38+
Func<IWebsocketConnection, byte[], Task>? onPing,
39+
Func<IWebsocketConnection, byte[], Task>? onPong,
40+
Func<IWebsocketConnection, Exception, Task>? onError)
4141
{
4242
SupportedProtocols = supportedProtocols;
4343

Modules/Websockets/Handler/WebsocketHandlerBuilder.cs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ public class WebsocketHandlerBuilder : IHandlerBuilder<WebsocketHandlerBuilder>
88

99
private readonly List<string> _SupportedProtocols = [];
1010

11-
private Action<IWebsocketConnection>? _OnOpen;
12-
private Action<IWebsocketConnection>? _OnClose;
13-
private Action<IWebsocketConnection, string>? _OnMessage;
14-
private Action<IWebsocketConnection, byte[]>? _OnBinary;
15-
private Action<IWebsocketConnection, byte[]>? _OnPing;
16-
private Action<IWebsocketConnection, byte[]>? _OnPong;
17-
private Action<IWebsocketConnection, Exception>? _OnError;
11+
private Func<IWebsocketConnection, Task>? _OnOpen;
12+
private Func<IWebsocketConnection, Task>? _OnClose;
13+
private Func<IWebsocketConnection, string, Task>? _OnMessage;
14+
private Func<IWebsocketConnection, byte[], Task>? _OnBinary;
15+
private Func<IWebsocketConnection, byte[], Task>? _OnPing;
16+
private Func<IWebsocketConnection, byte[], Task>? _OnPong;
17+
private Func<IWebsocketConnection, Exception, Task>? _OnError;
1818

1919
#region Functionality
2020

@@ -38,7 +38,7 @@ public WebsocketHandlerBuilder Protocol(string supportedProtocol)
3838
/// Will be executed if a new websocket client connected.
3939
/// </summary>
4040
/// <param name="handler">The method to be executed</param>
41-
public WebsocketHandlerBuilder OnOpen(Action<IWebsocketConnection> handler)
41+
public WebsocketHandlerBuilder OnOpen(Func<IWebsocketConnection, Task> handler)
4242
{
4343
_OnOpen = handler;
4444
return this;
@@ -48,7 +48,7 @@ public WebsocketHandlerBuilder OnOpen(Action<IWebsocketConnection> handler)
4848
/// Will be executed if a websocket client disconnects.
4949
/// </summary>
5050
/// <param name="handler">The method to be executed</param>
51-
public WebsocketHandlerBuilder OnClose(Action<IWebsocketConnection> handler)
51+
public WebsocketHandlerBuilder OnClose(Func<IWebsocketConnection, Task> handler)
5252
{
5353
_OnClose = handler;
5454
return this;
@@ -58,7 +58,7 @@ public WebsocketHandlerBuilder OnClose(Action<IWebsocketConnection> handler)
5858
/// Will be executed if a string message has been received from the client.
5959
/// </summary>
6060
/// <param name="handler">The method to be executed</param>
61-
public WebsocketHandlerBuilder OnMessage(Action<IWebsocketConnection, string> handler)
61+
public WebsocketHandlerBuilder OnMessage(Func<IWebsocketConnection, string, Task> handler)
6262
{
6363
_OnMessage = handler;
6464
return this;
@@ -68,7 +68,7 @@ public WebsocketHandlerBuilder OnMessage(Action<IWebsocketConnection, string> ha
6868
/// Will be executed if a binary message has been received from the client.
6969
/// </summary>
7070
/// <param name="handler">The method to be executed</param>
71-
public WebsocketHandlerBuilder OnBinary(Action<IWebsocketConnection, byte[]> handler)
71+
public WebsocketHandlerBuilder OnBinary(Func<IWebsocketConnection, byte[], Task> handler)
7272
{
7373
_OnBinary = handler;
7474
return this;
@@ -78,7 +78,7 @@ public WebsocketHandlerBuilder OnBinary(Action<IWebsocketConnection, byte[]> han
7878
/// Will be executed if the client sends a ping request.
7979
/// </summary>
8080
/// <param name="handler">The method to be executed</param>
81-
public WebsocketHandlerBuilder OnPing(Action<IWebsocketConnection, byte[]> handler)
81+
public WebsocketHandlerBuilder OnPing(Func<IWebsocketConnection, byte[], Task> handler)
8282
{
8383
_OnPing = handler;
8484
return this;
@@ -88,7 +88,7 @@ public WebsocketHandlerBuilder OnPing(Action<IWebsocketConnection, byte[]> handl
8888
/// Will be executed if the client sends a pong request.
8989
/// </summary>
9090
/// <param name="handler">The method to be executed</param>
91-
public WebsocketHandlerBuilder OnPong(Action<IWebsocketConnection, byte[]> handler)
91+
public WebsocketHandlerBuilder OnPong(Func<IWebsocketConnection, byte[], Task> handler)
9292
{
9393
_OnPong = handler;
9494
return this;
@@ -98,7 +98,7 @@ public WebsocketHandlerBuilder OnPong(Action<IWebsocketConnection, byte[]> handl
9898
/// Will be executed if there is some client connection error.
9999
/// </summary>
100100
/// <param name="handler">The method to be executed</param>
101-
public WebsocketHandlerBuilder OnError(Action<IWebsocketConnection, Exception> handler)
101+
public WebsocketHandlerBuilder OnError(Func<IWebsocketConnection, Exception, Task> handler)
102102
{
103103
_OnError = handler;
104104
return this;

Modules/Websockets/IWebsocketConnection.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,25 @@ public interface IWebsocketConnection
2222
/// Sends a text message to the connected client.
2323
/// </summary>
2424
/// <param name="message">The message to be sent</param>
25-
Task Send(string message);
25+
Task SendAsync(string message);
2626

2727
/// <summary>
2828
/// Sends a binary message to the connected client.
2929
/// </summary>
3030
/// <param name="message">The message to be sent</param>
31-
Task Send(byte[] message);
31+
Task SendAsync(byte[] message);
3232

3333
/// <summary>
3434
/// Sends a ping message to the connected client.
3535
/// </summary>
3636
/// <param name="message">The message to be sent</param>
37-
Task SendPing(byte[] message);
37+
Task SendPingAsync(byte[] message);
3838

3939
/// <summary>
4040
/// Sends a pong message to the connected client.
4141
/// </summary>
4242
/// <param name="message">The message to be sent</param>
43-
Task SendPong(byte[] message);
43+
Task SendPongAsync(byte[] message);
4444

4545
/// <summary>
4646
/// Gracefully closes the connection to the client.
@@ -52,7 +52,7 @@ public interface IWebsocketConnection
5252
/// </summary>
5353
/// <param name="code">The code to be sent to the connected client</param>
5454
/// <remarks>
55-
/// See Flex.WebSocketStatusCodes.
55+
/// See <see cref="Fleck.WebSocketStatusCodes" />.
5656
/// </remarks>
5757
void Close(int code);
5858

Testing/Acceptance/Modules/Websockets/ErrorHandlingTests.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Net;
2-
using Microsoft.VisualStudio.TestTools.UnitTesting;
2+
3+
using Websocket.Client;
34

45
using WS = GenHTTP.Modules.Websockets.Websocket;
56

@@ -19,4 +20,19 @@ public async Task TestInvalidRequest()
1920
Assert.AreEqual(HttpStatusCode.BadRequest, response.StatusCode);
2021
}
2122

23+
[TestMethod]
24+
public async Task TestErrorHandling()
25+
{
26+
var server = WS.Create()
27+
.OnOpen(_ => throw new InvalidOperationException("Ooops"));
28+
29+
await using var host = await TestHost.RunAsync(server);
30+
31+
using var client = new WebsocketClient(new Uri("ws://localhost:" + host.Port));
32+
33+
await client.Start();
34+
35+
await Task.Delay(1000);
36+
}
37+
2238
}

Testing/Acceptance/Modules/Websockets/InitializerTest.cs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using Microsoft.VisualStudio.TestTools.UnitTesting;
21
using WS = GenHTTP.Modules.Websockets;
32

43
namespace GenHTTP.Testing.Acceptance.Modules.Websockets;
@@ -11,13 +10,13 @@ public class InitializerTest
1110
public void InitializeAll()
1211
{
1312
WS.Websocket.Create()
14-
.OnOpen(s => { })
15-
.OnClose(s => { })
16-
.OnPing((s, b) => { s.SendPong(b); })
17-
.OnPong((s, b) => { })
18-
.OnMessage((s, x) => { })
19-
.OnBinary((s, x) => { })
20-
.OnError((s, x) => { })
13+
.OnOpen(s => Task.CompletedTask)
14+
.OnClose(s => Task.CompletedTask)
15+
.OnPing(async (s, b) => { await s.SendPongAsync(b); })
16+
.OnPong((s, b) => Task.CompletedTask)
17+
.OnMessage((s, x) => Task.CompletedTask)
18+
.OnBinary((s, x) => Task.CompletedTask)
19+
.OnError((s, x) => Task.CompletedTask)
2120
.Protocol("chat");
2221
}
2322

0 commit comments

Comments
 (0)