Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 27 additions & 30 deletions Modules/Websockets/Handler/WebsocketConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,51 +47,47 @@ public sealed class WebsocketConnection : IWebSocketConnection, IWebsocketConnec
#region Initialization

public WebsocketConnection(ISocket socket, IRequest request, List<string> supportedProtocols,
Action<IWebsocketConnection>? onOpen,
Action<IWebsocketConnection>? onClose,
Action<IWebsocketConnection, string>? onMessage,
Action<IWebsocketConnection, byte[]>? onBinary,
Action<IWebsocketConnection, byte[]>? onPing,
Action<IWebsocketConnection, byte[]>? onPong,
Action<IWebsocketConnection, Exception>? onError)
Func<IWebsocketConnection, Task>? onOpen,
Func<IWebsocketConnection, Task>? onClose,
Func<IWebsocketConnection, string, Task>? onMessage,
Func<IWebsocketConnection, byte[], Task>? onBinary,
Func<IWebsocketConnection, byte[], Task>? onPing,
Func<IWebsocketConnection, byte[], Task>? onPong,
Func<IWebsocketConnection, Exception, Task>? onError)
{
Socket = socket;
Request = request;

SupportedProtocols = supportedProtocols;

OnOpen = (onOpen != null) ? () => onOpen(this) : () => { };
OnClose = (onClose != null) ? () => onClose(this) : () => { };
OnMessage = (onMessage != null) ? x => onMessage(this, x) : x => { };
OnBinary = (onBinary != null) ? x => onBinary(this, x) : x => { };
OnPing = (onPing != null) ? x => onPing(this, x) : x => SendPong(x);
OnPong = (onPong != null) ? x => onPong(this, x) : x => { };
OnError = (onError != null) ? x => onError(this, x) : x => { };
OnOpen = (onOpen != null) ? () => WebsocketDispatcher.Schedule(() => onOpen(this)) : () => { };
OnClose = (onClose != null) ? () => WebsocketDispatcher.Schedule(() => onClose(this)) : () => { };
OnMessage = (onMessage != null) ? x => WebsocketDispatcher.Schedule(() => onMessage(this, x)) : x => { };
OnBinary = (onBinary != null) ? x => WebsocketDispatcher.Schedule(() => onBinary(this, x)) : x => { };
OnPing = (onPing != null) ? x => WebsocketDispatcher.Schedule(() => onPing(this, x)) : x => WebsocketDispatcher.Schedule(() => SendPongAsync(x));
OnPong = (onPong != null) ? x => WebsocketDispatcher.Schedule(() => onPong(this, x)) : x => { };
OnError = (onError != null) ? x => WebsocketDispatcher.Schedule(() => onError(this, x)) : x => { };
}

#endregion

#region Functionality

public Task Send(string message)
{
return Send(message, GetHandler().FrameText);
}
public Task Send(string message) => SendAsync(message);

public Task Send(byte[] message)
{
return Send(message, GetHandler().FrameBinary);
}
public Task Send(byte[] message) => SendAsync(message);

public Task SendPing(byte[] message)
{
return Send(message, GetHandler().FramePing);
}
public Task SendPing(byte[] message) => SendPingAsync(message);

public Task SendPong(byte[] message)
{
return Send(message, GetHandler().FramePong);
}
public Task SendPong(byte[] message) => SendPongAsync(message);

public Task SendAsync(string message) => Send(message, GetHandler().FrameText);

public Task SendAsync(byte[] message) => Send(message, GetHandler().FrameBinary);

public Task SendPingAsync(byte[] message) => Send(message, GetHandler().FramePing);

public Task SendPongAsync(byte[] message) => Send(message, GetHandler().FramePong);

private Task Send<T>(T message, Func<T, byte[]> createFrame)
{
Expand All @@ -109,6 +105,7 @@ private Task Send<T>(T message, Func<T, byte[]> createFrame)
}

var bytes = createFrame(message);

return SendBytes(bytes);
}

Expand Down
37 changes: 37 additions & 0 deletions Modules/Websockets/Handler/WebsocketDispatcher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Fleck;

namespace GenHTTP.Modules.Websockets.Handler;

/// <summary>
/// Flecks uses synchronous callbacks for integration of user
/// logic, but provides asynchronous methods for interacting
/// with the websocket connection, which is counter-intuitive
/// and leads to errors. GenHTTP´s integration forces asynchronous
/// callbacks to be supplied by the user which requires us to dispatch
/// them on the synchronous callbacks invoked by Fleck.
/// </summary>
public static class WebsocketDispatcher
{

/// <summary>
/// Schedules the given piece of work on a background
/// thread and logs any error using the regular Fleck
/// logging mechanism.
/// </summary>
/// <param name="work">The actual piece of work to be executed</param>
public static void Schedule(Func<Task> work)
{
_ = Task.Run(async () =>
{
try
{
await work().ConfigureAwait(false);
}
catch (Exception e)
{
FleckLog.Error("Failed to run asynchronous event handler.", e);
}
});
}

}
28 changes: 14 additions & 14 deletions Modules/Websockets/Handler/WebsocketHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ public sealed class WebsocketHandler : IHandler

