-
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathconnection.rb
More file actions
316 lines (257 loc) · 9.29 KB
/
Copy pathconnection.rb
File metadata and controls
316 lines (257 loc) · 9.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# frozen_string_literal: true
# Released under the MIT License.
# Copyright, 2019-2024, by Samuel Williams.
# Copyright, 2019, by William T. Nelson.
# Copyright, 2021, by Aurora Nockert.
require_relative "framer"
module Protocol
module WebSocket
# Wraps a framer and implements connection-specific interactions like reading and writing text.
class Connection
# Set up a connection in the open state with no buffered frames. The connection itself acts as both the default reader and the default writer.
# @parameter framer [Framer] The framer used for reading and writing frames.
# @parameter mask [String | Nil] Optional 4-byte mask to be used for frames generated by this connection.
# @parameter options [Hash] Extra keyword options; accepted but not used here.
def initialize(framer, mask: nil, **options)
  @framer, @mask = framer, mask
  @reader = @writer = self
  @reserved = Frame::RESERVED
  @frames = []
  @state = :open
end
# @attribute [Framer] The framer through which frames are read and written.
attr_reader :framer
# @attribute [String | Boolean | Nil] The optional mask applied to frames generated by this connection.
attr_reader :mask
# @attribute [Integer] The reserved bits as currently tracked by this connection; see {#reserve!}.
attr_reader :reserved
# @attribute [Array(Frame)] Frames buffered so far which together form part of a complete message.
attr_accessor :frames
# @attribute [Object] The reader used to unpack frames into messages (must respond to `unpack_frames`).
attr_accessor :reader
# @attribute [Object] The writer used to pack messages into frames (must respond to `pack_text_frame` and `pack_binary_frame`).
attr_accessor :writer
# Claim one of the reserved flag bits on behalf of an extension.
# Once claimed, the bit is cleared from the connection's reserved mask, so claiming it again fails.
# @parameter bit [Integer] The bit to reserve, see {Frame::RESERVED} for more details.
# @returns [Boolean] Always true when the bit was successfully claimed.
# @raises [ArgumentError] If the bit is not present in the current reserved mask.
def reserve!(bit)
  available = @reserved & bit
  raise ArgumentError, "Unable to use #{bit}!" if available.zero?

  @reserved &= ~bit
  true
end
# Ask the underlying framer to flush, so that any data it has buffered is written out to the connection.
# @returns [Object] Whatever the framer's `flush` returns.
def flush
  return @framer.flush
end
# Put the connection into the open state, which is also the state of a newly created connection.
# @returns [Connection] The connection itself, to allow chaining.
def open!
  @state = :open
  self
end
# Move the connection into the closed state and attempt to send a close frame, unless it is already closed.
# All arguments are forwarded to {#send_close}. Sending is best effort: any error raised while sending is swallowed.
# @returns [Connection] The connection itself.
def close!(...)
  return self if @state == :closed

  # Mark as closed before sending, so the state is updated even if sending fails.
  @state = :closed

  begin
    send_close(...)
  rescue
    # Deliberately ignored: the connection is considered closed either way.
  end

  self
end
# Check whether the connection has entered the closed state.
# @returns [Boolean] True if the connection state is closed.
def closed?
  return @state == :closed
end
# Transition to the closed state via {#close!} (forwarding all arguments) *and* then close the underlying framer. Any data not yet read will be lost.
# @returns [Object] Whatever the framer's `close` returns.
def close(...)
  close!(...)

  return @framer.close
end
# Begin a graceful close by sending a close frame. With an error, the frame carries {Error::INTERNAL_ERROR} and the error's message; without one, the defaults of {#send_close} are used.
# If sending raises, the connection is marked as closed instead. You may continue to read data after calling this method, but you should not write any more data.
#
# @parameter error [Error | Nil] The error that occurred, if any.
def close_write(error = nil)
  arguments = error ? [Error::INTERNAL_ERROR, error.message] : []

  send_close(*arguments)
rescue
  # Sending failed, so fall back to marking the connection as closed.
  @state = :closed
end
# Close the connection gracefully: send a close frame (unless already closed), then keep reading frames until {#read_frame} returns nil. Frames read while draining are not returned to the caller. If you want to process that data, use {#close_write} instead, and read the data before calling {#close}.
# @returns [Nil]
def shutdown
  send_close if @state != :closed

  # Drain the connection; `read_frame` returns nil once the connection is closed.
  frame = read_frame
  frame = read_frame while frame
