From 276ca2f7cc71bf8f9179c9f9842a2503c3497703 Mon Sep 17 00:00:00 2001 From: Jakub Novak Date: Mon, 11 May 2026 13:36:19 +0000 Subject: [PATCH 1/4] refactor(orchestrator): make startingSandboxes limit configurable via feature flag Replace the hardcoded maxStartingInstancesPerNode=3 weighted semaphore with an AdjustableSemaphore driven by the MaxStartingInstancesPerNode feature flag, so per-node start/resume concurrency can be tuned at runtime without a redeploy. A background refresher resizes the semaphore every 30s. --- packages/orchestrator/pkg/factories/run.go | 2 +- packages/orchestrator/pkg/server/main.go | 53 +++++++++++++++++-- packages/orchestrator/pkg/server/sandboxes.go | 3 +- packages/shared/pkg/featureflags/flags.go | 4 ++ 4 files changed, 55 insertions(+), 7 deletions(-) diff --git a/packages/orchestrator/pkg/factories/run.go b/packages/orchestrator/pkg/factories/run.go index c418e25bfd..0a963000df 100644 --- a/packages/orchestrator/pkg/factories/run.go +++ b/packages/orchestrator/pkg/factories/run.go @@ -557,7 +557,7 @@ func run(config cfg.Config, opts Options) (success bool) { return nil }}) - orchestratorService, err := server.New(server.ServiceConfig{ + orchestratorService, err := server.New(ctx, server.ServiceConfig{ Config: config, SandboxFactory: sandboxFactory, Tel: tel, diff --git a/packages/orchestrator/pkg/server/main.go b/packages/orchestrator/pkg/server/main.go index fb53dee8ce..c4729b7c4d 100644 --- a/packages/orchestrator/pkg/server/main.go +++ b/packages/orchestrator/pkg/server/main.go @@ -5,11 +5,12 @@ package server import ( "context" "fmt" + "sync" "time" "github.com/jellydator/ttlcache/v3" "go.opentelemetry.io/otel/metric" - "golang.org/x/sync/semaphore" + "go.uber.org/zap" "github.com/e2b-dev/infra/packages/orchestrator/pkg/cfg" "github.com/e2b-dev/infra/packages/orchestrator/pkg/events" @@ -22,14 +23,20 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/pkg/service" 
"github.com/e2b-dev/infra/packages/shared/pkg/featureflags" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" + "github.com/e2b-dev/infra/packages/shared/pkg/logger" "github.com/e2b-dev/infra/packages/shared/pkg/storage" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" + "github.com/e2b-dev/infra/packages/shared/pkg/utils" ) // Matches the template cache TTL so entries live as long as the // templates they refer to and are cleaned up automatically. const uploadedBuildsTTL = 1 * time.Hour +// startingSandboxesLimitRefreshInterval is how often we re-read the +// MaxStartingInstancesPerNode feature flag and resize the semaphore. +const startingSandboxesLimitRefreshInterval = 30 * time.Second + type Server struct { orchestrator.UnimplementedSandboxServiceServer orchestrator.UnimplementedChunkServiceServer @@ -44,11 +51,14 @@ type Server struct { persistence storage.StorageProvider featureFlags *featureflags.Client sbxEventsService *events.EventsService - startingSandboxes *semaphore.Weighted + startingSandboxes *utils.AdjustableSemaphore peerRegistry peerclient.Registry uploadedBuilds *ttlcache.Cache[string, struct{}] uploads *sandbox.Uploads sandboxCreateDuration metric.Int64Histogram + + done chan struct{} + closeOnce sync.Once } type ServiceConfig struct { @@ -67,12 +77,18 @@ type ServiceConfig struct { Uploads *sandbox.Uploads } -func New(cfg ServiceConfig) (*Server, error) { +func New(ctx context.Context, cfg ServiceConfig) (*Server, error) { uploadedBuilds := ttlcache.New[string, struct{}]( ttlcache.WithTTL[string, struct{}](uploadedBuildsTTL), ) go uploadedBuilds.Start() + startingLimit := cfg.FeatureFlags.IntFlag(ctx, featureflags.MaxStartingInstancesPerNode) + startingSandboxes, err := utils.NewAdjustableSemaphore(int64(startingLimit)) + if err != nil { + return nil, fmt.Errorf("failed to create starting sandboxes semaphore: %w", err) + } + server := &Server{ config: cfg.Config, sandboxFactory: cfg.SandboxFactory, @@ -84,12 +100,15 @@ 
func New(cfg ServiceConfig) (*Server, error) { persistence: cfg.Persistence, featureFlags: cfg.FeatureFlags, sbxEventsService: cfg.SbxEventsService, - startingSandboxes: semaphore.NewWeighted(maxStartingInstancesPerNode), + startingSandboxes: startingSandboxes, peerRegistry: cfg.PeerRegistry, uploadedBuilds: uploadedBuilds, uploads: cfg.Uploads, + done: make(chan struct{}), } + go server.refreshStartingSandboxesLimit(ctx) + meter := cfg.Tel.MeterProvider.Meter("github.com/e2b-dev/infra/packages/orchestrator/pkg/server") sandboxCreateDuration, err := telemetry.GetHistogram(meter, telemetry.OrchestratorSandboxCreateDurationName) @@ -111,7 +130,33 @@ func New(cfg ServiceConfig) (*Server, error) { } func (s *Server) Close() error { + s.closeOnce.Do(func() { + close(s.done) + }) + s.uploadedBuilds.Stop() return nil } + +func (s *Server) refreshStartingSandboxesLimit(ctx context.Context) { + ticker := time.NewTicker(startingSandboxesLimitRefreshInterval) + defer ticker.Stop() + + for { + select { + case <-s.done: + return + case <-ticker.C: + limit := s.featureFlags.IntFlag(ctx, featureflags.MaxStartingInstancesPerNode) + if limit <= 0 { + continue + } + + if err := s.startingSandboxes.SetLimit(int64(limit)); err != nil { + logger.L().Error(ctx, "failed to adjust starting sandboxes semaphore", + zap.Int("limit", limit), zap.Error(err)) + } + } + } +} diff --git a/packages/orchestrator/pkg/server/sandboxes.go b/packages/orchestrator/pkg/server/sandboxes.go index 60966a5f37..b2216e0d19 100644 --- a/packages/orchestrator/pkg/server/sandboxes.go +++ b/packages/orchestrator/pkg/server/sandboxes.go @@ -44,8 +44,7 @@ var tracer = otel.Tracer("github.com/e2b-dev/infra/packages/orchestrator/pkg/ser const ( requestTimeout = 60 * time.Second // acquireTimeout is the max time to wait for a semaphore for resuming sandboxes snapshot. 
- acquireTimeout = 15 * time.Second - maxStartingInstancesPerNode = 3 + acquireTimeout = 15 * time.Second // uploadTimeout is the max time allowed for uploading snapshot files to // remote storage. diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index fc05423926..6d75252a05 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -202,6 +202,10 @@ var ( // Reserved blocks are only usable by root (uid 0), protecting the guest OS from disk-full conditions. BuildReservedDiskSpaceMB = NewIntFlag("build-reserved-disk-space-mb", 0) + // MaxStartingInstancesPerNode limits concurrent sandbox start/resume operations on a single orchestrator node. + // Must be > 0. + MaxStartingInstancesPerNode = NewIntFlag("max-starting-instances-per-node", 3) + // MaxConcurrentSnapshotUpserts limits concurrent UpsertSnapshot calls (pause + snapshot template paths). // 0 or negative disables throttling (unlimited concurrency). 
MaxConcurrentSnapshotUpserts = NewIntFlag("max-concurrent-snapshot-upserts", 0) From a12d9c750c399ff590730fb27d222790721d43e8 Mon Sep 17 00:00:00 2001 From: Jakub Novak Date: Mon, 11 May 2026 13:59:44 +0000 Subject: [PATCH 2/4] fix: move goroutine after initialization --- packages/orchestrator/pkg/server/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/orchestrator/pkg/server/main.go b/packages/orchestrator/pkg/server/main.go index c4729b7c4d..db0a872908 100644 --- a/packages/orchestrator/pkg/server/main.go +++ b/packages/orchestrator/pkg/server/main.go @@ -107,8 +107,6 @@ func New(ctx context.Context, cfg ServiceConfig) (*Server, error) { done: make(chan struct{}), } - go server.refreshStartingSandboxesLimit(ctx) - meter := cfg.Tel.MeterProvider.Meter("github.com/e2b-dev/infra/packages/orchestrator/pkg/server") sandboxCreateDuration, err := telemetry.GetHistogram(meter, telemetry.OrchestratorSandboxCreateDurationName) @@ -126,6 +124,8 @@ func New(ctx context.Context, cfg ServiceConfig) (*Server, error) { return nil, fmt.Errorf("failed to register sandbox count metric: %w", err) } + go server.refreshStartingSandboxesLimit(ctx, int64(startingLimit)) + return server, nil } From 26ebd0359f30900878aa10ca4e7957f7fce70e37 Mon Sep 17 00:00:00 2001 From: Jakub Novak Date: Mon, 11 May 2026 14:00:17 +0000 Subject: [PATCH 3/4] chore: don't broadcast if no change --- packages/shared/pkg/utils/resizable_semaphore.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/shared/pkg/utils/resizable_semaphore.go b/packages/shared/pkg/utils/resizable_semaphore.go index f37e01d23e..db1542335d 100644 --- a/packages/shared/pkg/utils/resizable_semaphore.go +++ b/packages/shared/pkg/utils/resizable_semaphore.go @@ -82,6 +82,10 @@ func (s *AdjustableSemaphore) SetLimit(limit int64) error { s.mu.Lock() defer s.mu.Unlock() + if limit == s.limit { + return nil // no change + } + s.limit = limit s.cond.Broadcast() From 
7fcede7bba36ae418703d9502ba7d1fe90060ad4 Mon Sep 17 00:00:00 2001 From: Jakub Novak Date: Mon, 11 May 2026 14:09:16 +0000 Subject: [PATCH 4/4] fix: remove an extra argument in server.refreshStartingSandboxesLimit --- packages/orchestrator/pkg/server/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/orchestrator/pkg/server/main.go b/packages/orchestrator/pkg/server/main.go index db0a872908..9f00a5405e 100644 --- a/packages/orchestrator/pkg/server/main.go +++ b/packages/orchestrator/pkg/server/main.go @@ -124,7 +124,7 @@ func New(ctx context.Context, cfg ServiceConfig) (*Server, error) { return nil, fmt.Errorf("failed to register sandbox count metric: %w", err) } - go server.refreshStartingSandboxesLimit(ctx, int64(startingLimit)) + go server.refreshStartingSandboxesLimit(ctx) return server, nil }