-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathConnection.cs
More file actions
542 lines (474 loc) · 19.1 KB
/
Copy pathConnection.cs
File metadata and controls
542 lines (474 loc) · 19.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Reflection;
using System.Security.Cryptography;
using System.Threading.Tasks;
using System.Threading;
namespace WSNet2
{
class Connection
{
/// <summary>Unit (in bytes) by which the receive buffer is expanded.</summary>
/// <remarks>Ethernet frame payload size</remarks>
const int EvBufExpandSize = 1500;
/// <summary>Timeout (milliseconds) applied when sending back the Close message.</summary>
/// <remarks>Matched to the grace period before the server side calls conn.Close().</remarks>
const int SendCloseTimeout = 3000;
// Shared generator for the Authorization header value used in Connect().
static AuthDataGenerator authgen = new AuthDataGenerator();
// Pool of outgoing messages; Sender() drains it by sequence number.
public MsgPool msgPool { get; private set; }
// Connection-wide cancellation source; cancelled by Cancel().
CancellationTokenSource canceller;
// Point in time after which reconnection is considered expired; pushed forward on every received event.
DateTime reconnectLimit;
// Exception recorded once reconnectLimit has passed; a further failed attempt makes Start() give up.
Exception reconnectLimitException;
Room room;
string appId;
string clientId;
Uri uri;
string authKey;
HMAC hmac;
// Delay between pings in milliseconds (see calcPingInterval).
volatile int pingInterval;
// Timestamp of the most recently sent ping; onPong() flips it when the matching pong arrives.
volatile uint lastPingTime;
// Cancels the Pinger's current delay; replaced on every UpdatePingInterval() call.
CancellationTokenSource pingerDelayCanceller;
// Serializes Send() and SendClose() on the websocket.
SemaphoreSlim sendSemaphore;
// Set by SendClose(); once true, Send() silently drops messages.
bool closed;
// Completed by Receiver() with the running Sender/Pinger tasks when EvPeerReady is received.
TaskCompletionSource<Task> senderTaskSource;
TaskCompletionSource<Task> pingerTaskSource;
// Bounded pool of receive buffers; taken in ReceiveEvent() and handed back via ReturnEventBuffer().
BlockingCollection<byte[]> evBufPool;
// Sequence number of the last regular event accepted; sent as Wsnet2-LastEventSeq when (re)connecting.
uint evSeqNum;
Logger logger;
/// <summary>
/// Builds a connection for a room that has just been joined.
/// </summary>
/// <param name="room">Room served by this connection; its ClientDeadline seeds the reconnect limit and the ping interval.</param>
/// <param name="clientId">Client identifier sent in the handshake headers.</param>
/// <param name="hmac">HMAC handed to the message pool and used when building pings.</param>
/// <param name="joined">Join result supplying the app id, websocket URL and auth key.</param>
/// <param name="logger">Logger; may be null (every use is null-conditional).</param>
public Connection(Room room, string clientId, HMAC hmac, JoinedRoom joined, Logger logger)
{
    // Plain references handed in by the caller.
    this.logger = logger;
    this.room = room;
    this.clientId = clientId;
    this.hmac = hmac;

    // Cancellation, synchronization and bookkeeping state.
    canceller = new CancellationTokenSource();
    pingerDelayCanceller = new CancellationTokenSource();
    sendSemaphore = new SemaphoreSlim(1, 1);
    closed = false;
    evSeqNum = 0;
    reconnectLimitException = null;

    // Both the reconnect deadline and the ping cadence derive from the room deadline.
    var deadline = room.ClientDeadline;
    reconnectLimit = DateTime.Now.AddSeconds(deadline);
    pingInterval = calcPingInterval(deadline);

    // Endpoint and credentials taken from the join response.
    appId = joined.roomInfo.appId;
    uri = new Uri(joined.url);
    authKey = joined.authKey;

    // Pre-fill the bounded receive-buffer pool up to its capacity.
    var poolSize = WSNet2Settings.EvPoolSize;
    evBufPool = new BlockingCollection<byte[]>(new ConcurrentStack<byte[]>(), poolSize);
    var filled = 0;
    while (filled < WSNet2Settings.EvPoolSize)
    {
        evBufPool.Add(new byte[WSNet2Settings.EvBufInitialSize]);
        filled++;
    }

    msgPool = new MsgPool(WSNet2Settings.MsgPoolSize, WSNet2Settings.MsgBufInitialSize, hmac);
}
/// <summary>
/// Forces a disconnect by cancelling the connection-wide token.
/// </summary>
public void Cancel()
{
    canceller.Cancel();

    if (logger != null)
    {
        logger.Debug("connection canceled");
    }
}
/// <summary>
/// Opens the websocket connection and keeps it running.
/// </summary>
/// <remarks>
/// <para>
/// Reconnects automatically until the connection ends without an error
/// (NormalClosure or EndpointUnavailable from the server) or the client
/// forces a disconnect through <see cref="Cancel"/>.
/// </para>
/// <para>
/// Once the reconnect limit has passed, one more attempt is made; if that
/// attempt fails as well, the method gives up and throws.
/// </para>
/// </remarks>
/// <exception cref="AggregateException">
/// Reconnection was abandoned; carries the exception recorded when the limit
/// expired and the exception of the final attempt.
/// </exception>
public async Task Start()
{
    bool connected = false;
    int reconnection = 0;
    while (true)
    {
        Exception lastException;

        // Started before connecting so the time spent on this attempt counts towards the retry interval.
        var retryInterval = Task.Delay(WSNet2Settings.RetryIntervalMilliSec);
        if (canceller.IsCancellationRequested)
        {
            return;
        }

        // Per-attempt token source. A linked source stays registered on the parent token
        // until it is disposed, so dispose it at the end of every iteration instead of
        // accumulating one registration per reconnect.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(canceller.Token);
        // Capture the token once: the lambdas below must not touch cts.Token after disposal.
        var token = cts.Token;

        // Sender/Pinger are started inside Receiver once EvPeerReady arrives.
        // Their tasks are handed over through these sources so they can be awaited here.
        senderTaskSource = new TaskCompletionSource<Task>(TaskCreationOptions.RunContinuationsAsynchronously);
        pingerTaskSource = new TaskCompletionSource<Task>(TaskCreationOptions.RunContinuationsAsynchronously);
        var receiverTask = Task.CompletedTask;
        try
        {
            using var ws = await Connect(token);
            connected = true;
            room.ConnectionStateChanged(true);
            receiverTask = Task.Run(async () => await Receiver(ws, token));

            // The first loop to finish decides the outcome; awaiting it rethrows its exception.
            await await Task.WhenAny(
                receiverTask,
                Task.Run(async () => await await senderTaskSource.Task),
                Task.Run(async () => await await pingerTaskSource.Task));

            // finish task without exception: unreconnectable. don't retry.
            return;
        }
        catch (Exception e)
        {
            logger?.Warning(e, "connection exception: {0}", e);
            // retry
            lastException = e;
        }
        finally
        {
            // Stop everything that belongs to this attempt.
            senderTaskSource.TrySetCanceled();
            pingerTaskSource.TrySetCanceled();
            cts.Cancel();
            if (connected)
            {
                connected = false;
                room.ConnectionStateChanged(false);
            }
        }

        if (canceller.IsCancellationRequested)
        {
            return;
        }

        room.handleError(lastException);
        await retryInterval;

        try
        {
            // Wait until the receiver can no longer write reconnectLimit / reconnectLimitException.
            await receiverTask;
        }
        catch
        {
            // The receiver's failure (if any) has already been handled through lastException.
        }

        if (reconnectLimitException != null)
        {
            // The attempt following the expiry of the reconnect limit failed too: give up.
            throw new AggregateException("Gave up on Reconnection", reconnectLimitException, lastException);
        }

        if (DateTime.Now > reconnectLimit)
        {
            // The limit has passed, but try one more time.
            reconnectLimitException = lastException;
        }

        reconnection++;
        logger?.Info("reconnect now:{0}, limit:{1}, count:{2}", DateTime.Now, reconnectLimit, reconnection);
    }
}
/// <summary>
/// Hands an event's buffer back to the receive pool once the event passed
/// to Room.handleEvent() has been fully processed.
/// </summary>
/// <param name="ev">Event whose buffer is returned; events without a buffer are ignored.</param>
public void ReturnEventBuffer(Event ev)
{
    var buffer = ev.BufferArray;
    if (buffer == null)
    {
        return;
    }

    evBufPool.Add(buffer);
}
/// <summary>
/// Recomputes the ping interval from a new deadline and wakes the pinger
/// so the new interval takes effect immediately.
/// </summary>
/// <param name="deadline">Deadline in seconds.</param>
public void UpdatePingInterval(uint deadline)
{
    // Keep the old source so its pending delay can be cancelled after the swap.
    var previousDelayCanceller = pingerDelayCanceller;

    pingInterval = calcPingInterval(deadline);
    pingerDelayCanceller = new CancellationTokenSource();

    previousDelayCanceller.Cancel();
}
/// <summary>
/// Opens a websocket connection to the room endpoint.
/// </summary>
/// <param name="ct">Token that aborts the connection attempt.</param>
/// <returns>The connected socket; the caller owns it and must dispose it.</returns>
/// <remarks>
/// The attempt is additionally bounded by WSNet2Settings.ConnectTimeoutMilliSec.
/// If the handshake fails or times out the socket is disposed before the
/// exception is propagated.
/// </remarks>
private async Task<ClientWebSocket> Connect(CancellationToken ct)
{
    var ws = new ClientWebSocket();
    try
    {
        var authdata = authgen.GenerateBearer(authKey, clientId);
        ws.Options.SetRequestHeader("Authorization", authdata);
        ws.Options.SetRequestHeader("Wsnet2-App", appId);
        ws.Options.SetRequestHeader("Wsnet2-User", clientId);
        // Tells the server which event was received last so it can resume from there.
        ws.Options.SetRequestHeader("Wsnet2-LastEventSeq", evSeqNum.ToString());
        ws.Options.AddSubProtocol("wsnet2");

        logger?.Info("connecting to {0}", uri);

        // The timeout source is only needed for the handshake; dispose it right after
        // so its timer and its registration on ct do not linger.
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(ct))
        {
            cts.CancelAfter(WSNet2Settings.ConnectTimeoutMilliSec);
            NetworkInformer.OnRoomConnectRequest(room, uri.AbsoluteUri);
            await ws.ConnectAsync(uri, cts.Token);
        }

        SetTcpNoDelay(ws);
        logger?.Info("connected");
        return ws;
    }
    catch
    {
        // Nobody else holds a reference yet: release the socket instead of leaking it.
        ws.Dispose();
        throw;
    }
}
/// <summary>
/// Event receive loop.
/// </summary>
/// <param name="ws">Connected websocket to read from.</param>
/// <param name="ct">Token that stops the loop; cancellation ends the loop without an exception.</param>
/// <remarks>
/// Returns normally when an EvClosed is received or <paramref name="ct"/> is cancelled.
/// Any other failure (including an out-of-order regular event) propagates as an exception.
/// </remarks>
private async Task Receiver(ClientWebSocket ws, CancellationToken ct)
{
    try
    {
        while (true)
        {
            ct.ThrowIfCancellationRequested();
            var ev = await ReceiveEvent(ws, ct);

            // An event arrived: push the reconnect deadline forward and clear the give-up marker.
            this.reconnectLimit = DateTime.Now.AddSeconds(room.ClientDeadline);
            this.reconnectLimitException = null;

            if (ev.IsRegular)
            {
                // Regular events must arrive strictly in sequence.
                if (ev.SequenceNum != evSeqNum + 1)
                {
                    evBufPool.Add(ev.BufferArray);
                    throw new Exception($"invalid event sequence number: {ev.SequenceNum} wants {evSeqNum + 1}");
                }
                evSeqNum++;
            }

            switch (ev.Type)
            {
                case EvType.Closed:
                    // Normal termination; no more reconnection.
                    room.handleEvent(ev);
                    return;

                case EvType.PeerReady:
                    var evpr = (EvPeerReady)ev;
                    // Read everything needed from the event before giving its buffer back.
                    var lastMsgSeqNum = evpr.LastMsgSeqNum;
                    // This event is consumed here and never reaches room.handleEvent(), so nothing
                    // else returns its buffer; without this the pool loses one buffer per
                    // (re)connection. ReturnEventBuffer ignores events that carry no buffer.
                    ReturnEventBuffer(ev);

                    logger?.Info("receive peer-ready: lastMsgSeqNum={0}", lastMsgSeqNum);
                    var sender = Task.Run(async () => await Sender(ws, lastMsgSeqNum + 1, ct));
                    var pinger = Task.Run(async () => await Pinger(ws, ct));
                    // Hand the running loops over to Start(), which awaits them.
                    senderTaskSource.TrySetResult(sender);
                    pingerTaskSource.TrySetResult(pinger);
                    break;

                case EvType.Pong:
                    onPong(ev as EvPong);
                    room.handleEvent(ev);
                    break;

                default:
                    room.handleEvent(ev);
                    break;
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Cancellation of ct simply ends the loop.
    }
}
/// <summary>
/// Receives one complete websocket message and parses it into an Event.
/// </summary>
/// <param name="ws">Connected websocket to read from.</param>
/// <param name="ct">Token that aborts waiting for a buffer or for data.</param>
/// <returns>
/// The parsed event, or an EvClosed when the server closed with NormalClosure
/// or EndpointUnavailable.
/// </returns>
/// <remarks>
/// A buffer is taken from the pool for the duration of the call. On success the
/// parsed event keeps using it; on a close frame or on any exception it is put
/// back into the pool here.
/// </remarks>
private async Task<Event> ReceiveEvent(ClientWebSocket ws, CancellationToken ct)
{
    var buf = evBufPool.Take(ct);
    try
    {
        var pos = 0;
        while (true)
        {
            var seg = new ArraySegment<byte>(buf, pos, buf.Length - pos);
            var ret = await ws.ReceiveAsync(seg, ct);
            if (ret.MessageType == WebSocketMessageType.Close)
            {
                // On iOS ws.CloseAsync() very rarely never returns, so run it on a separate task.
                // It may keep holding the send semaphore, but this connection is ending anyway.
                // Deliberately fire-and-forget.
                _ = Task.Run(async () => await SendClose(ws, ret.CloseStatusDescription, ct));

                switch (ret.CloseStatus)
                {
                    case WebSocketCloseStatus.NormalClosure:
                    case WebSocketCloseStatus.EndpointUnavailable: // server: CloseGoingAway
                        // unreconnectable states.
                        evBufPool.Add(buf);
                        return new EvClosed(ret.CloseStatusDescription);
                    default:
                        // CloseStatus is nullable: format it directly so a close frame without
                        // a status still reports the description instead of failing on .Value.
                        throw new Exception("ws status:(" + ret.CloseStatus + ") " + ret.CloseStatusDescription);
                }
            }

            pos += ret.Count;
            if (ret.EndOfMessage)
            {
                break;
            }

            // When little free space is left, grow the buffer and keep receiving.
            if (buf.Length - pos < EvBufExpandSize)
            {
                var expandSize = (buf.Length < EvBufExpandSize) ? buf.Length : EvBufExpandSize;
                Array.Resize(ref buf, buf.Length + expandSize);
            }
        }

        var ev = Event.Parse(new ArraySegment<byte>(buf, 0, pos));
        NetworkInformer.OnRoomReceive(room, pos, ev);
        return ev;
    }
    catch (Exception)
    {
        // Give the (possibly resized) buffer back before propagating the failure.
        evBufPool.Add(buf);
        throw;
    }
}
/// <summary>
/// Message send loop: drains the message pool in sequence order.
/// </summary>
/// <param name="ws">Connected websocket to write to.</param>
/// <param name="seqNum">Sequence number of the first message to send.</param>
/// <param name="ct">Token that stops the loop.</param>
private async Task Sender(ClientWebSocket ws, int seqNum, CancellationToken ct)
{
    for (;;)
    {
        // NOTE(review): presumably blocks until the pool has something to send - confirm in MsgPool.
        msgPool.Wait(ct);

        // Send every message that is available starting from seqNum.
        for (ArraySegment<byte>? pending = msgPool.Take(seqNum);
             pending.HasValue;
             pending = msgPool.Take(seqNum))
        {
            if (ct.IsCancellationRequested)
            {
                // Cancelled: stop without sending the message just taken.
                return;
            }

            await Send(ws, pending.Value, ct);
            seqNum++;
        }
    }
}
/// <summary>
/// Ping send loop: sends a ping every pingInterval milliseconds and fails
/// when the matching pong has not arrived by the end of the interval.
/// </summary>
/// <param name="ws">Connected websocket to write to.</param>
/// <param name="ct">Token that stops the loop.</param>
private async Task Pinger(ClientWebSocket ws, CancellationToken ct)
{
    var ping = new MsgPing(hmac);

    // Leaving the loop (ct cancelled) ends the task normally.
    while (!ct.IsCancellationRequested)
    {
        // The delay starts before sending so the send time is part of the interval.
        var wait = Task.Delay(pingInterval, pingerDelayCanceller.Token);

        var sentAt = (uint)ping.SetTimestamp();
        lastPingTime = sentAt;
        await Send(ws, ping.Value, ct);

        try
        {
            await wait;
        }
        catch (TaskCanceledException)
        {
            // Cancelled through pingerDelayCanceller (interval updated): just ping again.
            continue;
        }

        // onPong() rewrites lastPingTime when the matching pong is received.
        if (lastPingTime == sentAt)
        {
            throw new Exception("Pong unreceived");
        }
    }
}
/// <summary>
/// Sends one binary websocket message, serialized with other senders.
/// </summary>
/// <param name="ws">Connected websocket to write to.</param>
/// <param name="msg">Payload to send as a single complete message.</param>
/// <param name="ct">Token that aborts waiting for the semaphore or the send.</param>
/// <remarks>Messages are silently dropped once the close handshake has started.</remarks>
private async Task Send(ClientWebSocket ws, ArraySegment<byte> msg, CancellationToken ct)
{
    // Checked up front because SendClose may keep holding the semaphore forever.
    // A caller that slips past this check and then waits on the semaphore is out of luck.
    if (closed)
    {
        return;
    }

    await sendSemaphore.WaitAsync(ct);
    try
    {
        // Re-check under the semaphore: SendClose may have run while we were waiting.
        if (!closed)
        {
            NetworkInformer.OnRoomSend(room, msg);
            await ws.SendAsync(msg, WebSocketMessageType.Binary, endOfMessage: true, cancellationToken: ct);
        }
    }
    finally
    {
        sendSemaphore.Release();
    }
}
/// <summary>
/// Answers the server's Close frame and marks the connection as closed.
/// </summary>
/// <param name="ws">Websocket to close.</param>
/// <param name="msg">Close description echoed back to the server.</param>
/// <param name="ct">Token that aborts waiting for the semaphore or the close handshake.</param>
/// <remarks>
/// Runs at most once per connection object: later calls return immediately.
/// The handshake itself is bounded by SendCloseTimeout milliseconds.
/// </remarks>
private async Task SendClose(ClientWebSocket ws, string msg, CancellationToken ct)
{
    await sendSemaphore.WaitAsync(ct);
    try
    {
        if (closed)
        {
            return;
        }
        // From here on Send() drops messages instead of writing to a closing socket.
        closed = true;

        // Dispose the timeout source once the handshake is over so its timer and its
        // registration on ct are released.
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(ct))
        {
            cts.CancelAfter(SendCloseTimeout);
            await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, msg, cts.Token);
        }
    }
    finally
    {
        sendSemaphore.Release();
    }
}
/// <summary>
/// Handles a received pong.
/// </summary>
/// <remarks>
/// <para>
/// When the pong answers the ping recorded in lastPingTime, lastPingTime is
/// changed to a different value (all bits inverted) so the pinger can tell
/// that the pong arrived.
/// </para>
/// </remarks>
/// <param name="ev">Pong event carrying the timestamp of the ping it answers.</param>
private void onPong(EvPong ev)
{
    var echoed = (uint)ev.PingTimestamp;
    if (lastPingTime != echoed)
    {
        // Pong for some other ping: nothing to acknowledge.
        return;
    }

    lastPingTime = ~lastPingTime;
}
/// <summary>
/// Computes the ping interval (milliseconds) from the deadline (seconds)
/// announced by the server.
/// </summary>
/// <remarks>
/// <para>
/// In the old wsnet the deadline was normally 25 seconds and the ping interval was
/// fixed at 5 seconds, so deadline/5 is used as the base value and then clamped to
/// the configured minimum and maximum intervals.
/// </para>
/// </remarks>
/// <param name="deadline">Deadline in seconds.</param>
/// <returns>
/// deadline/5 in milliseconds, limited to the range
/// [WSNet2Settings.MinPingIntervalMilliSec, WSNet2Settings.MaxPingIntervalMilliSec].
/// </returns>
private int calcPingInterval(uint deadline)
{
    // Use 64-bit arithmetic: (int)deadline * 1000 overflows 32 bits for large deadlines
    // and would wrap to a negative value that clamps to the minimum instead of the maximum.
    long ms = (long)deadline * 1000 / 5;

    if (ms < WSNet2Settings.MinPingIntervalMilliSec)
    {
        return WSNet2Settings.MinPingIntervalMilliSec;
    }

    if (ms > WSNet2Settings.MaxPingIntervalMilliSec)
    {
        return WSNet2Settings.MaxPingIntervalMilliSec;
    }

    // Within the configured bounds, hence within int range.
    return (int)ms;
}
/// <summary>
/// Enables TCP_NODELAY on the socket underneath the websocket.
/// </summary>
/// <remarks>
/// Needed because it is disabled by default in Unity's C# (.NET Framework/.NET Standard 2.1).
/// ClientWebSocket offers no way to reach the TCP Socket, so reflection over private
/// fields is used. The step is best-effort: if a field in the chain is missing or null
/// a warning is logged and the socket is left unchanged. Calls are compiled in only
/// when NET_4_6 or NET_STANDARD_2_0 is defined.
/// </remarks>
/// <param name="ws">Connected websocket whose underlying socket is adjusted.</param>
[Conditional("NET_4_6"), Conditional("NET_STANDARD_2_0")]
private void SetTcpNoDelay(ClientWebSocket ws)
{
    const BindingFlags PrivateInstance = BindingFlags.NonPublic | BindingFlags.Instance;

    // Private fields to follow, outermost first, from ClientWebSocket down to the Socket.
    string[] path =
    {
        "_innerWebSocket", // System.Net.WebSockets.WebSocketHandle
        "_webSocket",      // System.Net.WebSockets.ManagedWebSocket
        "_stream",         // System.Net.Sockets.NetworkStream
        "_streamSocket",   // System.Net.Sockets.Socket
    };

    object current = ws;
    for (var i = 0; i < path.Length; i++)
    {
        var owner = current.GetType();
        var info = owner.GetField(path[i], PrivateInstance);
        if (info == null)
        {
            logger?.Warning("Field not found: {0}.{1}", owner, path[i]);
            return;
        }

        current = info.GetValue(current);
        if (current == null)
        {
            logger?.Warning("Field value is null: {0}.{1}", owner, path[i]);
            return;
        }
    }

    if (!(current is Socket socket))
    {
        logger?.Warning("object is not a Socket: {0}", current.GetType());
        return;
    }

    socket.NoDelay = true;
    logger?.Info("TCP_NODELAY = {0}", socket.NoDelay);
}
}
}