Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/api/internal/handlers/volume_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,10 @@ func isValidVolumeName(name string) bool {
}

func (a *APIStore) createVolume(ctx context.Context, clusterID uuid.UUID, volume queries.Volume) error {
return a.executeOnOrchestratorByClusterID(ctx, clusterID, func(ctx context.Context, client *clusters.GRPCClient) error {
// Broadcast to all nodes so the volume directory exists on every orchestrator
// node. Sandbox placement is independent of volume creation and may schedule
// a sandbox on any node in the cluster.
return a.executeOnAllClusterNodes(ctx, clusterID, func(ctx context.Context, client *clusters.GRPCClient) error {
_, err := client.Volumes.CreateVolume(ctx, &orchestrator.CreateVolumeRequest{
Volume: toVolumeKey(volume),
})
Expand Down
3 changes: 2 additions & 1 deletion packages/api/internal/handlers/volume_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ func (a *APIStore) DeleteVolumesVolumeID(c *gin.Context, volumeID api.VolumeID)
}

func (a *APIStore) deleteVolume(ctx context.Context, clusterID uuid.UUID, volume queries.Volume) error {
return a.executeOnOrchestratorByClusterID(ctx, clusterID, func(ctx context.Context, client *clusters.GRPCClient) error {
// Broadcast to all nodes to clean up the volume directory everywhere it was created.
return a.executeOnAllClusterNodes(ctx, clusterID, func(ctx context.Context, client *clusters.GRPCClient) error {
_, err := client.Volumes.DeleteVolume(ctx, &orchestrator.DeleteVolumeRequest{
Volume: toVolumeKey(volume),
})
Expand Down
58 changes: 58 additions & 0 deletions packages/api/internal/handlers/volume_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/status"

"github.com/e2b-dev/infra/packages/api/internal/api"
Expand Down Expand Up @@ -181,3 +182,60 @@ func isRetryableError(err error) bool {

return false
}

// executeOnAllClusterNodes calls fn concurrently on every ready node in the
// cluster and returns a combined error if any node fails. This is used for
// volume operations so that the volume directory exists on every orchestrator
// node, regardless of which node a sandbox is later scheduled on.
//
// All goroutines always run to completion regardless of individual failures so
// that partial state (e.g. a volume directory left on some nodes but not
// others) is avoided.
func (a *APIStore) executeOnAllClusterNodes(
ctx context.Context,
clusterID uuid.UUID,
fn func(context.Context, *clusters.GRPCClient) error,
) error {
nodes := a.orchestrator.GetClusterNodes(clusterID)

if len(nodes) == 0 {
return ErrClusterNotFound
}

// Use a plain errgroup without context so that a failure on one node does
// not cancel in-flight or pending RPCs on other nodes.
var wg errgroup.Group

readyNodeCount := 0

for _, node := range nodes {
if node.Status() != api.NodeStatusReady {
continue
}

readyNodeCount++

wg.Go(func() error {
// Derive a per-call context from the original request context so
// that the caller's deadline/cancellation is still respected, but
// a failure on one node does not affect the others.
c, clientCtx := node.GetClient(ctx)

if err := fn(clientCtx, c); err != nil {
if volumeType, ok := isUnknownVolumeTypeError(err); ok {
return fmt.Errorf("%w: %s", ErrUnknownVolumeType, volumeType)
}

return fmt.Errorf("node %s: %w", node.ID, err)
}

return nil
})
}

if readyNodeCount == 0 {
return ErrNoHealthyOrchestratorFound
}

return wg.Wait()
Comment thread
AdaAibaby marked this conversation as resolved.
}
2 changes: 1 addition & 1 deletion packages/envd/internal/api/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (a *API) setupNFS(ctx context.Context, logger zerolog.Logger, lifecycleID *
if !a.isMountingNFS.CompareAndSwap(false, true) {
logger.Debug().Msg("NFS volumes already mounting")

return e
return fmt.Errorf("NFS mount already in progress")
}
defer a.isMountingNFS.Store(false)

Expand Down