#region Get-/Setters

public Action<IWebsocketConnection>? OnOpen { get; }
public Func<IWebsocketConnection, Task>? OnOpen { get; }

public Action<IWebsocketConnection>? OnClose { get; }
public Func<IWebsocketConnection, Task>? OnClose { get; }

public Action<IWebsocketConnection, string>? OnMessage { get; }
public Func<IWebsocketConnection, string, Task>? OnMessage { get; }

public Action<IWebsocketConnection, byte[]>? OnBinary { get; }
public Func<IWebsocketConnection, byte[], Task>? OnBinary { get; }

public Action<IWebsocketConnection, byte[]>? OnPing { get; }
public Func<IWebsocketConnection, byte[], Task>? OnPing { get; }

public Action<IWebsocketConnection, byte[]>? OnPong { get; }
public Func<IWebsocketConnection, byte[], Task>? OnPong { get; }

public Action<IWebsocketConnection, Exception>? OnError { get; }
public Func<IWebsocketConnection, Exception, Task>? OnError { get; }

public List<string> SupportedProtocols { get; }

Expand All @@ -31,13 +31,13 @@ public sealed class WebsocketHandler : IHandler
#region Initialization

public WebsocketHandler(List<string> supportedProtocols,
Action<IWebsocketConnection>? onOpen,
Action<IWebsocketConnection>? onClose,
Action<IWebsocketConnection, string>? onMessage,
Action<IWebsocketConnection, byte[]>? onBinary,
Action<IWebsocketConnection, byte[]>? onPing,
Action<IWebsocketConnection, byte[]>? onPong,
Action<IWebsocketConnection, Exception>? onError)
Func<IWebsocketConnection, Task>? onOpen,
Func<IWebsocketConnection, Task>? onClose,
Func<IWebsocketConnection, string, Task>? onMessage,
Func<IWebsocketConnection, byte[], Task>? onBinary,
Func<IWebsocketConnection, byte[], Task>? onPing,
Func<IWebsocketConnection, byte[], Task>? onPong,
Func<IWebsocketConnection, Exception, Task>? onError)
{
SupportedProtocols = supportedProtocols;

Expand Down
28 changes: 14 additions & 14 deletions Modules/Websockets/Handler/WebsocketHandlerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ public class WebsocketHandlerBuilder : IHandlerBuilder<WebsocketHandlerBuilder>

private readonly List<string> _SupportedProtocols = [];

private Action<IWebsocketConnection>? _OnOpen;
private Action<IWebsocketConnection>? _OnClose;
private Action<IWebsocketConnection, string>? _OnMessage;
private Action<IWebsocketConnection, byte[]>? _OnBinary;
private Action<IWebsocketConnection, byte[]>? _OnPing;
private Action<IWebsocketConnection, byte[]>? _OnPong;
private Action<IWebsocketConnection, Exception>? _OnError;
private Func<IWebsocketConnection, Task>? _OnOpen;
private Func<IWebsocketConnection, Task>? _OnClose;
private Func<IWebsocketConnection, string, Task>? _OnMessage;
private Func<IWebsocketConnection, byte[], Task>? _OnBinary;
private Func<IWebsocketConnection, byte[], Task>? _OnPing;
private Func<IWebsocketConnection, byte[], Task>? _OnPong;
private Func<IWebsocketConnection, Exception, Task>? _OnError;

#region Functionality

Expand All @@ -38,7 +38,7 @@ public WebsocketHandlerBuilder Protocol(string supportedProtocol)
/// Will be executed if a new websocket client connected.
/// </summary>
/// <param name="handler">The method to be executed</param>
public WebsocketHandlerBuilder OnOpen(Action<IWebsocketConnection> handler)
public WebsocketHandlerBuilder OnOpen(Func<IWebsocketConnection, Task> handler)
{
_OnOpen = handler;
return this;
Expand All @@ -48,7 +48,7 @@ public WebsocketHandlerBuilder OnOpen(Action<IWebsocketConnection> handler)
/// Will be executed if a websocket client disconnects.
/// </summary>
/// <param name="handler">The method to be executed</param>
public WebsocketHandlerBuilder OnClose(Action<IWebsocketConnection> handler)
public WebsocketHandlerBuilder OnClose(Func<IWebsocketConnection, Task> handler)
{
_OnClose = handler;
return this;
Expand All @@ -58,7 +58,7 @@ public WebsocketHandlerBuilder OnClose(Action<IWebsocketConnection> handler)
/// Will be executed if a string message has been received from the client.
/// </summary>
/// <param name="handler">The method to be executed</param>
public WebsocketHandlerBuilder OnMessage(Action<IWebsocketConnection, string> handler)
public WebsocketHandlerBuilder OnMessage(Func<IWebsocketConnection, string, Task> handler)
{
_OnMessage = handler;
return this;
Expand All @@ -68,7 +68,7 @@ public WebsocketHandlerBuilder OnMessage(Action<IWebsocketConnection, string> ha
/// Will be executed if a binary message has been received from the client.
/// </summary>
/// <param name="handler">The method to be executed</param>
public WebsocketHandlerBuilder OnBinary(Action<IWebsocketConnection, byte[]> handler)
public WebsocketHandlerBuilder OnBinary(Func<IWebsocketConnection, byte[], Task> handler)
{
_OnBinary = handler;
return this;
Expand All @@ -78,7 +78,7 @@ public WebsocketHandlerBuilder OnBinary(Action<IWebsocketConnection, byte[]> han
/// Will be executed if the client sends a ping request.
/// </summary>
/// <param name="handler">The method to be executed</param>
public WebsocketHandlerBuilder OnPing(Action<IWebsocketConnection, byte[]> handler)
public WebsocketHandlerBuilder OnPing(Func<IWebsocketConnection, byte[], Task> handler)
{
_OnPing = handler;
return this;
Expand All @@ -88,7 +88,7 @@ public WebsocketHandlerBuilder OnPing(Action<IWebsocketConnection, byte[]> handl
/// Will be executed if the client sends a pong request.
/// </summary>
/// <param name="handler">The method to be executed</param>
public WebsocketHandlerBuilder OnPong(Action<IWebsocketConnection, byte[]> handler)
public WebsocketHandlerBuilder OnPong(Func<IWebsocketConnection, byte[], Task> handler)
{
_OnPong = handler;
return this;
Expand All @@ -98,7 +98,7 @@ public WebsocketHandlerBuilder OnPong(Action<IWebsocketConnection, byte[]> handl
/// Will be executed if there is some client connection error.
/// </summary>
/// <param name="handler">The method to be executed</param>
public WebsocketHandlerBuilder OnError(Action<IWebsocketConnection, Exception> handler)
public WebsocketHandlerBuilder OnError(Func<IWebsocketConnection, Exception, Task> handler)
{
_OnError = handler;
return this;
Expand Down
10 changes: 5 additions & 5 deletions Modules/Websockets/IWebsocketConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,25 @@ public interface IWebsocketConnection
/// Sends a text message to the connected client.
/// </summary>
/// <param name="message">The message to be sent</param>
Task Send(string message);
Task SendAsync(string message);

/// <summary>
/// Sends a binary message to the connected client.
/// </summary>
/// <param name="message">The message to be sent</param>
Task Send(byte[] message);
Task SendAsync(byte[] message);

/// <summary>
/// Sends a ping message to the connected client.
/// </summary>
/// <param name="message">The message to be sent</param>
Task SendPing(byte[] message);
Task SendPingAsync(byte[] message);

/// <summary>
/// Sends a pong message to the connected client.
/// </summary>
/// <param name="message">The message to be sent</param>
Task SendPong(byte[] message);
Task SendPongAsync(byte[] message);

/// <summary>
/// Gracefully closes the connection to the client.
Expand All @@ -52,7 +52,7 @@ public interface IWebsocketConnection
/// </summary>
/// <param name="code">The code to be sent to the connected client</param>
/// <remarks>
/// See Flex.WebSocketStatusCodes.
/// See <see cref="Fleck.WebSocketStatusCodes" />.
/// </remarks>
void Close(int code);

Expand Down
18 changes: 17 additions & 1 deletion Testing/Acceptance/Modules/Websockets/ErrorHandlingTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Net;
using Microsoft.VisualStudio.TestTools.UnitTesting;

using Websocket.Client;

using WS = GenHTTP.Modules.Websockets.Websocket;

Expand All @@ -19,4 +20,19 @@ public async Task TestInvalidRequest()
Assert.AreEqual(HttpStatusCode.BadRequest, response.StatusCode);
}

[TestMethod]
public async Task TestErrorHandling()
{
var server = WS.Create()
.OnOpen(_ => throw new InvalidOperationException("Ooops"));

await using var host = await TestHost.RunAsync(server);

using var client = new WebsocketClient(new Uri("ws://localhost:" + host.Port));

await client.Start();

await Task.Delay(1000);
}

}
15 changes: 7 additions & 8 deletions Testing/Acceptance/Modules/Websockets/InitializerTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using WS = GenHTTP.Modules.Websockets;

namespace GenHTTP.Testing.Acceptance.Modules.Websockets;
Expand All @@ -11,13 +10,13 @@ public class InitializerTest
public void InitializeAll()
{
WS.Websocket.Create()
.OnOpen(s => { })
.OnClose(s => { })
.OnPing((s, b) => { s.SendPong(b); })
.OnPong((s, b) => { })
.OnMessage((s, x) => { })
.OnBinary((s, x) => { })
.OnError((s, x) => { })
.OnOpen(s => Task.CompletedTask)
.OnClose(s => Task.CompletedTask)
.OnPing(async (s, b) => { await s.SendPongAsync(b); })
.OnPong((s, b) => Task.CompletedTask)
.OnMessage((s, x) => Task.CompletedTask)
.OnBinary((s, x) => Task.CompletedTask)
.OnError((s, x) => Task.CompletedTask)
.Protocol("chat");
}

Expand Down
Loading
Loading