Skip to content

Commit 8d24e90

Browse files
authored
RST sandbox network connections on sandbox close (#1414)
1 parent 249e984 commit 8d24e90

9 files changed

Lines changed: 284 additions & 13 deletions

File tree

packages/orchestrator/internal/proxy/proxy.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ func (p *SandboxProxy) Close(ctx context.Context) error {
127127
return nil
128128
}
129129

130-
func (p *SandboxProxy) RemoveFromPool(connectionKey string) {
131-
p.proxy.RemoveFromPool(connectionKey)
130+
func (p *SandboxProxy) RemoveFromPool(connectionKey string) error {
131+
return p.proxy.RemoveFromPool(connectionKey)
132132
}
133133

134134
func (p *SandboxProxy) GetAddr() string {

packages/orchestrator/internal/server/sandboxes.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,11 @@ func (s *Server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ
156156
s.sandboxes.RemoveByExecutionID(req.GetSandbox().GetSandboxId(), sbx.Runtime.ExecutionID)
157157

158158
// Remove the proxies assigned to the sandbox from the pool to prevent them from being reused.
159-
s.proxy.RemoveFromPool(sbx.Runtime.ExecutionID)
159+
closeErr := s.proxy.RemoveFromPool(sbx.Runtime.ExecutionID)
160+
if closeErr != nil {
161+
// Errors here will be from forcefully closing the connections, so we can ignore them—they will at worst timeout on their own.
162+
sbxlogger.I(sbx).Warn("errors when manually closing connections to sandbox", zap.Error(closeErr))
163+
}
160164

161165
sbxlogger.E(sbx).Info("Sandbox killed")
162166
}()

packages/orchestrator/internal/template/build/layer/layer_executor.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,12 @@ func (lb *LayerExecutor) BuildLayer(
8282
lb.sandboxes.Insert(sbx)
8383
defer func() {
8484
lb.sandboxes.Remove(sbx.Runtime.SandboxID)
85-
lb.proxy.RemoveFromPool(sbx.Runtime.ExecutionID)
85+
86+
closeErr := lb.proxy.RemoveFromPool(sbx.Runtime.ExecutionID)
87+
if closeErr != nil {
88+
// Errors here will be from forcefully closing the connections, so we can ignore them—they will at worst timeout on their own.
89+
lb.logger.Warn("errors when manually closing connections to sandbox", zap.Error(closeErr))
90+
}
8691
}()
8792

8893
// Update envd binary to the latest version

packages/shared/pkg/proxy/pool/client.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pool
22

33
import (
44
"context"
5+
"errors"
56
"log"
67
"net"
78
"net/http"
@@ -13,12 +14,15 @@ import (
1314

1415
"github.com/e2b-dev/infra/packages/shared/pkg/proxy/template"
1516
"github.com/e2b-dev/infra/packages/shared/pkg/proxy/tracking"
17+
"github.com/e2b-dev/infra/packages/shared/pkg/smap"
1618
)
1719

1820
type ProxyClient struct {
1921
httputil.ReverseProxy
2022

2123
transport *http.Transport
24+
25+
activeConnections *smap.Map[*tracking.Connection]
2226
}
2327

2428
func newProxyClient(
@@ -30,6 +34,8 @@ func newProxyClient(
3034
currentConnsCounter *atomic.Int64,
3135
logger *log.Logger,
3236
) *ProxyClient {
37+
activeConnections := smap.New[*tracking.Connection]()
38+
3339
transport := &http.Transport{
3440
Proxy: http.ProxyFromEnvironment,
3541
// Limit the max connection per host to avoid exhausting the number of available ports to one host.
@@ -56,7 +62,7 @@ func newProxyClient(
5662
if err == nil {
5763
totalConnsCounter.Add(1)
5864

59-
return tracking.NewConnection(conn, currentConnsCounter), nil
65+
return tracking.NewConnection(conn, currentConnsCounter, activeConnections), nil
6066
}
6167

6268
if ctx.Err() != nil {
@@ -82,7 +88,8 @@ func newProxyClient(
8288
}
8389

8490
return &ProxyClient{
85-
transport: transport,
91+
transport: transport,
92+
activeConnections: activeConnections,
8693
ReverseProxy: httputil.ReverseProxy{
8794
Transport: transport,
8895
Rewrite: func(r *httputil.ProxyRequest) {
@@ -167,3 +174,16 @@ func newProxyClient(
167174
func (p *ProxyClient) closeIdleConnections() {
168175
p.transport.CloseIdleConnections()
169176
}
177+
178+
func (p *ProxyClient) resetAllConnections() error {
179+
var errs []error
180+
181+
for _, conn := range p.activeConnections.Items() {
182+
err := conn.Reset()
183+
if err != nil {
184+
errs = append(errs, err)
185+
}
186+
}
187+
188+
return errors.Join(errs...)
189+
}

packages/shared/pkg/proxy/pool/pool.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,17 @@ func (p *ProxyPool) Get(d *Destination) *ProxyClient {
6969
})
7070
}
7171

72-
func (p *ProxyPool) Close(connectionKey string) {
72+
func (p *ProxyPool) Close(connectionKey string) (err error) {
7373
p.pool.RemoveCb(connectionKey, func(_ string, proxy *ProxyClient, _ bool) bool {
7474
if proxy != nil {
7575
proxy.closeIdleConnections()
76+
err = proxy.resetAllConnections()
7677
}
7778

7879
return true
7980
})
81+
82+
return err
8083
}
8184

8285
func (p *ProxyPool) TotalConnections() uint64 {

packages/shared/pkg/proxy/proxy.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,12 @@ func New(
5858
}
5959
}
6060

61+
// TotalPoolConnections returns the total number of connections that have been established across whole pool.
6162
func (p *Proxy) TotalPoolConnections() uint64 {
6263
return p.pool.TotalConnections()
6364
}
6465

66+
// CurrentServerConnections returns the current number of connections that are alive across whole pool.
6567
func (p *Proxy) CurrentServerConnections() int64 {
6668
return p.currentServerConnsCounter.Load()
6769
}
@@ -74,8 +76,8 @@ func (p *Proxy) CurrentPoolConnections() int64 {
7476
return p.pool.CurrentConnections()
7577
}
7678

77-
func (p *Proxy) RemoveFromPool(connectionKey string) {
78-
p.pool.Close(connectionKey)
79+
func (p *Proxy) RemoveFromPool(connectionKey string) error {
80+
return p.pool.Close(connectionKey)
7981
}
8082

8183
func (p *Proxy) ListenAndServe(ctx context.Context) error {

0 commit comments

Comments
 (0)