Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/orchestrator/pkg/factories/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func run(config cfg.Config, opts Options) (success bool) {
return nil
}})

orchestratorService, err := server.New(server.ServiceConfig{
orchestratorService, err := server.New(ctx, server.ServiceConfig{
Config: config,
SandboxFactory: sandboxFactory,
Tel: tel,
Expand Down
53 changes: 49 additions & 4 deletions packages/orchestrator/pkg/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ package server
import (
"context"
"fmt"
"sync"
"time"

"github.com/jellydator/ttlcache/v3"
"go.opentelemetry.io/otel/metric"
"golang.org/x/sync/semaphore"
"go.uber.org/zap"

"github.com/e2b-dev/infra/packages/orchestrator/pkg/cfg"
"github.com/e2b-dev/infra/packages/orchestrator/pkg/events"
Expand All @@ -22,14 +23,20 @@ import (
"github.com/e2b-dev/infra/packages/orchestrator/pkg/service"
"github.com/e2b-dev/infra/packages/shared/pkg/featureflags"
"github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator"
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
"github.com/e2b-dev/infra/packages/shared/pkg/utils"
)

const (
	// uploadedBuildsTTL is the lifetime of an entry in the uploaded-builds
	// cache. It matches the template cache TTL so entries live as long as the
	// templates they refer to and are cleaned up automatically.
	uploadedBuildsTTL = time.Hour

	// startingSandboxesLimitRefreshInterval is the period between re-reads of
	// the MaxStartingInstancesPerNode feature flag, after which the
	// starting-sandboxes semaphore is resized.
	startingSandboxesLimitRefreshInterval = 30 * time.Second
)

type Server struct {
orchestrator.UnimplementedSandboxServiceServer
orchestrator.UnimplementedChunkServiceServer
Expand All @@ -44,11 +51,14 @@ type Server struct {
persistence storage.StorageProvider
featureFlags *featureflags.Client
sbxEventsService *events.EventsService
startingSandboxes *semaphore.Weighted
startingSandboxes *utils.AdjustableSemaphore
peerRegistry peerclient.Registry
uploadedBuilds *ttlcache.Cache[string, struct{}]
uploads *sandbox.Uploads
sandboxCreateDuration metric.Int64Histogram

done chan struct{}
closeOnce sync.Once
}

type ServiceConfig struct {
Expand All @@ -67,12 +77,18 @@ type ServiceConfig struct {
Uploads *sandbox.Uploads
}

func New(cfg ServiceConfig) (*Server, error) {
func New(ctx context.Context, cfg ServiceConfig) (*Server, error) {
uploadedBuilds := ttlcache.New[string, struct{}](
ttlcache.WithTTL[string, struct{}](uploadedBuildsTTL),
)
go uploadedBuilds.Start()

startingLimit := cfg.FeatureFlags.IntFlag(ctx, featureflags.MaxStartingInstancesPerNode)
startingSandboxes, err := utils.NewAdjustableSemaphore(int64(startingLimit))
if err != nil {
return nil, fmt.Errorf("failed to create starting sandboxes semaphore: %w", err)
Comment on lines +86 to +89

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Fall back when startup flag value is non-positive

`New` now constructs `startingSandboxes` directly from `MaxStartingInstancesPerNode`, but if LaunchDarkly is misconfigured to `0` or a negative value, `NewAdjustableSemaphore` returns an error and the orchestrator fails to start. This is a regression from the previous hardcoded-limit path, and it is especially problematic because the periodic refresher already treats `<= 0` as invalid and skips applying it, so startup should likewise clamp or fall back instead of hard-failing the whole service.

Useful? React with 👍 / 👎.

}

server := &Server{
config: cfg.Config,
sandboxFactory: cfg.SandboxFactory,
Expand All @@ -84,10 +100,11 @@ func New(cfg ServiceConfig) (*Server, error) {
persistence: cfg.Persistence,
featureFlags: cfg.FeatureFlags,
sbxEventsService: cfg.SbxEventsService,
startingSandboxes: semaphore.NewWeighted(maxStartingInstancesPerNode),
startingSandboxes: startingSandboxes,
peerRegistry: cfg.PeerRegistry,
uploadedBuilds: uploadedBuilds,
uploads: cfg.Uploads,
done: make(chan struct{}),
}

meter := cfg.Tel.MeterProvider.Meter("github.com/e2b-dev/infra/packages/orchestrator/pkg/server")
Expand All @@ -107,11 +124,39 @@ func New(cfg ServiceConfig) (*Server, error) {
return nil, fmt.Errorf("failed to register sandbox count metric: %w", err)
}

go server.refreshStartingSandboxesLimit(ctx)

return server, nil
}

// Close releases the server's background resources: it closes s.done, which
// tells the refreshStartingSandboxesLimit goroutine to exit, and stops the
// uploadedBuilds cache.
//
// All teardown runs inside closeOnce, so Close may be called any number of
// times; only the first call does any work. It always returns nil.
func (s *Server) Close() error {
	s.closeOnce.Do(func() {
		close(s.done)

		// Stop must run at most once as well.
		// NOTE(review): in ttlcache v3, Stop appears to be a blocking send to
		// the cleanup loop, so a second call after the loop has exited would
		// hang — confirm against the pinned ttlcache version.
		s.uploadedBuilds.Stop()
	})

	return nil
}

func (s *Server) refreshStartingSandboxesLimit(ctx context.Context) {
ticker := time.NewTicker(startingSandboxesLimitRefreshInterval)
defer ticker.Stop()

for {
select {
case <-s.done:
return
case <-ticker.C:
limit := s.featureFlags.IntFlag(ctx, featureflags.MaxStartingInstancesPerNode)
if limit <= 0 {
continue
}

if err := s.startingSandboxes.SetLimit(int64(limit)); err != nil {
Comment thread
jakubno marked this conversation as resolved.
logger.L().Error(ctx, "failed to adjust starting sandboxes semaphore",
zap.Int("limit", limit), zap.Error(err))
}
}
}
}
3 changes: 1 addition & 2 deletions packages/orchestrator/pkg/server/sandboxes.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ var tracer = otel.Tracer("github.com/e2b-dev/infra/packages/orchestrator/pkg/ser
const (
requestTimeout = 60 * time.Second
// acquireTimeout is the max time to wait for a semaphore for resuming sandboxes snapshot.
acquireTimeout = 15 * time.Second
maxStartingInstancesPerNode = 3
acquireTimeout = 15 * time.Second

// uploadTimeout is the max time allowed for uploading snapshot files to
// remote storage.
Expand Down
4 changes: 4 additions & 0 deletions packages/shared/pkg/featureflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ var (
// Reserved blocks are only usable by root (uid 0), protecting the guest OS from disk-full conditions.
BuildReservedDiskSpaceMB = NewIntFlag("build-reserved-disk-space-mb", 0)

// MaxStartingInstancesPerNode limits concurrent sandbox start/resume operations on a single orchestrator node.
// Must be > 0.
MaxStartingInstancesPerNode = NewIntFlag("max-starting-instances-per-node", 3)

// MaxConcurrentSnapshotUpserts limits concurrent UpsertSnapshot calls (pause + snapshot template paths).
// 0 or negative disables throttling (unlimited concurrency).
MaxConcurrentSnapshotUpserts = NewIntFlag("max-concurrent-snapshot-upserts", 0)
Expand Down
4 changes: 4 additions & 0 deletions packages/shared/pkg/utils/resizable_semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func (s *AdjustableSemaphore) SetLimit(limit int64) error {
s.mu.Lock()
defer s.mu.Unlock()

if limit == s.limit {
return nil // no change
}

s.limit = limit
s.cond.Broadcast()

Expand Down
Loading