@@ -5,11 +5,12 @@ package server
55import (
66 "context"
77 "fmt"
8+ "sync"
89 "time"
910
1011 "github.com/jellydator/ttlcache/v3"
1112 "go.opentelemetry.io/otel/metric"
12- "golang. org/x/sync/semaphore "
13+ "go.uber. org/zap "
1314
1415 "github.com/e2b-dev/infra/packages/orchestrator/pkg/cfg"
1516 "github.com/e2b-dev/infra/packages/orchestrator/pkg/events"
@@ -22,14 +23,20 @@ import (
2223 "github.com/e2b-dev/infra/packages/orchestrator/pkg/service"
2324 "github.com/e2b-dev/infra/packages/shared/pkg/featureflags"
2425 "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator"
26+ "github.com/e2b-dev/infra/packages/shared/pkg/logger"
2527 "github.com/e2b-dev/infra/packages/shared/pkg/storage"
2628 "github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
29+ "github.com/e2b-dev/infra/packages/shared/pkg/utils"
2730)
2831
2932// Matches the template cache TTL so entries live as long as the
3033// templates they refer to and are cleaned up automatically.
3134const uploadedBuildsTTL = 1 * time .Hour
3235
36+ // startingSandboxesLimitRefreshInterval is how often we re-read the
37+ // MaxStartingInstancesPerNode feature flag and resize the semaphore.
38+ const startingSandboxesLimitRefreshInterval = 30 * time .Second
39+
3340type Server struct {
3441 orchestrator.UnimplementedSandboxServiceServer
3542 orchestrator.UnimplementedChunkServiceServer
@@ -44,11 +51,14 @@ type Server struct {
4451 persistence storage.StorageProvider
4552 featureFlags * featureflags.Client
4653 sbxEventsService * events.EventsService
47- startingSandboxes * semaphore. Weighted
54+ startingSandboxes * utils. AdjustableSemaphore
4855 peerRegistry peerclient.Registry
4956 uploadedBuilds * ttlcache.Cache [string , struct {}]
5057 uploads * sandbox.Uploads
5158 sandboxCreateDuration metric.Int64Histogram
59+
60+ done chan struct {}
61+ closeOnce sync.Once
5262}
5363
5464type ServiceConfig struct {
@@ -67,12 +77,18 @@ type ServiceConfig struct {
6777 Uploads * sandbox.Uploads
6878}
6979
70- func New (cfg ServiceConfig ) (* Server , error ) {
80+ func New (ctx context. Context , cfg ServiceConfig ) (* Server , error ) {
7181 uploadedBuilds := ttlcache.New [string , struct {}](
7282 ttlcache.WithTTL [string , struct {}](uploadedBuildsTTL ),
7383 )
7484 go uploadedBuilds .Start ()
7585
86+ startingLimit := cfg .FeatureFlags .IntFlag (ctx , featureflags .MaxStartingInstancesPerNode )
87+ startingSandboxes , err := utils .NewAdjustableSemaphore (int64 (startingLimit ))
88+ if err != nil {
89+ return nil , fmt .Errorf ("failed to create starting sandboxes semaphore: %w" , err )
90+ }
91+
7692 server := & Server {
7793 config : cfg .Config ,
7894 sandboxFactory : cfg .SandboxFactory ,
@@ -84,10 +100,11 @@ func New(cfg ServiceConfig) (*Server, error) {
84100 persistence : cfg .Persistence ,
85101 featureFlags : cfg .FeatureFlags ,
86102 sbxEventsService : cfg .SbxEventsService ,
87- startingSandboxes : semaphore . NewWeighted ( maxStartingInstancesPerNode ) ,
103+ startingSandboxes : startingSandboxes ,
88104 peerRegistry : cfg .PeerRegistry ,
89105 uploadedBuilds : uploadedBuilds ,
90106 uploads : cfg .Uploads ,
107+ done : make (chan struct {}),
91108 }
92109
93110 meter := cfg .Tel .MeterProvider .Meter ("github.com/e2b-dev/infra/packages/orchestrator/pkg/server" )
@@ -107,11 +124,39 @@ func New(cfg ServiceConfig) (*Server, error) {
107124 return nil , fmt .Errorf ("failed to register sandbox count metric: %w" , err )
108125 }
109126
127+ go server .refreshStartingSandboxesLimit (ctx )
128+
110129 return server , nil
111130}
112131
113132func (s * Server ) Close () error {
133+ s .closeOnce .Do (func () {
134+ close (s .done )
135+ })
136+
114137 s .uploadedBuilds .Stop ()
115138
116139 return nil
117140}
141+
142+ func (s * Server ) refreshStartingSandboxesLimit (ctx context.Context ) {
143+ ticker := time .NewTicker (startingSandboxesLimitRefreshInterval )
144+ defer ticker .Stop ()
145+
146+ for {
147+ select {
148+ case <- s .done :
149+ return
150+ case <- ticker .C :
151+ limit := s .featureFlags .IntFlag (ctx , featureflags .MaxStartingInstancesPerNode )
152+ if limit <= 0 {
153+ continue
154+ }
155+
156+ if err := s .startingSandboxes .SetLimit (int64 (limit )); err != nil {
157+ logger .L ().Error (ctx , "failed to adjust starting sandboxes semaphore" ,
158+ zap .Int ("limit" , limit ), zap .Error (err ))
159+ }
160+ }
161+ }
162+ }
0 commit comments