-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathWebSocketClient.swift
More file actions
456 lines (396 loc) · 13.9 KB
/
WebSocketClient.swift
File metadata and controls
456 lines (396 loc) · 13.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
import Foundation
import NIO
import NIOHTTP1
import NIOWebSocket
import Dispatch
import NIOFoundationCompat
import NIOSSL
/// Label of the serial queue that guards the stored callback closures.
public let WEBSOCKET_LOCKER_QUEUE: String = "SyncLocker"

/// Label of the serial queue that guards access to the event loop group.
public let WEBSOCKET_THREAD_QUEUE: String = "ThreadLocker"

/// Label of the serial queue that guards access to the active channel.
public let WEBSOCKET_CHANNEL_QUEUE: String = "ChannelLocker"
/// Creates and manages connections to a WebSocket server.
///
/// Creates a connection to the remote host, upgrades it to a WebSocket and allows setting
/// callbacks (or a delegate) for connection events and for messages sent by the server.
///
/// Access to the channel, the event loop group and the stored callbacks is serialized
/// through private dispatch queues.
public class WebSocketClient {

    // MARK: - Properties

    let host: String
    let port: Int
    let uri: String
    let query: String
    let headers: HTTPHeaders
    let frameKey: String

    /// Maximum frame size configured for this client.
    ///
    /// NOTE(review): the value is only stored here and is not consulted anywhere in this class;
    /// its unit (bytes vs. power-of-two exponent) should be confirmed against the frame handlers.
    public private(set) var maxFrameSize: Int

    var tlsEnabled: Bool = false

    /// Set to `true` as soon as this client attempts to send a close frame.
    var closeSent: Bool = false

    private let locker = DispatchQueue(label: WEBSOCKET_LOCKER_QUEUE, qos: .background)
    private let channelQueue = DispatchQueue(label: WEBSOCKET_CHANNEL_QUEUE)
    private let threadGroupQueue = DispatchQueue(label: WEBSOCKET_THREAD_QUEUE)

    /// The upgraded channel, or `nil` while no WebSocket upgrade has completed.
    var channel: Channel? {
        get { channelQueue.sync { _channel } }
        set { channelQueue.sync { _channel = newValue } }
    }
    private var _channel: Channel? = nil

    /// The event loop group driving the connection. It is created asynchronously by the
    /// initializers, so it may still be `nil` right after construction.
    var threadGroup: MultiThreadedEventLoopGroup? {
        get { threadGroupQueue.sync { _threadGroup } }
        set { threadGroupQueue.sync { _threadGroup = newValue } }
    }
    private var _threadGroup: MultiThreadedEventLoopGroup?

    weak var delegate: WebSocketClientDelegate? = nil

    /// Is this client currently connected to a WebSocket
    public var isConnected: Bool {
        channel?.isActive ?? false
    }

    // MARK: - Stored callbacks

    private var _openCallback: (Channel) -> Void = { _ in }
    var onOpen: (Channel) -> Void {
        get { locker.sync { _openCallback } }
        set { locker.sync { _openCallback = newValue } }
    }

    private var _closeCallback: (Channel, Data) -> Void = { _, _ in }
    var onClose: (Channel, Data) -> Void {
        get { locker.sync { _closeCallback } }
        set { locker.sync { _closeCallback = newValue } }
    }

    private var _textCallback: (String) -> Void = { _ in }
    var onTextMessage: (String) -> Void {
        get { locker.sync { _textCallback } }
        set { locker.sync { _textCallback = newValue } }
    }

    private var _binaryCallback: (Data) -> Void = { _ in }
    var onBinaryMessage: (Data) -> Void {
        get { locker.sync { _binaryCallback } }
        set { locker.sync { _binaryCallback = newValue } }
    }

    private var _errorCallBack: (Swift.Error?, HTTPResponseStatus?) -> Void = { _, _ in }
    var onError: (Swift.Error?, HTTPResponseStatus?) -> Void {
        get { locker.sync { _errorCallBack } }
        set { locker.sync { _errorCallBack = newValue } }
    }

    // MARK: - Callback setters

    /// Set a callback to be fired when a WebSocket connection is opened.
    ///
    /// - parameters:
    ///   - callback: Callback to fire when a WebSocket connection is opened
    public func onOpen(_ callback: @escaping (Channel) -> Void) {
        onOpen = callback
    }