end
# Read a single frame from the framer and apply it to this connection.
# If a block is given, the frame is yielded before it is applied.
# Any error raised along the way closes the connection before being re-raised: a {ProtocolError} closes with its own code, anything else with {Error::PROTOCOL_ERROR}.
# @returns [Frame | Nil] The frame which was read, or nil if the connection is already closed.
# @raises [ProtocolError] If the frame has any flag bits set that are still in the reserved mask.
def read_frame
  return nil if closed?

  frame = @framer.read_frame

  # Any flag bit which is still in the reserved mask must not be set on incoming frames.
  disallowed = frame.flags & @reserved
  raise ProtocolError, "Received frame with reserved flags set!" unless disallowed.zero?

  yield frame if block_given?

  frame.apply(self)

  frame
rescue ProtocolError => error
  close(error.code, error.message)
  raise
rescue => error
  close(Error::PROTOCOL_ERROR, error.message)
  raise
end
# Hand a frame to the framer for writing.
# Note: This does not immediately write the frame to the connection, you must call {#flush} to ensure the frame is written.
# @parameter frame [Frame] The frame to write.
# @returns [Frame] The same frame which was given.
def write_frame(frame)
  @framer.write_frame(frame)

  frame
end
# Handle an incoming text frame, which starts a new message and is therefore only valid when no frames are currently buffered.
# @parameter frame [Frame] The text frame which was received.
# @raises [ProtocolError] If frames from an unfinished message are already buffered.
def receive_text(frame)
  raise ProtocolError, "Received text, but expecting continuation!" unless @frames.empty?

  @frames << frame
end
# Handle an incoming binary frame, which starts a new message and is therefore only valid when no frames are currently buffered.
# @parameter frame [Frame] The binary frame which was received.
# @raises [ProtocolError] If frames from an unfinished message are already buffered.
def receive_binary(frame)
  raise ProtocolError, "Received binary, but expecting continuation!" unless @frames.empty?

  @frames << frame
end
# Handle an incoming continuation frame by appending it to the frames already buffered for the current message.
# @parameter frame [Frame] The continuation frame which was received.
# @raises [ProtocolError] If there is no buffered frame for it to continue.
def receive_continuation(frame)
  raise ProtocolError, "Received unexpected continuation!" if @frames.none?

  @frames << frame
end
# Handle an incoming close frame: unpack its code and reason, then enter the closed state via {#close!} with those same values.
# @parameter frame [Frame] The close frame which was received.
# @raises [ClosedError] If the frame carried a code other than {Error::NO_ERROR}.
def receive_close(frame)
  code, reason = frame.unpack

  # On receiving a close frame, we must enter the closed state:
  close!(code, reason)

  # A missing code, or the normal closure code, is not treated as an error.
  raise ClosedError.new(reason, code) if code && code != Error::NO_ERROR
end
# Build a ping frame carrying the given data and pass it to {#write_frame}.
# @parameter data [String] The data to send in the ping frame.
# @returns [Frame] The ping frame, as returned by {#write_frame}.
# @raises [ProtocolError] If the connection is in the closed state.
def send_ping(data = "")
  raise ProtocolError, "Cannot send ping in state #{@state}" if @state == :closed

  ping = PingFrame.new(mask: @mask)
  ping.pack(data)

  write_frame(ping)
end
# Handle an incoming ping frame by writing the frame's reply, generated with this connection's mask.
# @parameter frame [Frame] The ping frame which was received.
# @raises [ProtocolError] If the connection is in the closed state.
def receive_ping(frame)
  raise ProtocolError, "Cannot receive ping in state #{@state}" if @state == :closed

  reply = frame.reply(mask: @mask)
  write_frame(reply)
end
# Handle an incoming pong frame. The default implementation intentionally does nothing.
# @parameter frame [Frame] The pong frame which was received.
# @returns [Nil]
def receive_pong(frame)
  # Nothing to do by default.
  nil
end
# Handle a frame that is not a control frame. The default implementation always fails.
# @parameter frame [Frame] The frame which was received.
# @raises [ProtocolError] Always, with a message which includes the frame.
def receive_frame(frame)
  raise(ProtocolError, "Unhandled frame: #{frame}")
