From 408ece68ce54d6e96071e2ab46769c9c1db98dd7 Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Thu, 23 Oct 2025 13:14:23 -0700 Subject: [PATCH 1/6] MVP of connection close --- packages/shared/pkg/proxy/pool/client.go | 16 +++++++++++-- packages/shared/pkg/proxy/pool/pool.go | 5 +++- packages/shared/pkg/proxy/proxy.go | 2 +- .../shared/pkg/proxy/tracking/connection.go | 23 +++++++++++++++++-- .../shared/pkg/proxy/tracking/listener.go | 2 +- 5 files changed, 41 insertions(+), 7 deletions(-) diff --git a/packages/shared/pkg/proxy/pool/client.go b/packages/shared/pkg/proxy/pool/client.go index ba769c5a06..2abce51cad 100644 --- a/packages/shared/pkg/proxy/pool/client.go +++ b/packages/shared/pkg/proxy/pool/client.go @@ -13,12 +13,15 @@ import ( "github.com/e2b-dev/infra/packages/shared/pkg/proxy/template" "github.com/e2b-dev/infra/packages/shared/pkg/proxy/tracking" + "github.com/e2b-dev/infra/packages/shared/pkg/smap" ) type ProxyClient struct { httputil.ReverseProxy transport *http.Transport + + activeConnections *smap.Map[*tracking.Connection] } func newProxyClient( @@ -30,6 +33,8 @@ func newProxyClient( currentConnsCounter *atomic.Int64, logger *log.Logger, ) *ProxyClient { + activeConnections := smap.New[*tracking.Connection]() + transport := &http.Transport{ Proxy: http.ProxyFromEnvironment, // Limit the max connection per host to avoid exhausting the number of available ports to one host. @@ -56,7 +61,7 @@ func newProxyClient( if err == nil { totalConnsCounter.Add(1) - return tracking.NewConnection(conn, currentConnsCounter), nil + return tracking.NewConnection(conn, currentConnsCounter, activeConnections), nil } if ctx.Err() != nil { @@ -82,7 +87,8 @@ func newProxyClient( } return &ProxyClient{ - transport: transport, + transport: transport, + activeConnections: activeConnections, ReverseProxy: httputil.ReverseProxy{ Transport: transport, Rewrite: func(r *httputil.ProxyRequest) { @@ -167,3 +173,9 @@ func newProxyClient( func (p *ProxyClient) closeIdleConnections() { p.transport.CloseIdleConnections() } + +func (p *ProxyClient) closeAllConnections() { + for _, conn := range p.activeConnections.Items() { + conn.Close() + } +} diff --git a/packages/shared/pkg/proxy/pool/pool.go b/packages/shared/pkg/proxy/pool/pool.go index cbb042daf3..becb1d38c7 100644 --- a/packages/shared/pkg/proxy/pool/pool.go +++ b/packages/shared/pkg/proxy/pool/pool.go @@ -69,10 +69,13 @@ func (p *ProxyPool) Get(d *Destination) *ProxyClient { }) } -func (p *ProxyPool) Close(connectionKey string) { +func (p *ProxyPool) Close(connectionKey string, force bool) { p.pool.RemoveCb(connectionKey, func(_ string, proxy *ProxyClient, _ bool) bool { if proxy != nil { proxy.closeIdleConnections() + if force { + proxy.closeAllConnections() + } } return true diff --git a/packages/shared/pkg/proxy/proxy.go b/packages/shared/pkg/proxy/proxy.go index 9e585899a4..3e1349ec50 100644 --- a/packages/shared/pkg/proxy/proxy.go +++ b/packages/shared/pkg/proxy/proxy.go @@ -75,7 +75,7 @@ func (p *Proxy) CurrentPoolConnections() int64 { } func (p *Proxy) RemoveFromPool(connectionKey string) { - p.pool.Close(connectionKey) + p.pool.Close(connectionKey, true) } func (p *Proxy) ListenAndServe(ctx context.Context) error { diff --git a/packages/shared/pkg/proxy/tracking/connection.go b/packages/shared/pkg/proxy/tracking/connection.go index da88c453d1..b3ff3390d0 100644 --- a/packages/shared/pkg/proxy/tracking/connection.go +++ b/packages/shared/pkg/proxy/tracking/connection.go @@ -3,21 +3,36 @@ package tracking import ( "net" "sync/atomic" + + "github.com/e2b-dev/infra/packages/shared/pkg/smap" + + "github.com/google/uuid" ) type Connection struct { net.Conn counter *atomic.Int64 + key string + + m *smap.Map[*Connection] } -func NewConnection(conn net.Conn, counter *atomic.Int64) *Connection { +func NewConnection(conn net.Conn, counter *atomic.Int64, m *smap.Map[*Connection]) *Connection { counter.Add(1) - return &Connection{ + c := &Connection{ Conn: conn, counter: counter, + m: m, + key: uuid.New().String(), + } + + if m != nil { + m.Insert(c.key, c) } + + return c } func (c *Connection) Close() error { @@ -28,5 +43,9 @@ func (c *Connection) Close() error { c.counter.Add(-1) + if c.m != nil { + c.m.Remove(c.key) + } + return nil } diff --git a/packages/shared/pkg/proxy/tracking/listener.go b/packages/shared/pkg/proxy/tracking/listener.go index e792732f78..985c920672 100644 --- a/packages/shared/pkg/proxy/tracking/listener.go +++ b/packages/shared/pkg/proxy/tracking/listener.go @@ -24,5 +24,5 @@ func (l *Listener) Accept() (net.Conn, error) { return nil, err } - return NewConnection(conn, l.counter), nil + return NewConnection(conn, l.counter, nil), nil } From b631aa75f96659866b8352651633f211d8570e5d Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Tue, 28 Oct 2025 12:23:50 -0700 Subject: [PATCH 2/6] Improve error handling --- packages/orchestrator/internal/proxy/proxy.go | 4 ++-- .../orchestrator/internal/server/sandboxes.go | 6 +++++- packages/shared/pkg/proxy/pool/client.go | 12 +++++++++-- packages/shared/pkg/proxy/pool/pool.go | 8 +++---- packages/shared/pkg/proxy/proxy.go | 4 ++-- .../shared/pkg/proxy/tracking/connection.go | 21 ++++++++++++++++--- 6 files changed, 41 insertions(+), 14 deletions(-) diff --git a/packages/orchestrator/internal/proxy/proxy.go b/packages/orchestrator/internal/proxy/proxy.go index 2d3b5ac642..cfc82a4b67 100644 --- a/packages/orchestrator/internal/proxy/proxy.go +++ b/packages/orchestrator/internal/proxy/proxy.go @@ -121,8 +121,8 @@ func (p *SandboxProxy) Close(ctx context.Context) error { return nil } -func (p *SandboxProxy) RemoveFromPool(connectionKey string) { - p.proxy.RemoveFromPool(connectionKey) +func (p *SandboxProxy) RemoveFromPool(connectionKey string) error { + return p.proxy.RemoveFromPool(connectionKey) } func (p *SandboxProxy) GetAddr() string { diff --git a/packages/orchestrator/internal/server/sandboxes.go b/packages/orchestrator/internal/server/sandboxes.go index bfec76bc1b..b8aa5bb998 100644 --- a/packages/orchestrator/internal/server/sandboxes.go +++ b/packages/orchestrator/internal/server/sandboxes.go @@ -156,7 +156,11 @@ func (s *Server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ s.sandboxes.RemoveByExecutionID(req.GetSandbox().GetSandboxId(), sbx.Runtime.ExecutionID) // Remove the proxies assigned to the sandbox from the pool to prevent them from being reused. - s.proxy.RemoveFromPool(sbx.Runtime.ExecutionID) + cleanupErr = s.proxy.RemoveFromPool(sbx.Runtime.ExecutionID) + if cleanupErr != nil { + // Errors here will be from forcefully closing the connections, so we can ignore them—they will at worst timeout on their own. + sbxlogger.I(sbx).Warn("error during removing sandbox proxy pool", zap.Error(cleanupErr)) + } sbxlogger.E(sbx).Info("Sandbox killed") }() diff --git a/packages/shared/pkg/proxy/pool/client.go b/packages/shared/pkg/proxy/pool/client.go index 2abce51cad..9df9769570 100644 --- a/packages/shared/pkg/proxy/pool/client.go +++ b/packages/shared/pkg/proxy/pool/client.go @@ -2,6 +2,7 @@ package pool import ( "context" + "errors" "log" "net" "net/http" @@ -174,8 +175,15 @@ func (p *ProxyClient) closeIdleConnections() { p.transport.CloseIdleConnections() } -func (p *ProxyClient) closeAllConnections() { +func (p *ProxyClient) resetAllConnections() error { + var errs []error + for _, conn := range p.activeConnections.Items() { - conn.Close() + err := conn.Reset() + if err != nil { + errs = append(errs, err) + } } + + return errors.Join(errs...) } diff --git a/packages/shared/pkg/proxy/pool/pool.go b/packages/shared/pkg/proxy/pool/pool.go index becb1d38c7..56b15ef8b0 100644 --- a/packages/shared/pkg/proxy/pool/pool.go +++ b/packages/shared/pkg/proxy/pool/pool.go @@ -69,17 +69,17 @@ func (p *ProxyPool) Get(d *Destination) *ProxyClient { }) } -func (p *ProxyPool) Close(connectionKey string, force bool) { +func (p *ProxyPool) Close(connectionKey string) (err error) { p.pool.RemoveCb(connectionKey, func(_ string, proxy *ProxyClient, _ bool) bool { if proxy != nil { proxy.closeIdleConnections() - if force { - proxy.closeAllConnections() - } + err = proxy.resetAllConnections() } return true }) + + return err } func (p *ProxyPool) TotalConnections() uint64 { diff --git a/packages/shared/pkg/proxy/proxy.go b/packages/shared/pkg/proxy/proxy.go index 3e1349ec50..817757dced 100644 --- a/packages/shared/pkg/proxy/proxy.go +++ b/packages/shared/pkg/proxy/proxy.go @@ -74,8 +74,8 @@ func (p *Proxy) CurrentPoolConnections() int64 { return p.pool.CurrentConnections() } -func (p *Proxy) RemoveFromPool(connectionKey string) { - p.pool.Close(connectionKey, true) +func (p *Proxy) RemoveFromPool(connectionKey string) error { + return p.pool.Close(connectionKey) } func (p *Proxy) ListenAndServe(ctx context.Context) error { diff --git a/packages/shared/pkg/proxy/tracking/connection.go b/packages/shared/pkg/proxy/tracking/connection.go index b3ff3390d0..81ecb71108 100644 --- a/packages/shared/pkg/proxy/tracking/connection.go +++ b/packages/shared/pkg/proxy/tracking/connection.go @@ -4,9 +4,9 @@ import ( "net" "sync/atomic" - "github.com/e2b-dev/infra/packages/shared/pkg/smap" - "github.com/google/uuid" + + "github.com/e2b-dev/infra/packages/shared/pkg/smap" ) type Connection struct { @@ -25,16 +25,31 @@ func NewConnection(conn net.Conn, counter *atomic.Int64, m *smap.Map[*Connection Conn: conn, counter: counter, m: m, - key: uuid.New().String(), } if m != nil { + c.key = uuid.New().String() + m.Insert(c.key, c) } return c } +func (c *Connection) Reset() error { + err := c.Conn.(*net.TCPConn).SetLinger(0) + if err != nil { + return err + } + + err = c.Close() + if err != nil { + return err + } + + return nil +} + func (c *Connection) Close() error { err := c.Conn.Close() if err != nil { From 6ea9060ed8ad6c510e10f1867f0c0bc0d89e6473 Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Thu, 30 Oct 2025 14:33:43 -0700 Subject: [PATCH 3/6] Improve warning logging when manual connection close fails --- packages/orchestrator/internal/server/sandboxes.go | 6 +++--- .../internal/template/build/layer/layer_executor.go | 7 ++++++- packages/shared/pkg/proxy/tracking/connection.go | 10 +++++++--- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/packages/orchestrator/internal/server/sandboxes.go b/packages/orchestrator/internal/server/sandboxes.go index b8aa5bb998..2a5f50570f 100644 --- a/packages/orchestrator/internal/server/sandboxes.go +++ b/packages/orchestrator/internal/server/sandboxes.go @@ -156,10 +156,10 @@ func (s *Server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ s.sandboxes.RemoveByExecutionID(req.GetSandbox().GetSandboxId(), sbx.Runtime.ExecutionID) // Remove the proxies assigned to the sandbox from the pool to prevent them from being reused. - cleanupErr = s.proxy.RemoveFromPool(sbx.Runtime.ExecutionID) - if cleanupErr != nil { + closeErr := s.proxy.RemoveFromPool(sbx.Runtime.ExecutionID) + if closeErr != nil { // Errors here will be from forcefully closing the connections, so we can ignore them—they will at worst timeout on their own. - sbxlogger.I(sbx).Warn("error during removing sandbox proxy pool", zap.Error(cleanupErr)) + sbxlogger.I(sbx).Warn("errors when manually closing connections to sandbox", zap.Error(closeErr)) } sbxlogger.E(sbx).Info("Sandbox killed") diff --git a/packages/orchestrator/internal/template/build/layer/layer_executor.go b/packages/orchestrator/internal/template/build/layer/layer_executor.go index 13e57bc580..209177bbce 100644 --- a/packages/orchestrator/internal/template/build/layer/layer_executor.go +++ b/packages/orchestrator/internal/template/build/layer/layer_executor.go @@ -82,7 +82,12 @@ func (lb *LayerExecutor) BuildLayer( lb.sandboxes.Insert(sbx) defer func() { lb.sandboxes.Remove(sbx.Runtime.SandboxID) - lb.proxy.RemoveFromPool(sbx.Runtime.ExecutionID) + + closeErr := lb.proxy.RemoveFromPool(sbx.Runtime.ExecutionID) + if closeErr != nil { + // Errors here will be from forcefully closing the connections, so we can ignore them—they will at worst timeout on their own. + lb.logger.Warn("errors when manually closing connections to sandbox", zap.Error(closeErr)) + } }() // Update envd binary to the latest version diff --git a/packages/shared/pkg/proxy/tracking/connection.go b/packages/shared/pkg/proxy/tracking/connection.go index 81ecb71108..7d7f9d1b5c 100644 --- a/packages/shared/pkg/proxy/tracking/connection.go +++ b/packages/shared/pkg/proxy/tracking/connection.go @@ -1,6 +1,7 @@ package tracking import ( + "errors" "net" "sync/atomic" @@ -37,17 +38,20 @@ func NewConnection(conn net.Conn, counter *atomic.Int64, m *smap.Map[*Connection } func (c *Connection) Reset() error { + var errs []error + + // This forces the connection to close with RST. err := c.Conn.(*net.TCPConn).SetLinger(0) if err != nil { - return err + errs = append(errs, err) } err = c.Close() if err != nil { - return err + errs = append(errs, err) } - return nil + return errors.Join(errs...) } func (c *Connection) Close() error { From d16c3dd2d007dc6978a016d3bcbf9c5b2eb35922 Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Thu, 30 Oct 2025 15:58:47 -0700 Subject: [PATCH 4/6] Add comments --- packages/shared/pkg/proxy/proxy.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/shared/pkg/proxy/proxy.go b/packages/shared/pkg/proxy/proxy.go index 817757dced..fbc122e85e 100644 --- a/packages/shared/pkg/proxy/proxy.go +++ b/packages/shared/pkg/proxy/proxy.go @@ -58,10 +58,12 @@ func New( } } +// TotalPoolConnections returns the total number of connections that have been established across whole pool. func (p *Proxy) TotalPoolConnections() uint64 { return p.pool.TotalConnections() } +// CurrentServerConnections returns the current number of connections that are alive across whole pool. func (p *Proxy) CurrentServerConnections() int64 { return p.currentServerConnsCounter.Load() } From 7379d24684b2ac69ac37d14b33f36fb5e8b9e39b Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Thu, 30 Oct 2025 18:53:57 -0700 Subject: [PATCH 5/6] Add proxy close tests --- packages/shared/pkg/proxy/proxy_test.go | 201 +++++++++++++++++++++++- 1 file changed, 199 insertions(+), 2 deletions(-) diff --git a/packages/shared/pkg/proxy/proxy_test.go b/packages/shared/pkg/proxy/proxy_test.go index 6baa17d866..c14406a8dc 100644 --- a/packages/shared/pkg/proxy/proxy_test.go +++ b/packages/shared/pkg/proxy/proxy_test.go @@ -34,6 +34,8 @@ func (b *testBackend) RequestCount() uint64 { return b.requestCount.Load() } +const bodyWriteDelayHeader = "body-write-delay" + // newTestBackend creates a new test backend server func newTestBackend(listener net.Listener, id string) (*testBackend, error) { var requestCount atomic.Uint64 @@ -42,7 +44,7 @@ func newTestBackend(listener net.Listener, id string) (*testBackend, error) { backend := &testBackend{ server: &http.Server{ - Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { select { case <-ctx.Done(): w.WriteHeader(http.StatusBadGateway) @@ -54,6 +56,21 @@ func newTestBackend(listener net.Listener, id string) (*testBackend, error) { requestCount.Add(1) w.WriteHeader(http.StatusOK) + + // Flush the headers, so we can read the headers and body separately after .Do() returns. + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + + // Check for "body-write-delay" header (interpreted as seconds) + delayHeader := r.Header.Get(bodyWriteDelayHeader) + + if delayHeader != "" { + if n, err := time.ParseDuration(delayHeader); err == nil { + time.Sleep(n) + } + } + w.Write([]byte(id)) }), }, @@ -70,7 +87,6 @@ func newTestBackend(listener net.Listener, id string) (*testBackend, error) { backendURL, err := url.Parse(fmt.Sprintf("http://%s", listener.Addr().String())) if err != nil { listener.Close() - return nil, fmt.Errorf("failed to parse backend URL: %w", err) } backend.url = backendURL @@ -101,11 +117,22 @@ func assertBackendOutput(t *testing.T, backend *testBackend, resp *http.Response t.Helper() assert.Equal(t, resp.StatusCode, http.StatusOK, "status code should be 200") + body, err := io.ReadAll(resp.Body) require.NoError(t, err) + assert.Equal(t, string(body), backend.id, "backend id should be the same") } +func assertStreamError(t *testing.T, resp *http.Response) { + t.Helper() + + assert.Equal(t, resp.StatusCode, http.StatusOK, "status code should be 200") + + _, err := io.ReadAll(resp.Body) + require.ErrorIs(t, err, io.ErrUnexpectedEOF) +} + // newTestProxy creates a new proxy server for testing func newTestProxy(t *testing.T, getDestination func(r *http.Request) (*pool.Destination, error)) (*Proxy, uint, error) { t.Helper() @@ -175,11 +202,29 @@ func TestProxyRoutesToTargetServer(t *testing.T) { func httpGet(t *testing.T, proxyURL string) (*http.Response, error) { t.Helper() + return httpGetWithHeaders(t, proxyURL, nil) +} + +func httpGetWithBodyWriteDelay(t *testing.T, proxyURL string, bodyWriteDelay time.Duration) (*http.Response, error) { + t.Helper() + + return httpGetWithHeaders(t, proxyURL, http.Header{bodyWriteDelayHeader: {bodyWriteDelay.String()}}) +} + +func httpGetWithHeaders(t *testing.T, proxyURL string, headers http.Header) (*http.Response, error) { + t.Helper() + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, proxyURL, nil) if err != nil { return nil, err } + for key, values := range headers { + for _, value := range values { + req.Header.Add(key, value) + } + } + rsp, err := (&http.Client{}).Do(req) if err != nil { return nil, err @@ -188,6 +233,50 @@ func httpGet(t *testing.T, proxyURL string) (*http.Response, error) { return rsp, nil } +type instrumentedConn struct { + net.Conn + listener *instrumentedListener +} + +func (c *instrumentedConn) Read(b []byte) (int, error) { + n, err := c.Conn.Read(b) + if err != nil { + c.listener.AddReadError(err) + } + + return n, err +} + +func (l *instrumentedListener) AddReadError(err error) { + l.readErrsMutex.Lock() + defer l.readErrsMutex.Unlock() + + l.readErrs = append(l.readErrs, err) +} + +func (l *instrumentedListener) ReadErrors() []error { + l.readErrsMutex.Lock() + defer l.readErrsMutex.Unlock() + + return l.readErrs +} + +type instrumentedListener struct { + net.Listener + + readErrs []error + readErrsMutex sync.Mutex +} + +func (l *instrumentedListener) Accept() (net.Conn, error) { + conn, err := l.Listener.Accept() + if err != nil { + return nil, err + } + + return &instrumentedConn{Conn: conn, listener: l}, nil +} + func TestProxyReusesConnections(t *testing.T) { var lisCfg net.ListenConfig listener, err := lisCfg.Listen(t.Context(), "tcp", "127.0.0.1:0") @@ -233,6 +322,114 @@ func TestProxyReusesConnections(t *testing.T) { assert.Equal(t, proxy.TotalPoolConnections(), uint64(1), "proxy should have used one connection") } +func TestProxyCloseIdleConnectionsFromPool(t *testing.T) { + var lisCfg net.ListenConfig + listener, err := lisCfg.Listen(t.Context(), "tcp", "127.0.0.1:0") + require.NoError(t, err) + + backend, err := newTestBackend(listener, "backend-1") + require.NoError(t, err) + defer backend.Close() + + getDestination := func(*http.Request) (*pool.Destination, error) { + return &pool.Destination{ + Url: backend.url, + SandboxId: "test-sandbox", + RequestLogger: zap.NewNop(), + ConnectionKey: backend.id, + }, nil + } + + proxy, port, err := newTestProxy(t, getDestination) + require.NoError(t, err) + defer proxy.Close() + + // Make a request to the proxy + proxyURL := fmt.Sprintf("http://127.0.0.1:%d/hello", port) + resp, err := httpGet(t, proxyURL) + require.NoError(t, err) + defer resp.Body.Close() + + assertBackendOutput(t, backend, resp) + + assert.Equal(t, proxy.TotalPoolConnections(), uint64(1), "proxy should have established one connection") + assert.Equal(t, proxy.CurrentPoolConnections(), int64(1), "proxy should have established one connection that is still alive") + assert.Equal(t, backend.RequestCount(), uint64(1), "backend should have been called once") + + // Remove the connection from the pool + err = proxy.RemoveFromPool(backend.id) + require.NoError(t, err) + + assert.Equal(t, proxy.TotalPoolConnections(), uint64(1), "proxy should have still one connection in the pool") + assert.Equal(t, proxy.CurrentPoolConnections(), int64(0), "proxy should have removed the connection from the pool that is still alive") +} + +func TestProxyResetAliveConnectionsFromPool(t *testing.T) { + var lisCfg net.ListenConfig + + listener, err := lisCfg.Listen(t.Context(), "tcp", "127.0.0.1:0") + require.NoError(t, err) + + instrumentedListener := &instrumentedListener{Listener: listener} + + backend, err := newTestBackend(instrumentedListener, "backend-1") + require.NoError(t, err) + defer backend.Close() + + getDestination := func(*http.Request) (*pool.Destination, error) { + return &pool.Destination{ + Url: backend.url, + SandboxId: "test-sandbox", + RequestLogger: zap.NewNop(), + ConnectionKey: backend.id, + }, nil + } + + proxy, port, err := newTestProxy(t, getDestination) + require.NoError(t, err) + defer proxy.Close() + + requestEnded := make(chan struct{}, 1) + + go func() { + defer close(requestEnded) + + // Make a request to the proxy + proxyURL := fmt.Sprintf("http://127.0.0.1:%d/hello", port) + resp, err := httpGetWithBodyWriteDelay(t, proxyURL, 10*time.Second) + require.NoError(t, err) + defer resp.Body.Close() + + assertStreamError(t, resp) + }() + + // Wait for the request to start being processed by the backend + time.Sleep(1 * time.Second) + + assert.Equal(t, proxy.TotalPoolConnections(), uint64(1), "proxy should have established one connection") + assert.Equal(t, proxy.CurrentPoolConnections(), int64(1), "proxy should have established one connection that is still alive") + assert.Equal(t, backend.RequestCount(), uint64(1), "backend should have been called once") + + // Remove the connection from the pool + err = proxy.RemoveFromPool(backend.id) + require.NoError(t, err) + + assert.Equal(t, proxy.TotalPoolConnections(), uint64(1), "proxy should have still one connection in the pool") + assert.Equal(t, proxy.CurrentPoolConnections(), int64(0), "proxy should have removed the connection from the pool that is still alive") + + select { + case <-requestEnded: + case <-t.Context().Done(): + t.Fatalf("request timed out: %v", t.Context().Err()) + } + + require.Equal(t, len(instrumentedListener.ReadErrors()), 1, "server connection should have one read error") + // io.EOF is returned for the FIN packet. + require.NotErrorIs(t, instrumentedListener.ReadErrors()[0], io.EOF, "server connection should have read error other than EOF") + + require.ErrorContains(t, instrumentedListener.ReadErrors()[0], "connection reset by peer") +} + // This is a test that verify that the proxy reuse fails when the backend changes. func TestProxyReuseConnectionsWhenBackendChangesFails(t *testing.T) { // Create first backend From ae4f5897aa5e82d9534b592dfad6abbf3d52b74d Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Thu, 30 Oct 2025 18:57:08 -0700 Subject: [PATCH 6/6] Fix lint --- packages/shared/pkg/proxy/proxy_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/shared/pkg/proxy/proxy_test.go b/packages/shared/pkg/proxy/proxy_test.go index c14406a8dc..9da5a839c1 100644 --- a/packages/shared/pkg/proxy/proxy_test.go +++ b/packages/shared/pkg/proxy/proxy_test.go @@ -87,6 +87,7 @@ func newTestBackend(listener net.Listener, id string) (*testBackend, error) { backendURL, err := url.Parse(fmt.Sprintf("http://%s", listener.Addr().String())) if err != nil { listener.Close() + return nil, fmt.Errorf("failed to parse backend URL: %w", err) } backend.url = backendURL @@ -130,7 +131,7 @@ func assertStreamError(t *testing.T, resp *http.Response) { assert.Equal(t, resp.StatusCode, http.StatusOK, "status code should be 200") _, err := io.ReadAll(resp.Body) - require.ErrorIs(t, err, io.ErrUnexpectedEOF) + assert.ErrorType(t, err, io.ErrUnexpectedEOF) } // newTestProxy creates a new proxy server for testing @@ -235,6 +236,7 @@ func httpGetWithHeaders(t *testing.T, proxyURL string, headers http.Header) (*ht type instrumentedConn struct { net.Conn + listener *instrumentedListener } @@ -397,7 +399,7 @@ func TestProxyResetAliveConnectionsFromPool(t *testing.T) { // Make a request to the proxy proxyURL := fmt.Sprintf("http://127.0.0.1:%d/hello", port) resp, err := httpGetWithBodyWriteDelay(t, proxyURL, 10*time.Second) - require.NoError(t, err) + assert.NilError(t, err) defer resp.Body.Close() assertStreamError(t, resp) @@ -423,7 +425,7 @@ func TestProxyResetAliveConnectionsFromPool(t *testing.T) { t.Fatalf("request timed out: %v", t.Context().Err()) } - require.Equal(t, len(instrumentedListener.ReadErrors()), 1, "server connection should have one read error") + require.Len(t, instrumentedListener.ReadErrors(), 1, "server connection should have one read error") // io.EOF is returned for the FIN packet. require.NotErrorIs(t, instrumentedListener.ReadErrors()[0], io.EOF, "server connection should have read error other than EOF")