Skip to content

Commit 4470d13

Browse files
Glue asynchronous handlers with synchronous callbacks provided by Fleck
1 parent d5f2e64 commit 4470d13

4 files changed

Lines changed: 70 additions & 38 deletions

File tree

Modules/Websockets/Handler/WebsocketConnection.cs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,26 +47,26 @@ public sealed class WebsocketConnection : IWebSocketConnection, IWebsocketConnec
4747
#region Initialization
4848

4949
public WebsocketConnection(ISocket socket, IRequest request, List<string> supportedProtocols,
50-
Action<IWebsocketConnection>? onOpen,
51-
Action<IWebsocketConnection>? onClose,
52-
Action<IWebsocketConnection, string>? onMessage,
53-
Action<IWebsocketConnection, byte[]>? onBinary,
54-
Action<IWebsocketConnection, byte[]>? onPing,
55-
Action<IWebsocketConnection, byte[]>? onPong,
56-
Action<IWebsocketConnection, Exception>? onError)
50+
Func<IWebsocketConnection, Task>? onOpen,
51+
Func<IWebsocketConnection, Task>? onClose,
52+
Func<IWebsocketConnection, string, Task>? onMessage,
53+
Func<IWebsocketConnection, byte[], Task>? onBinary,
54+
Func<IWebsocketConnection, byte[], Task>? onPing,
55+
Func<IWebsocketConnection, byte[], Task>? onPong,
56+
Func<IWebsocketConnection, Exception, Task>? onError)
5757
{
5858
Socket = socket;
5959
Request = request;
6060

6161
SupportedProtocols = supportedProtocols;
6262

63-
OnOpen = (onOpen != null) ? () => onOpen(this) : () => { };
64-
OnClose = (onClose != null) ? () => onClose(this) : () => { };
65-
OnMessage = (onMessage != null) ? x => onMessage(this, x) : x => { };
66-
OnBinary = (onBinary != null) ? x => onBinary(this, x) : x => { };
67-
OnPing = (onPing != null) ? x => onPing(this, x) : x => SendPongAsync(x);
68-
OnPong = (onPong != null) ? x => onPong(this, x) : x => { };
69-
OnError = (onError != null) ? x => onError(this, x) : x => { };
63+
OnOpen = (onOpen != null) ? () => WebsocketDispatcher.Schedule(() => onOpen(this)) : () => { };
64+
OnClose = (onClose != null) ? () => WebsocketDispatcher.Schedule(() => onClose(this)) : () => { };
65+
OnMessage = (onMessage != null) ? x => WebsocketDispatcher.Schedule(() => onMessage(this, x)) : x => { };
66+
OnBinary = (onBinary != null) ? x => WebsocketDispatcher.Schedule(() => onBinary(this, x)) : x => { };
67+
OnPing = (onPing != null) ? x => WebsocketDispatcher.Schedule(() => onPing(this, x)) : x => WebsocketDispatcher.Schedule(() => SendPongAsync(x));
68+
OnPong = (onPong != null) ? x => WebsocketDispatcher.Schedule(() => onPong(this, x)) : x => { };
69+
OnError = (onError != null) ? x => WebsocketDispatcher.Schedule(() => onError(this, x)) : x => { };
7070
}
7171

7272
#endregion
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
using Fleck;
2+
3+
namespace GenHTTP.Modules.Websockets.Handler;
4+
5+
/// <summary>
6+
/// Flecks uses synchronous callbacks for integration of user
7+
/// logic, but provides asynchronous methods for interacting
8+
/// with the websocket connection, which is counter-intuitive
9+
/// and leads to errors. GenHTTP´s integration forces asynchronous
10+
/// callbacks to be supplied by the user which requires us to dispatch
11+
/// them on the synchronous callbacks invoked by Fleck.
12+
/// </summary>
13+
public static class WebsocketDispatcher
14+
{
15+
16+
/// <summary>
17+
/// Schedules the given piece of work on a background
18+
/// thread and logs any error using the regular Fleck
19+
/// logging mechanism.
20+
/// </summary>
21+
/// <param name="work">The actual piece of work to be executed</param>
22+
public static void Schedule(Func<Task> work)
23+
{
24+
_ = Task.Run(async () =>
25+
{
26+
try
27+
{
28+
await work().ConfigureAwait(false);
29+
}
30+
catch (Exception e)
31+
{
32+
FleckLog.Error("Failed to run asynchronous event handler", e);
33+
}
34+
});
35+
}
36+
37+
}

