-
Notifications
You must be signed in to change notification settings - Fork 855
Expand file tree
/
Copy pathHttp2CommonSession.cc
More file actions
503 lines (425 loc) · 17.1 KB
/
Http2CommonSession.cc
File metadata and controls
503 lines (425 loc) · 17.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
/** @file
Http2CommonSession.cc
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "proxy/http2/Http2CommonSession.h"
#include "proxy/http/HttpDebugNames.h"
namespace
{
DbgCtl dbg_ctl_http2_cs{"http2_cs"};
#define REMEMBER(e, r) \
{ \
this->remember(MakeSourceLocation(), e, r); \
}
#define STATE_ENTER(state_name, event) \
do { \
REMEMBER(event, this->recursion) \
Dbg(dbg_ctl_http2_cs, "[%" PRId64 "] [%s, %s]", this->get_connection_id(), #state_name, \
HttpDebugNames::get_event_name(event)); \
} while (0)
#define Http2SsnDebug(fmt, ...) Dbg(dbg_ctl_http2_cs, "[%" PRId64 "] " fmt, this->get_connection_id(), ##__VA_ARGS__)
#define HTTP2_SET_SESSION_HANDLER(handler) \
do { \
REMEMBER(NO_EVENT, this->recursion); \
this->session_handler = (handler); \
} while (0)
// memcpy the requested bytes from the IOBufferReader, returning how many were
// actually copied.
inline unsigned
copy_from_buffer_reader(void *dst, IOBufferReader *reader, unsigned nbytes)
{
char *end;
end = reader->memcpy(dst, nbytes, 0 /* offset */);
return end - static_cast<char *>(dst);
}
} // end anonymous namespace
void
Http2CommonSession::remember(const SourceLocation &location, int event, int reentrant)
{
this->_history.push_back(location, event, reentrant);
}
bool
Http2CommonSession::common_free(ProxySession *ssn)
{
if (this->_reenable_event) {
this->_reenable_event->cancel();
this->_reenable_event = nullptr;
}
// Make sure the we are at the bottom of the stack
if (this->connection_state.is_recursing() || this->recursion != 0) {
// Note that we are ready to be cleaned up
// One of the event handlers will catch it
this->kill_me = true;
return false;
}
REMEMBER(NO_EVENT, this->recursion)
Http2SsnDebug("session free");
// Don't free active ProxySession
ink_release_assert(ssn->is_active() == false);
this->_milestones.mark(Http2SsnMilestone::CLOSE);
ink_hrtime total_time = this->_milestones.elapsed(Http2SsnMilestone::OPEN, Http2SsnMilestone::CLOSE);
// Slow Log
if (Http2::con_slow_log_threshold != 0 && ink_hrtime_from_msec(Http2::con_slow_log_threshold) < total_time) {
Error("[%" PRIu64 "] Slow H2 Connection: open: %" PRIu64 " close: %.3f", ssn->connection_id(),
ink_hrtime_to_msec(this->_milestones[Http2SsnMilestone::OPEN]),
this->_milestones.difference_sec(Http2SsnMilestone::OPEN, Http2SsnMilestone::CLOSE));
}
// Update stats on how we died. May want to eliminate this. Was useful for
// tracking down which cases we were having problems cleaning up. But for general
// use probably not worth the effort
if (cause_of_death != Http2SessionCod::NOT_PROVIDED) {
switch (cause_of_death) {
case Http2SessionCod::HIGH_ERROR_RATE:
Metrics::Counter::increment(http2_rsb.session_die_high_error_rate);
break;
case Http2SessionCod::NOT_PROVIDED:
// Can't happen but this case is here to not have default case.
Metrics::Counter::increment(http2_rsb.session_die_other);
break;
}
} else {
switch (dying_event) {
case VC_EVENT_NONE:
Metrics::Counter::increment(http2_rsb.session_die_default);
break;
case VC_EVENT_ACTIVE_TIMEOUT:
Metrics::Counter::increment(http2_rsb.session_die_active);
break;
case VC_EVENT_INACTIVITY_TIMEOUT:
Metrics::Counter::increment(http2_rsb.session_die_inactive);
break;
case VC_EVENT_ERROR:
Metrics::Counter::increment(http2_rsb.session_die_error);
break;
case VC_EVENT_EOS:
Metrics::Counter::increment(http2_rsb.session_die_eos);
break;
default:
Metrics::Counter::increment(http2_rsb.session_die_other);
break;
}
}
delete _h2_pushed_urls;
_h2_pushed_urls = nullptr;
this->connection_state.destroy();
free_MIOBuffer(this->read_buffer);
this->read_buffer = nullptr;
free_MIOBuffer(this->write_buffer);
this->write_buffer = nullptr;
return true;
}
void
Http2CommonSession::set_half_close_local_flag(bool flag)
{
if (!half_close_local && flag) {
Http2SsnDebug("session half-close local");
}
half_close_local = flag;
}
int64_t
Http2CommonSession::xmit(const Http2TxFrame &frame, bool flush)
{
int64_t len = frame.write_to(this->write_buffer);
this->_pending_sending_data_size += len;
if (!flush) {
// Flush if we already use half of the buffer to avoid adding a new block to the chain.
// A frame size can be 16MB at maximum so blocks can be added, but that's fine.
if (this->_pending_sending_data_size >= this->_write_size_threshold) {
flush = true;
} else {
// Observe that schedule_transmit will only schedule the first time we
// don't flush because the threshold is not met.
this->connection_state.schedule_retransmit(HRTIME_MSECONDS(Http2::write_time_threshold));
}
}
if (flush) {
this->flush();
}
return len;
}
void
Http2CommonSession::flush()
{
this->connection_state.cancel_retransmit();
if (this->_pending_sending_data_size > 0) {
this->_pending_sending_data_size = 0;
this->_write_buffer_last_flush = ink_get_hrtime();
write_reenable();
}
}
int
Http2CommonSession::state_read_connection_preface(int event, void *edata)
{
VIO *vio = static_cast<VIO *>(edata);
STATE_ENTER(&Http2CommonSession::state_read_connection_preface, event);
ink_assert(event == VC_EVENT_READ_COMPLETE || event == VC_EVENT_READ_READY);
if (this->_read_buffer_reader->read_avail() >= static_cast<int64_t>(HTTP2_CONNECTION_PREFACE_LEN)) {
char buf[HTTP2_CONNECTION_PREFACE_LEN];
unsigned nbytes;
nbytes = copy_from_buffer_reader(buf, this->_read_buffer_reader, sizeof(buf));
ink_release_assert(nbytes == HTTP2_CONNECTION_PREFACE_LEN);
if (memcmp(HTTP2_CONNECTION_PREFACE, buf, nbytes) != 0) {
Http2SsnDebug("invalid connection preface");
this->get_proxy_session()->do_io_close();
return 0;
}
// Check whether data is read from early data
if (this->read_from_early_data > 0) {
this->read_from_early_data -= this->read_from_early_data > nbytes ? nbytes : this->read_from_early_data;
}
Http2SsnDebug("received connection preface");
this->_read_buffer_reader->consume(nbytes);
HTTP2_SET_SESSION_HANDLER(&Http2CommonSession::state_start_frame_read);
this->get_netvc()->set_inactivity_timeout(HRTIME_SECONDS(Http2::no_activity_timeout_in));
this->get_netvc()->set_active_timeout(HRTIME_SECONDS(Http2::active_timeout_in));
// XXX start the write VIO ...
// If we have unconsumed data, start tranferring frames now.
if (this->_read_buffer_reader->is_read_avail_more_than(0)) {
return this->get_proxy_session()->handleEvent(VC_EVENT_READ_READY, vio);
}
}
// XXX We don't have enough data to check the connection preface. We should
// reset the accept inactivity
// timeout. We should have a maximum timeout to get the session started
// though.
vio->reenable();
return 0;
}
int
Http2CommonSession::state_start_frame_read(int event, void *edata)
{
VIO *vio = static_cast<VIO *>(edata);
STATE_ENTER(&Http2CommonSession::state_start_frame_read, event);
ink_assert(event == VC_EVENT_READ_COMPLETE || event == VC_EVENT_READ_READY);
return do_process_frame_read(event, vio, false);
}
int
Http2CommonSession::do_start_frame_read(Http2ErrorCode &ret_error)
{
ret_error = Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
ink_release_assert(this->_read_buffer_reader->read_avail() >= (int64_t)HTTP2_FRAME_HEADER_LEN);
uint8_t buf[HTTP2_FRAME_HEADER_LEN];
unsigned nbytes;
Http2SsnDebug("receiving frame header");
nbytes = copy_from_buffer_reader(buf, this->_read_buffer_reader, sizeof(buf));
this->cur_frame_from_early_data = false;
if (!http2_parse_frame_header(make_iovec(buf), this->current_hdr)) {
Http2SsnDebug("frame header parse failure");
this->get_proxy_session()->do_io_close();
return -1;
}
// Check whether data is read from early data
if (this->read_from_early_data > 0) {
this->read_from_early_data -= this->read_from_early_data > nbytes ? nbytes : this->read_from_early_data;
this->cur_frame_from_early_data = true;
}
Http2SsnDebug("frame header length=%u, type=%u, flags=0x%x, streamid=%u, early_data=%d", (unsigned)this->current_hdr.length,
(unsigned)this->current_hdr.type, (unsigned)this->current_hdr.flags, this->current_hdr.streamid,
this->cur_frame_from_early_data);
this->_read_buffer_reader->consume(nbytes);
if (!http2_frame_header_is_valid(this->current_hdr, this->connection_state.local_settings.get(HTTP2_SETTINGS_MAX_FRAME_SIZE))) {
ret_error = Http2ErrorCode::HTTP2_ERROR_PROTOCOL_ERROR;
return -1;
}
// If we know up front that the payload is too long, nuke this connection.
if (this->current_hdr.length > this->connection_state.local_settings.get(HTTP2_SETTINGS_MAX_FRAME_SIZE)) {
ret_error = Http2ErrorCode::HTTP2_ERROR_FRAME_SIZE_ERROR;
return -1;
}
// CONTINUATIONs MUST follow behind HEADERS which doesn't have END_HEADERS
Http2StreamId continued_stream_id = this->connection_state.get_continued_stream_id();
if (continued_stream_id != 0 &&
(continued_stream_id != this->current_hdr.streamid || this->current_hdr.type != HTTP2_FRAME_TYPE_CONTINUATION)) {
ret_error = Http2ErrorCode::HTTP2_ERROR_PROTOCOL_ERROR;
return -1;
}
return 0;
}
int
Http2CommonSession::state_complete_frame_read(int event, void *edata)
{
VIO *vio = static_cast<VIO *>(edata);
STATE_ENTER(&Http2CommonSession::state_complete_frame_read, event);
ink_assert(event == VC_EVENT_READ_COMPLETE || event == VC_EVENT_READ_READY);
if (this->_read_buffer_reader->read_avail() < this->current_hdr.length) {
if (this->_should_do_something_else()) {
if (this->_reenable_event == nullptr) {
vio->disable();
this->_reenable_event = this->get_mutex()->thread_holding->schedule_in(this->get_proxy_session(), HRTIME_MSECONDS(1),
HTTP2_SESSION_EVENT_REENABLE, vio);
} else {
vio->reenable();
}
} else {
vio->reenable();
}
return 0;
}
Http2SsnDebug("completed frame read, %" PRId64 " bytes available", this->_read_buffer_reader->read_avail());
return do_process_frame_read(event, vio, true);
}
int
Http2CommonSession::do_complete_frame_read()
{
// XXX parse the frame and handle it ...
ink_release_assert(this->_read_buffer_reader->read_avail() >= this->current_hdr.length);
Http2Frame frame(this->current_hdr, this->_read_buffer_reader, this->cur_frame_from_early_data);
this->_count_received_frames(frame.header().type);
connection_state.rcv_frame(&frame);
// Check whether data is read from early data
if (this->read_from_early_data > 0) {
this->read_from_early_data -=
this->read_from_early_data > this->current_hdr.length ? this->current_hdr.length : this->read_from_early_data;
}
this->_read_buffer_reader->consume(this->current_hdr.length);
++(this->_n_frame_read);
// Set the event handler if there is no more data to process a new frame
HTTP2_SET_SESSION_HANDLER(&Http2CommonSession::state_start_frame_read);
return 0;
}
int
Http2CommonSession::do_process_frame_read(int /* event ATS_UNUSED */, VIO *vio, bool inside_frame)
{
Http2SsnDebug("do_process_frame_read %" PRId64 " bytes ready", this->_read_buffer_reader->read_avail());
if (inside_frame) {
do_complete_frame_read();
}
while (this->_read_buffer_reader->read_avail() >= static_cast<int64_t>(HTTP2_FRAME_HEADER_LEN)) {
// Cancel reading if there was an error or connection is closed
const auto has_fatal_error_code =
(connection_state.tx_error_code.code != static_cast<uint32_t>(Http2ErrorCode::HTTP2_ERROR_NO_ERROR) &&
connection_state.tx_error_code.code != static_cast<uint32_t>(Http2ErrorCode::HTTP2_ERROR_ENHANCE_YOUR_CALM));
if (has_fatal_error_code || connection_state.is_state_closed()) {
Http2SsnDebug("reading a frame has been canceled (%u)", connection_state.tx_error_code.code);
return 0;
}
if (this->connection_state.get_stream_error_rate() > std::min(1.0, Http2::stream_error_rate_threshold * 2.0) &&
(!this->connection_state.get_goaway_sent() || this->connection_state.get_last_stream_id_tx() == INT32_MAX)) {
ip_port_text_buffer ipb;
const char *peer_ip = ats_ip_ntop(this->get_proxy_session()->get_remote_addr(), ipb, sizeof(ipb));
SiteThrottledWarning("HTTP/2 session error peer_ip=%s session_id=%" PRId64
" closing a connection, because its stream error rate (%f) exceeded the threshold (%f)",
peer_ip, this->get_connection_id(), this->connection_state.get_stream_error_rate(),
Http2::stream_error_rate_threshold);
this->connection_state.send_goaway_frame(this->connection_state.get_latest_stream_id_in(),
Http2ErrorCode::HTTP2_ERROR_ENHANCE_YOUR_CALM);
this->set_half_close_local_flag(true);
}
// Return if there was an error
auto err = Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
if (do_start_frame_read(err) < 0) {
// send an error if specified. Otherwise, just go away
this->connection_state.restart_receiving(nullptr);
if (err > Http2ErrorCode::HTTP2_ERROR_NO_ERROR) {
if (!this->connection_state.is_state_closed()) {
this->connection_state.send_goaway_frame(this->connection_state.get_latest_stream_id_in(), err);
this->set_half_close_local_flag(true);
}
}
return 0;
}
// If there is no more data to finish the frame, set up the event handler and reenable
if (this->_read_buffer_reader->read_avail() < this->current_hdr.length) {
HTTP2_SET_SESSION_HANDLER(&Http2CommonSession::state_complete_frame_read);
break;
}
do_complete_frame_read();
if (this->_should_do_something_else()) {
if (this->_reenable_event == nullptr) {
this->connection_state.restart_receiving(nullptr);
vio->disable();
this->_reenable_event = this->get_mutex()->thread_holding->schedule_in(this->get_proxy_session(), HRTIME_MSECONDS(1),
HTTP2_SESSION_EVENT_REENABLE, vio);
return 0;
}
}
}
// If the client hasn't shut us down, reenable
if (!this->get_proxy_session()->is_peer_closed()) {
vio->reenable();
}
return 0;
}
bool
Http2CommonSession::_should_do_something_else()
{
if (this->get_proxy_session()->is_peer_closed()) {
return false;
}
if (this->_interrupt_reading_frames) {
this->_interrupt_reading_frames = false;
return true;
}
// Do something else every 128 incoming frames if connection state isn't closed
return (this->_n_frame_read & 0x7F) == 0 && !connection_state.is_state_closed();
}
void
Http2CommonSession::interrupt_reading_frames()
{
this->_interrupt_reading_frames = true;
}
int64_t
Http2CommonSession::write_avail()
{
return this->write_buffer->write_avail();
}
bool
Http2CommonSession::is_write_high_water() const
{
return this->write_buffer->high_water();
}
void
Http2CommonSession::write_reenable()
{
if (write_vio) {
// Grab the lock for the write_vio. Holding the lock is
// checked eventually via the reenable logic
SCOPED_MUTEX_LOCK(lock, write_vio->mutex, this_ethread());
write_vio->reenable();
}
}
void
Http2CommonSession::add_url_to_pushed_table(const char *url, int url_len)
{
// Delay std::unordered_set allocation until when it used
if (_h2_pushed_urls == nullptr) {
this->_h2_pushed_urls = new std::unordered_set<std::string>();
this->_h2_pushed_urls->reserve(Http2::push_diary_size);
}
if (_h2_pushed_urls->size() < Http2::push_diary_size) {
_h2_pushed_urls->emplace(url, url_len);
}
}
void
Http2CommonSession::add_session()
{
}
bool
Http2CommonSession::is_outbound() const
{
return false;
}
void
Http2CommonSession::_count_received_frames(uint32_t type)
{
if (type > HTTP2_FRAME_TYPE_MAX) {
type = HTTP2_FRAME_TYPE_MAX;
}
// Global counter
Metrics::Counter::increment(http2_frame_metrics_in[type]);
// Local counter
this->_frame_counts_in[type]++;
}