11import logging
22import socket
3+ from enum import Enum
34from typing import Iterable , Callable
45
56from h2 .frame_buffer import FrameBuffer
@@ -63,6 +64,11 @@ def apply_http2_patches_for_grpc_support(
6364 )
6465 patched_connection = True
6566
67+ class ForwardingState (Enum ):
68+ UNDECIDED = "undecided"
69+ FORWARDING = "forwarding"
70+ PASSTHROUGH = "passthrough"
71+
6672 class ForwardingBuffer :
6773 """
6874 A buffer atop the HTTP2 client connection, that will hold
@@ -72,7 +78,7 @@ class ForwardingBuffer:
7278
7379 backend : TcpForwarder
7480 buffer : list
75- proxying : bool | None
81+ state : ForwardingState
7682
7783 def __init__ (self , http_response_stream ):
7884 self .http_response_stream = http_response_stream
@@ -81,7 +87,7 @@ def __init__(self, http_response_stream):
8187 )
8288 self .backend = TcpForwarder (target_port , host = target_host )
8389 self .buffer = []
84- self .proxying = None
90+ self .state = ForwardingState . UNDECIDED
8591 reactor .getThreadPool ().callInThread (
8692 self .backend .receive_loop , self .received_from_backend
8793 )
@@ -91,35 +97,30 @@ def received_from_backend(self, data):
9197 self .http_response_stream .write (data )
9298
9399 def received_from_http2_client (self , data , default_handler : Callable ):
94- if self .proxying is False :
95- # Note: Return here only if `proxying` is `False` (a value of `None` indicates
96- # that the headers have not fully been received yet)
97- return default_handler (data )
98-
99- if self .proxying :
100- assert not self .buffer
101- # Keep sending data to the backend for the lifetime of this connection
102- self .backend .send (data )
103- return
104-
105- self .buffer .append (data )
106-
107- if not (headers := get_headers_from_data_stream (self .buffer )):
108- # If no headers received yet, then return (method will be called again for next chunk of data)
109- return
110-
111- self .proxying = should_proxy_request (headers )
112-
113- buffered_data = b"" .join (self .buffer )
114- self .buffer = []
115-
116- if not self .proxying :
117- # if this is not a target request, then call the default handler
118- default_handler (buffered_data )
119- return
120-
121- LOG .debug (f"Forwarding { len (buffered_data )} bytes to backend" )
122- self .backend .send (buffered_data )
100+ match self .state :
101+ case ForwardingState .PASSTHROUGH :
102+ default_handler (data )
103+ case ForwardingState .FORWARDING :
104+ assert not self .buffer
105+ # Keep sending data to the backend for the lifetime of this connection
106+ self .backend .send (data )
107+ case ForwardingState .UNDECIDED :
108+ self .buffer .append (data )
109+
110+ if headers := get_headers_from_data_stream (self .buffer ):
111+ buffered_data = b"" .join (self .buffer )
112+ self .buffer = []
113+
114+ if should_proxy_request (headers ):
115+ self .state = ForwardingState .FORWARDING
116+ LOG .debug (
117+ f"Forwarding { len (buffered_data )} bytes to backend"
118+ )
119+ self .backend .send (buffered_data )
120+ else :
121+ self .state = ForwardingState .PASSTHROUGH
122+ # if this is not a target request, then call the default handler
123+ default_handler (buffered_data )
123124
124125 def close (self ):
125126 self .backend .close ()
0 commit comments