Skip to content
4 changes: 2 additions & 2 deletions packages/orchestrator/internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ func (p *SandboxProxy) Close(ctx context.Context) error {
return nil
}

func (p *SandboxProxy) RemoveFromPool(connectionKey string) {
p.proxy.RemoveFromPool(connectionKey)
func (p *SandboxProxy) RemoveFromPool(connectionKey string) error {
return p.proxy.RemoveFromPool(connectionKey)
}

func (p *SandboxProxy) GetAddr() string {
Expand Down
6 changes: 5 additions & 1 deletion packages/orchestrator/internal/server/sandboxes.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ func (s *Server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ
s.sandboxes.RemoveByExecutionID(req.GetSandbox().GetSandboxId(), sbx.Runtime.ExecutionID)

// Remove the proxies assigned to the sandbox from the pool to prevent them from being reused.
s.proxy.RemoveFromPool(sbx.Runtime.ExecutionID)
closeErr := s.proxy.RemoveFromPool(sbx.Runtime.ExecutionID)
if closeErr != nil {
// Errors here will be from forcefully closing the connections, so we can ignore them—they will at worst timeout on their own.
sbxlogger.I(sbx).Warn("errors when manually closing connections to sandbox", zap.Error(closeErr))
}

sbxlogger.E(sbx).Info("Sandbox killed")
}()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ func (lb *LayerExecutor) BuildLayer(
lb.sandboxes.Insert(sbx)
defer func() {
lb.sandboxes.Remove(sbx.Runtime.SandboxID)
lb.proxy.RemoveFromPool(sbx.Runtime.ExecutionID)

closeErr := lb.proxy.RemoveFromPool(sbx.Runtime.ExecutionID)
if closeErr != nil {
// Errors here will be from forcefully closing the connections, so we can ignore them—they will at worst timeout on their own.
lb.logger.Warn("errors when manually closing connections to sandbox", zap.Error(closeErr))
Comment thread
ValentaTomas marked this conversation as resolved.
}
}()

// Update envd binary to the latest version
Expand Down
24 changes: 22 additions & 2 deletions packages/shared/pkg/proxy/pool/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pool

import (
"context"
"errors"
"log"
"net"
"net/http"
Expand All @@ -13,12 +14,15 @@ import (

"github.com/e2b-dev/infra/packages/shared/pkg/proxy/template"
"github.com/e2b-dev/infra/packages/shared/pkg/proxy/tracking"
"github.com/e2b-dev/infra/packages/shared/pkg/smap"
)

type ProxyClient struct {
httputil.ReverseProxy

transport *http.Transport

activeConnections *smap.Map[*tracking.Connection]
}

func newProxyClient(
Expand All @@ -30,6 +34,8 @@ func newProxyClient(
currentConnsCounter *atomic.Int64,
logger *log.Logger,
) *ProxyClient {
activeConnections := smap.New[*tracking.Connection]()

transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
// Limit the max connection per host to avoid exhausting the number of available ports to one host.
Expand All @@ -56,7 +62,7 @@ func newProxyClient(
if err == nil {
totalConnsCounter.Add(1)

return tracking.NewConnection(conn, currentConnsCounter), nil
return tracking.NewConnection(conn, currentConnsCounter, activeConnections), nil
}

if ctx.Err() != nil {
Expand All @@ -82,7 +88,8 @@ func newProxyClient(
}

return &ProxyClient{
transport: transport,
transport: transport,
activeConnections: activeConnections,
ReverseProxy: httputil.ReverseProxy{
Transport: transport,
Rewrite: func(r *httputil.ProxyRequest) {
Expand Down Expand Up @@ -167,3 +174,16 @@ func newProxyClient(
func (p *ProxyClient) closeIdleConnections() {
p.transport.CloseIdleConnections()
}

func (p *ProxyClient) resetAllConnections() error {
var errs []error

for _, conn := range p.activeConnections.Items() {
err := conn.Reset()
if err != nil {
errs = append(errs, err)
}
}

return errors.Join(errs...)
}
5 changes: 4 additions & 1 deletion packages/shared/pkg/proxy/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,17 @@ func (p *ProxyPool) Get(d *Destination) *ProxyClient {
})
}

func (p *ProxyPool) Close(connectionKey string) {
func (p *ProxyPool) Close(connectionKey string) (err error) {
p.pool.RemoveCb(connectionKey, func(_ string, proxy *ProxyClient, _ bool) bool {
if proxy != nil {
proxy.closeIdleConnections()
err = proxy.resetAllConnections()
}

return true
})

return err
}

func (p *ProxyPool) TotalConnections() uint64 {
Expand Down
6 changes: 4 additions & 2 deletions packages/shared/pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ func New(
}
}

// TotalPoolConnections returns the total number of connections that have been established across whole pool.
func (p *Proxy) TotalPoolConnections() uint64 {
return p.pool.TotalConnections()
}

// CurrentServerConnections returns the current number of connections that are alive across whole pool.
func (p *Proxy) CurrentServerConnections() int64 {
return p.currentServerConnsCounter.Load()
}
Expand All @@ -74,8 +76,8 @@ func (p *Proxy) CurrentPoolConnections() int64 {
return p.pool.CurrentConnections()
}

func (p *Proxy) RemoveFromPool(connectionKey string) {
p.pool.Close(connectionKey)
func (p *Proxy) RemoveFromPool(connectionKey string) error {
return p.pool.Close(connectionKey)
}

func (p *Proxy) ListenAndServe(ctx context.Context) error {
Expand Down
Loading
Loading