-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWebSocketHandlerMiddleware.cs
More file actions
79 lines (66 loc) · 2.86 KB
/
WebSocketHandlerMiddleware.cs
File metadata and controls
79 lines (66 loc) · 2.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json.Serialization;
namespace Amega.BinanceService.Webapi
{
public class WebSocketHandlerMiddleware
{
private static readonly ConcurrentDictionary<string, WebSocket> WebSockets = new ConcurrentDictionary<string, WebSocket>();
private readonly RequestDelegate _next;
private readonly ILogger<WebSocketHandlerMiddleware> _logger;
public WebSocketHandlerMiddleware(RequestDelegate next, ILogger<WebSocketHandlerMiddleware> logger)
{
_next = next;
_logger = logger;
}
public async Task InvokeAsync(HttpContext context)
{
try
{
if (context.WebSockets.IsWebSocketRequest)
{
var webSocket = await context.WebSockets.AcceptWebSocketAsync();
var receivedBytes = new ArraySegment<byte>(new byte[1024]);
var result = await webSocket.ReceiveAsync(receivedBytes, CancellationToken.None);
var instrument = System.Text.Json.JsonSerializer.Deserialize<SubscriptionRequest>(Encoding.UTF8.GetString(receivedBytes.Array, 0, result.Count));
var socketId = Guid.NewGuid().ToString();
WebSockets[socketId] = webSocket;
await ListenWebSocketAsync(socketId, webSocket, instrument);
}
}
catch (Exception ex)
{
_logger.LogError($"Middleware error: '{ex.Message}'.");
}
finally
{
await _next.Invoke(context);
}
}
private async Task ListenWebSocketAsync(string socketId, WebSocket webSocket, SubscriptionRequest instrument)
{
var cancellationToken = new CancellationToken();
while (webSocket.State == WebSocketState.Open)
{
if (BinanceDataProvider.Instance.CurrentReceivedMessage is null) continue;
await Task.Delay(800);
var currency = await BinanceDataProvider.Instance.GetCurrentCurrencyByName(instrument.Instrument);
var updateMessage = Encoding.UTF8.GetBytes(currency.SourceMessage);
await webSocket.SendAsync(new ArraySegment<byte>(updateMessage), WebSocketMessageType.Text, true, cancellationToken);
}
}
private class SubscriptionRequest
{
[JsonPropertyName("instrument")]
public string Instrument { get; set; }
}
}
public static class WebSocketHandlerMiddlewareExtensions
{
public static IApplicationBuilder UseWebSocketHandler(this IApplicationBuilder builder)
{
return builder.UseMiddleware<WebSocketHandlerMiddleware>();
}
}
}