    /// Set a callback to be fired when a WebSocket text message is received.
    ///
    /// - parameters:
    ///   - callback: Callback to fire when a WebSocket text message is received
    public func onMessage(_ callback: @escaping (String) -> Void) {
        onTextMessage = callback
    }

    /// Set a callback to be fired when a WebSocket binary message is received.
    ///
    /// - parameters:
    ///   - callback: Callback to fire when a WebSocket binary message is received
    public func onMessage(_ callback: @escaping (Data) -> Void) {
        onBinaryMessage = callback
    }

    /// Set a callback to be fired when a WebSocket close message is received.
    ///
    /// - parameters:
    ///   - callback: Callback to fire when a WebSocket close message is received
    public func onClose(_ callback: @escaping (Channel, Data) -> Void) {
        onClose = callback
    }

    /// Set a callback to be fired when a WebSocket error occurs.
    ///
    /// - parameters:
    ///   - callback: Callback to fire when a WebSocket error occurs
    public func onError(_ callback: @escaping (Swift.Error?, HTTPResponseStatus?) -> Void) {
        onError = callback
    }

    // MARK: - Constructors

    /// Create a new `WebSocketClient`.
    ///
    /// - parameters:
    ///   - host: Host name of the remote server
    ///   - port: Port number on which the remote server is listening
    ///   - uri: The "Request-URI" of the GET method, it is used to identify the endpoint of the WebSocket connection
    ///   - query: Query string belonging to the request URI
    ///   - frameKey: The key sent by client which server has to include while building its response.
    ///   - headers: Headers handed to the HTTP handler that performs the upgrade request
    ///   - maxFrameSize: Maximum allowable frame size of WebSocket client is configured using this parameter.
    ///   - tlsEnabled: Is TLS enabled for this client.
    ///   - delegate: Delegate to handle message and error callbacks
    public init?(
        host: String,
        port: Int,
        uri: String,
        query: String,
        frameKey: String,
        headers: HTTPHeaders = HTTPHeaders(),
        maxFrameSize: Int = 14,
        tlsEnabled: Bool = true,
        delegate: WebSocketClientDelegate? = nil
    ) {
        self.host = host
        self.port = port
        self.uri = uri
        self.query = query
        self.headers = headers
        self.frameKey = frameKey
        self.maxFrameSize = maxFrameSize
        self.tlsEnabled = tlsEnabled
        self.delegate = delegate
        startThreadGroup()
    }

    /// Create a new `WebSocketClient`.
    ///
    /// Returns `nil` when `url` cannot be parsed as a URL. Components missing from the URL
    /// fall back to `localhost`, port 443 (TLS) or 80 (plain), and the path `/`.
    ///
    /// - parameters:
    ///   - url: The absolute URL of the GET method used to identify the WebSocket endpoint
    ///   - tlsEnabled: Is TLS enabled for this client.
    ///   - headers: Headers handed to the HTTP handler that performs the upgrade request
    ///   - delegate: Delegate to handle message and error callbacks.
    public init?(
        _ url: String,
        tlsEnabled: Bool = true,
        headers: HTTPHeaders = HTTPHeaders(),
        delegate: WebSocketClientDelegate? = nil
    ) {
        // An unparsable URL used to fall back silently to localhost; fail the initializer instead.
        guard let rawUrl = URL(string: url) else {
            return nil
        }
        let path = rawUrl.path

        self.frameKey = WebSocketClient.makeRequestKey()
        self.host = rawUrl.host ?? "localhost"
        self.port = rawUrl.port ?? (tlsEnabled ? 443 : 80)
        // `URL.path` is empty for URLs such as "wss://example.com"; an empty request URI is invalid.
        self.uri = path.isEmpty ? "/" : path
        self.query = rawUrl.query ?? ""
        self.headers = headers
        self.maxFrameSize = 24
        self.tlsEnabled = tlsEnabled
        self.delegate = delegate
        startThreadGroup()
    }

    deinit {
        // Release the event loop thread. The callback variant never blocks, so this is safe
        // even if the last reference is dropped on the event loop itself.
        threadGroup?.shutdownGracefully { _ in }
    }

    /// Generates a fresh `Sec-WebSocket-Key`: 16 random bytes, base64 encoded.
    private static func makeRequestKey() -> String {
        let nonce = (0..<16).map { _ in UInt8.random(in: UInt8.min...UInt8.max) }
        return Data(nonce).base64EncodedString()
    }