Testing/Acceptance/Modules/Websockets/InitializerTest.cs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using Microsoft.VisualStudio.TestTools.UnitTesting;
21
using WS = GenHTTP.Modules.Websockets;
32

43
namespace GenHTTP.Testing.Acceptance.Modules.Websockets;
@@ -11,13 +10,13 @@ public class InitializerTest
1110
public void InitializeAll()
1211
{
1312
WS.Websocket.Create()
14-
.OnOpen(s => { })
15-
.OnClose(s => { })
16-
.OnPing((s, b) => { s.SendPongAsync(b); })
17-
.OnPong((s, b) => { })
18-
.OnMessage((s, x) => { })
19-
.OnBinary((s, x) => { })
20-
.OnError((s, x) => { })
13+
.OnOpen(s => Task.CompletedTask)
14+
.OnClose(s => Task.CompletedTask)
15+
.OnPing(async (s, b) => { await s.SendPongAsync(b); })
16+
.OnPong((s, b) => Task.CompletedTask)
17+
.OnMessage((s, x) => Task.CompletedTask)
18+
.OnBinary((s, x) => Task.CompletedTask)
19+
.OnError((s, x) => Task.CompletedTask)
2120
.Protocol("chat");
2221
}
2322

Testing/Acceptance/Modules/Websockets/IntegrationTest.cs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
using GenHTTP.Testing.Acceptance.Utilities;
2-
32
using WS = GenHTTP.Modules.Websockets;
4-
53
using Microsoft.VisualStudio.TestTools.UnitTesting;
6-
74
using Websocket.Client;
85

96
namespace GenHTTP.Testing.Acceptance.Modules.Websockets;
@@ -20,10 +17,12 @@ public async Task TestServer()
2017
var length = 0;
2118

2219
var server = WS.Websocket.Create()
23-
.OnMessage((socket, msg) =>
20+
.OnMessage(async (socket, msg) =>
2421
{
2522
length += msg.Length;
26-
socket.SendAsync(msg);
23+
24+
await socket.SendAsync(msg);
25+
2726
socket.Close();
2827
});
2928

@@ -54,23 +53,20 @@ public async Task TestDataTypes()
5453
var waitEvent = new ManualResetEvent(false);
5554

5655
var server = WS.Websocket.Create()
57-
.OnOpen((socket) =>
56+
.OnOpen(async (socket) =>
5857
{
59-
Task.Run(async () =>
60-
{
61-
await socket.SendPingAsync([42]);
62-
await socket.SendPongAsync([42]);
58+
await socket.SendPingAsync([42]);
59+
await socket.SendPongAsync([42]);
6360

64-
await socket.SendAsync([42]);
61+
await socket.SendAsync([42]);
6562

66-
Assert.IsTrue(socket.IsAvailable);
63+
Assert.IsTrue(socket.IsAvailable);
6764

68-
Assert.IsTrue(socket.Request.Headers.Count > 0);
65+
Assert.IsTrue(socket.Request.Headers.Count > 0);
6966

70-
socket.Close(42);
67+
socket.Close(42);
7168

72-
waitEvent.Set();
73-
});
69+
waitEvent.Set();
7470
});
7571

7672
await using var host = await TestHost.RunAsync(server);

0 commit comments

Comments
 (0)