end
# Create a text frame using this connection's mask and pack the given buffer into it. This is used by the {#writer} interface.
# @parameter buffer [String] The text to pack into the frame.
# @parameter options [Hash] Extra keyword options; accepted but not used by this default implementation.
# @returns [TextFrame] The packed frame.
def pack_text_frame(buffer, **options)
  TextFrame.new(mask: @mask).tap { |frame| frame.pack(buffer) }
end
# Ask the current writer to pack the buffer into a text frame, then write that frame.
# @parameter buffer [String] The text to send.
# @parameter options [Hash] Keyword options forwarded to the writer's `pack_text_frame`.
# @returns [Frame] The frame, as returned by {#write_frame}.
def send_text(buffer, **options)
  frame = @writer.pack_text_frame(buffer, **options)

  write_frame(frame)
end
# Create a binary frame using this connection's mask and pack the given buffer into it. This is used by the {#writer} interface.
# @parameter buffer [String] The binary data to pack into the frame.
# @parameter options [Hash] Extra keyword options; accepted but not used by this default implementation.
# @returns [BinaryFrame] The packed frame.
def pack_binary_frame(buffer, **options)
  BinaryFrame.new(mask: @mask).tap { |frame| frame.pack(buffer) }
end
# Ask the current writer to pack the buffer into a binary frame, then write that frame.
# @parameter buffer [String] The binary data to send.
# @parameter options [Hash] Keyword options forwarded to the writer's `pack_binary_frame`.
# @returns [Frame] The frame, as returned by {#write_frame}.
def send_binary(buffer, **options)
  frame = @writer.pack_binary_frame(buffer, **options)

  write_frame(frame)
end
# Pack a close frame with the given code and reason, write it, and flush straight away. This begins the closing handshake but does not change the connection's state.
# @parameter code [Integer] The close code to send.
# @parameter reason [String] The reason for closing the connection.
# @returns [Object] Whatever {#flush} returns.
def send_close(code = Error::NO_ERROR, reason = "")
  close_frame = CloseFrame.new(mask: @mask)
  close_frame.pack(code, reason)

  write_frame(close_frame)
  flush
end
# Write a message to the connection.
# A String is sent as text when its encoding is UTF-8 and as binary otherwise; a {Message} is asked to send itself on this connection.
# @parameter message [Message | String] The message to send.
# @parameter options [Hash] Keyword options forwarded to the underlying send.
# @raises [ArgumentError] If the message is neither a String nor a {Message}.
def write(message, **options)
  case message
  when String
    # This is a compatibility shim for the previous implementation: the string's encoding selects the frame type.
    utf8 = message.encoding == Encoding::UTF_8
    utf8 ? send_text(message, **options) : send_binary(message, **options)
  when Message
    message.send(self, **options)
  else
    raise ArgumentError, "Unsupported message type: #{message}"
  end
end
# The default implementation for reading a message buffer. This is used by the {#reader} interface.
#
# A single frame is unpacked directly; several frames are unpacked individually and their payloads concatenated in order.
#
# @parameter frames [Array(Frame)] The frames which make up one complete message.
# @parameter options [Hash] Extra keyword options. {#read} forwards its options here, so they must be accepted even though this default implementation does not use them.
# @returns [String] The unpacked message buffer.
def unpack_frames(frames, **options)
  if frames.size == 1
    # Avoid the intermediate array and join for the common unfragmented case.
    frames.first.unpack
  else
    frames.map(&:unpack).join("")
  end
end
# Read the next complete message from the connection. If a protocol error occurs while reading the message, the connection is closed and the error re-raised.
#
# The framer is flushed first. Frames are then read until the most recently buffered frame reports that it is finished; fragmented messages are therefore buffered until complete.
#
# @parameter options [Hash] Keyword options forwarded to the reader's `unpack_frames`.
# @returns [Message | Nil] The received message, or nil once {#read_frame} stops returning frames.
def read(**options)
  @framer.flush

  while read_frame
    next unless @frames.last&.finished?

    # Take ownership of the buffered frames and reset the buffer for the next message.
    completed, @frames = @frames, []

    buffer = @reader.unpack_frames(completed, **options)
    return completed.first.read_message(buffer)
  end
rescue ProtocolError => error
  close(error.code, error.message)
  raise
end
end
end
end