    /// Creates the event loop group on a background queue so the caller's thread never waits on it.
    private func startThreadGroup() {
        // `self` is captured strongly on purpose: the client stays alive until the group has
        // been stored, so `deinit` always sees it and can shut it down.
        DispatchQueue.global(qos: .background).async {
            self.threadGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
        }
    }

    /// Suspends until the event loop group created by the initializer is available.
    ///
    /// - throws: `CancellationError` if the surrounding task is cancelled while waiting.
    private func waitForThreadGroup() async throws -> MultiThreadedEventLoopGroup {
        while true {
            if let group = threadGroup {
                return group
            }
            try await Task.sleep(nanoseconds: 10_000_000)
        }
    }

    // MARK: - Open connection

    /// Open a connection to the configured host and attempt to upgrade the connection to a WebSocket.
    /// If successful the `onOpen` callback will fire, otherwise a connection error will be thrown from here.
    ///
    /// - throws: The error reported while connecting or initializing the channel (including TLS
    ///   set-up failures), or `CancellationError` if the task is cancelled while waiting for the
    ///   event loop group.
    public func connect() async throws {
        let socketOptions = ChannelOptions.socket(
            SocketOptionLevel(SOL_SOCKET),
            SO_REUSEPORT
        )
        let group = try await waitForThreadGroup()

        let bootstrap = ClientBootstrap(group: group)
            .channelOption(socketOptions, value: 1)
            .channelInitializer {
                self.openChannel(channel: $0)
            }

        _ = try await bootstrap
            .connect(host: self.host, port: self.port)
            .get()
    }

    /// Installs the HTTP client handlers with the WebSocket upgrader, the project's HTTP handler
    /// and, when TLS is enabled, the TLS handler at the front of the pipeline.
    private func openChannel(channel: Channel) -> EventLoopFuture<Void> {
        let httpHandler = HTTPHandler(client: self, headers: headers)

        let basicUpgrader = NIOWebSocketClientUpgrader(
            requestKey: self.frameKey,
            upgradePipelineHandler: { channel, response in
                self.upgradePipelineHandler(channel: channel, response: response)
            }
        )

        let config: NIOHTTPClientUpgradeConfiguration = (upgraders: [basicUpgrader], completionHandler: { context in
            // The HTTP handler is only needed for the handshake.
            context.channel.pipeline.removeHandler(httpHandler, promise: nil)
        })

        return channel.pipeline.addHTTPClientHandlers(withClientUpgrade: config).flatMap { _ in
            channel.pipeline.addHandler(httpHandler)
        }.flatMap { _ in
            self.addTLSHandlerIfNeeded(to: channel)
        }
    }

    /// Adds the TLS client handler as the first handler when TLS is enabled.
    ///
    /// Failures while building the TLS context or handler are returned as a failed future, so
    /// they surface from `connect()` instead of crashing the process.
    private func addTLSHandlerIfNeeded(to channel: Channel) -> EventLoopFuture<Void> {
        guard tlsEnabled else {
            return channel.eventLoop.makeSucceededFuture(())
        }
        do {
            let tlsConfig = TLSConfiguration.makeClientConfiguration()
            let sslContext = try NIOSSLContext(configuration: tlsConfig)
            let sslHandler = try NIOSSLClientHandler(context: sslContext, serverHostname: host)
            return channel.pipeline.addHandler(sslHandler, position: .first)
        } catch {
            return channel.eventLoop.makeFailedFuture(error)
        }
    }

    /// Called by the upgrader once the server answered the upgrade request: stores the channel,
    /// installs the message handler and then notifies the delegate (or the `onOpen` callback).
    private func upgradePipelineHandler(channel: Channel, response: HTTPResponseHead) -> EventLoopFuture<Void> {
        let handler = MessageHandler(client: self)

        if response.status == .switchingProtocols {
            self.channel = channel
        }

        return channel.pipeline.addHandler(handler).map {
            if let delegate = self.delegate {
                delegate.onOpen(channel: channel)
            } else {
                self.onOpen(channel)
            }
        }
    }

    // MARK: - Close connection

    /// Closes the connection
    ///
    /// Sends a close frame carrying `data` and then closes the channel.
    ///
    /// - parameters:
    ///   - data: Close frame payload
    ///   - promise: Completed with the outcome of closing the channel
    public func close(
        data: Data = Data(),
        promise: EventLoopPromise<Void>? = nil
    ) {
        send(
            data: data,
            opcode: .connectionClose,
            finalFrame: true,
            promise: promise
        )
    }

    // MARK: - Send data

    /// Sends binary-formatted data to the connected server in a single frame.
    ///
    /// - parameters:
    ///   - data: Raw data to be sent in the frame
    ///   - opcode: Websocket opcode indicating type of the frame
    ///   - finalFrame: Whether the frame to be sent is the last one
    ///   - promise: Completed once the frame has been written
    public func send(
        data: Data,
        opcode: WebSocketOpcode,
        finalFrame: Bool = true,
        promise: EventLoopPromise<Void>? = nil
    ) {
        var buffer = ByteBufferAllocator()
            .buffer(capacity: data.count)
        buffer.writeBytes(data)

        send(
            data: buffer,
            opcode: opcode,
            finalFrame: finalFrame,
            promise: promise
        )
    }

    /// Sends text-formatted data to the connected server in a single frame.
    ///
    /// - parameters:
    ///   - text: Raw text to be sent in the frame
    ///   - opcode: Websocket opcode indicating type of the frame
    ///   - finalFrame: Whether the frame to be sent is the last one
    ///   - promise: Completed once the frame has been written
    public func send(
        text: String,
        opcode: WebSocketOpcode = .text,
        finalFrame: Bool = true,
        promise: EventLoopPromise<Void>? = nil
    ) {
        // Size the buffer by encoded bytes; `text.count` counts characters, not UTF-8 bytes.
        var buffer = ByteBufferAllocator()
            .buffer(capacity: text.utf8.count)
        buffer.writeString(text)

        send(
            data: buffer,
            opcode: opcode,
            finalFrame: finalFrame,
            promise: promise
        )
    }

    /// Sends the JSON representation of the given model to the connected server in a single frame.
    ///
    /// If encoding fails the error is printed and `promise` is failed with it; nothing is sent.
    ///
    /// - parameters:
    ///   - model: The model to encode and send
    ///   - opcode: Websocket opcode indicating type of the frame
    ///   - finalFrame: Whether the frame to be sent is the last one
    ///   - promise: Completed once the frame has been written
    public func send<T: Codable>(
        model: T,
        opcode: WebSocketOpcode = .text,
        finalFrame: Bool = true,
        promise: EventLoopPromise<Void>? = nil
    ) {
        do {
            // JSONEncoder produces UTF-8, so the bytes can be written as-is.
            let jsonData = try JSONEncoder().encode(model)
            send(
                data: jsonData,
                opcode: opcode,
                finalFrame: finalFrame,
                promise: promise
            )
        } catch {
            print(error)
            promise?.fail(error)
        }
    }

    /// Sends buffered bytes to the connected server in a single frame.
    ///
    /// Non-final frames are written without flushing; final frames are written and flushed.
    /// A close frame is always flushed and the channel is closed once that write has finished;
    /// in that case `promise` reports the outcome of closing the channel.
    /// When there is no open channel, `promise` is failed with `ChannelError.ioOnClosedChannel`.
    ///
    /// - parameters:
    ///   - data: The bytes to be sent in the frame
    ///   - opcode: Websocket opcode indicating type of the frame
    ///   - finalFrame: Whether the frame to be sent is the last one
    ///   - promise: Completed once the frame has been written (or, for close frames, once the channel is closed)
    public func send(
        data: ByteBuffer,
        opcode: WebSocketOpcode,
        finalFrame: Bool,
        promise: EventLoopPromise<Void>? = nil
    ) {
        let isCloseFrame = opcode == .connectionClose
        if isCloseFrame {
            // Every send path funnels through here, so the flag is recorded in one place.
            closeSent = true
        }

        guard let channel = channel else {
            promise?.fail(ChannelError.ioOnClosedChannel)
            return
        }

        // NOTE(review): frames are sent unmasked (`maskKey: nil`); RFC 6455 expects client
        // frames to be masked - confirm whether a later handler or the server relies on this.
        let frame = WebSocketFrame(
            fin: finalFrame,
            opcode: opcode,
            maskKey: nil,
            data: data
        )

        if isCloseFrame {
            // Hand `promise` to exactly one operation: write the frame first, then close.
            let frameWritten = channel.eventLoop.makePromise(of: Void.self)
            channel.writeAndFlush(frame, promise: frameWritten)
            frameWritten.futureResult.whenComplete { _ in
                channel.close(mode: .all, promise: promise)
            }
            return
        }

        if finalFrame {
            channel.writeAndFlush(frame, promise: promise)
        } else {
            channel.write(frame, promise: promise)
        }
    }
}