Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 174 additions & 72 deletions internal/nvmeof/controller/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/ghodss/yaml"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -167,7 +168,7 @@ func (cs *Server) CreateVolume(
}
}()

nvmeofData, err = cs.createNVMeoFResources(ctx, req, rbdPoolName, rbdRadosNameSpace, rbdImageName)
nvmeofData, err = cs.createNVMeoFResources(ctx, req, rbdPoolName, rbdRadosNameSpace, rbdImageName, volumeID)
if err != nil {
log.ErrorLog(ctx, "NVMe-oF resource setup failed for volumeID %s: %v", volumeID, err)

Expand Down Expand Up @@ -353,8 +354,21 @@ func (cs *Server) ControllerModifyVolume(

return nil, status.Errorf(codes.InvalidArgument, "failed to parse QoS parameters: %v", err)
}
hostsList, err := parseHostsParameters(params)
if err != nil {
log.ErrorLog(ctx, "failed to parse NVMe-oF hosts parameters: %v", err)

return nil, status.Errorf(codes.InvalidArgument, "failed to parse hosts parameters: %v", err)
}
if nvmeofQoS != nil {
return cs.modifyNVMeoFQoS(ctx, req, nvmeofQoS)
if err := cs.modifyNVMeoFQoS(ctx, req, nvmeofQoS); err != nil {
return nil, err
}
}
if hostsList != nil {
if err := cs.modifyNVMeoFHosts(ctx, req, hostsList); err != nil {
return nil, err
}
}
Comment thread
gadididi marked this conversation as resolved.

return &csi.ControllerModifyVolumeResponse{}, nil
Expand Down Expand Up @@ -423,14 +437,10 @@ func validateDHCHAPParameter(dhchapMode string) error {
func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
// Validate required parameters
params := req.GetParameters()
requiredParams := []string{
"subsystemNQN", "nvmeofGatewayAddress",
}
for _, param := range requiredParams {
if params[param] == "" {
return fmt.Errorf("missing required parameter: %s", param)
}
if params["nvmeofGatewayAddress"] == "" {
return errors.New("missing required parameter nvmeofGatewayAddress")
}

// Validate listeners JSON if provided
countOfListeners, err := validateListenersParameter(params["listeners"])
if err != nil {
Expand Down Expand Up @@ -465,6 +475,11 @@ func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
if err != nil {
return fmt.Errorf("invalid NVMe-oF QoS parameters: %w", err)
}

_, err = parseHostsParameters(mutableParams)
if err != nil {
return fmt.Errorf("invalid NVMe-oF hosts parameters (for external clients): %w", err)
}
err = validateDHCHAPParameter(params["dhchapMode"])
if err != nil {
return err
Expand Down Expand Up @@ -531,72 +546,52 @@ func validateNetworkMask(networkMask string) error {
return nil
}

// parseQoSParameters extracts and parses QoS parameters from the given map.
func parseQoSParameters(params map[string]string) (*nvmeof.NVMeoFQosVolume, error) {
qos := &nvmeof.NVMeoFQosVolume{}
hasAnyQoS := false

parseParam := func(key, name string, dest **uint64) error {
if val, exists := params[key]; exists && val != "" {
parsed, err := strconv.ParseUint(val, 10, 64)
if err != nil {
return fmt.Errorf("invalid %s: %w", name, err)
}
*dest = &parsed
hasAnyQoS = true
}

return nil
}

if err := parseParam(nvmeof.RwIosPerSecond, nvmeof.RwIosPerSecond, &qos.RwIosPerSecond); err != nil {
return nil, err
// parseHostsParameters parses the hosts parameter, which must be a YAML list of strings.
// It returns a slice of host NQNs, or an error if the value is not a valid YAML list of strings.
// Returns nil if the key is absent (caller should not modify hosts).
// Returns an empty slice if the key is present but empty (caller should remove all hosts).
func parseHostsParameters(params map[string]string) ([]string, error) {
allowHostNQNs, exists := params[AllowHostNQNs]
if !exists {
return nil, nil // Key absent: don't modify existing hosts
}
if err := parseParam(nvmeof.RwMbytesPerSecond, nvmeof.RwMbytesPerSecond, &qos.RwMbytesPerSecond); err != nil {
return nil, err
if allowHostNQNs == "" {
return []string{}, nil // Key present but empty: remove all hosts
}
if err := parseParam(nvmeof.RMbytesPerSecond, nvmeof.RMbytesPerSecond, &qos.RMbytesPerSecond); err != nil {
return nil, err
}
if err := parseParam(nvmeof.WMbytesPerSecond, nvmeof.WMbytesPerSecond, &qos.WMbytesPerSecond); err != nil {
return nil, err
}

if !hasAnyQoS {
return nil, nil
var allowHostsList []string
if err := yaml.Unmarshal([]byte(allowHostNQNs), &allowHostsList); err != nil {
return nil, fmt.Errorf("invalid %s: must be a YAML list of strings: %w", AllowHostNQNs, err)
}

return qos, nil
return allowHostsList, nil
Comment on lines +549 to +566
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

// modifyNVMeoFQoS handles NVMe-oF gateway QoS modification.
func (cs *Server) modifyNVMeoFQoS(
// withGatewayConnection is a helper that manages the common pattern of:
// 1. Getting secrets (with fallback to k8s secret)
// 2. Getting NVMe-oF metadata
// 3. Connecting to gateway with proper cleanup
// 4. Executing the provided operation function.
func (cs *Server) withGatewayConnection(
ctx context.Context,
req *csi.ControllerModifyVolumeRequest,
qos *nvmeof.NVMeoFQosVolume,
) (*csi.ControllerModifyVolumeResponse, error) {
volumeID := req.GetVolumeId()

volumeID string,
fn func(context.Context, *nvmeof.GatewayRpcClient, *nvmeof.NVMeoFVolumeData) error,
) error {
// Step 1: Get secrets

// ControllerModifyVolume receives no volume context and cannot be given secrets,
// because the StorageClass has no "csi.storage.k8s.io/controller-modify-secret-name" field.
// The full solution is to use GetControllerExpandSecretRef, but no such function exists yet.
// TODO: change the call to GetControllerExpandSecretRef once it is implemented.
secrets := req.GetSecrets()
if secrets == nil {
secretName, secretNamespace, err := util.GetControllerPublishSecretRef(volumeID, util.RBDType)
if err != nil {
log.ErrorLog(ctx, "Failed to get secret reference: %v", err)

return nil, status.Errorf(codes.Internal, "failed to get secret reference: %v", err)
return status.Errorf(codes.Internal, "failed to get secret reference: %v", err)
}

secrets, err = k8s.GetSecret(secretName, secretNamespace)
if err != nil {
log.ErrorLog(ctx, "Failed to get secret from k8s: %v", err)

return nil, status.Errorf(codes.Internal, "failed to get secret: %v", err)
return status.Errorf(codes.Internal, "failed to get secret: %v", err)
}
}

Expand All @@ -605,7 +600,7 @@ func (cs *Server) modifyNVMeoFQoS(
if err != nil {
log.ErrorLog(ctx, "Failed to get NVMe-oF metadata: %v", err)

return nil, nvmeoferrors.ToGRPCError(err)
return nvmeoferrors.ToGRPCError(err)
}

// Step 3: Connect to gateway
Expand All @@ -617,35 +612,115 @@ func (cs *Server) modifyNVMeoFQoS(
if err != nil {
log.ErrorLog(ctx, "Gateway connection failed: %v", err)

return nil, status.Errorf(codes.Unavailable, "gateway connection failed: %v", err)
return status.Errorf(codes.Unavailable, "gateway connection failed: %v", err)
}
defer func() {
if closeErr := gateway.Destroy(); closeErr != nil {
log.ErrorLog(ctx, "Failed to close gateway connection: %v", closeErr)
}
}()

// Step 4: Apply NVMe-oF QoS via gateway
log.DebugLog(ctx, "Setting QoS for subsystem=%s, nsid=%d", nvmeofData.SubsystemNQN, nvmeofData.NamespaceID)
// Step 4: Execute the operation
return fn(ctx, gateway, nvmeofData)
}

err = gateway.SetQoSLimitsForNamespace(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID, *qos)
if err != nil {
// Check if error is EEXIST (RBD QoS already set)
if errors.Is(err, nvmeoferrors.ErrRbdQoSExists) {
log.ErrorLog(ctx, "RBD QoS already configured on volume")
// modifyNVMeoFHosts handles adding or removing hosts from the subsystem based on the provided list of host NQNs.
func (cs *Server) modifyNVMeoFHosts(ctx context.Context, req *csi.ControllerModifyVolumeRequest, hosts []string) error {
volumeID := req.GetVolumeId()

return cs.withGatewayConnection(ctx, req, volumeID, func(
ctx context.Context,
gateway *nvmeof.GatewayRpcClient,
nvmeofData *nvmeof.NVMeoFVolumeData,
) error {
log.DebugLog(ctx, "Modifying hosts for subsystem=%s, nsid=%d: desired hosts=%v",
nvmeofData.SubsystemNQN, nvmeofData.NamespaceID, hosts)

return nil, status.Error(codes.InvalidArgument,
"RBD QoS already configured on this volume, cannot set NVMe-oF gateway QoS")
err := gateway.UpdateHostsForSubsystem(ctx, nvmeofData.SubsystemNQN, hosts)
if err != nil {
log.ErrorLog(ctx, "Failed to update hosts for subsystem: %v", err)

return status.Errorf(codes.Internal, "failed to update hosts for subsystem: %v", err)
}

log.ErrorLog(ctx, "Failed to set QoS limits: %v", err)
log.DebugLog(ctx, "Successfully modified hosts for volume %s", volumeID)

return nil, status.Errorf(codes.Internal, "failed to set QoS limits: %v", err)
return nil
})
}

// parseQoSParameters extracts and parses QoS parameters from the given map.
func parseQoSParameters(params map[string]string) (*nvmeof.NVMeoFQosVolume, error) {
qos := &nvmeof.NVMeoFQosVolume{}
hasAnyQoS := false

parseParam := func(key, name string, dest **uint64) error {
if val, exists := params[key]; exists && val != "" {
parsed, err := strconv.ParseUint(val, 10, 64)
if err != nil {
return fmt.Errorf("invalid %s: %w", name, err)
}
*dest = &parsed
hasAnyQoS = true
}

return nil
}

log.DebugLog(ctx, "Successfully modified NVMe-oF QoS for volume %s", volumeID)
if err := parseParam(nvmeof.RwIosPerSecond, nvmeof.RwIosPerSecond, &qos.RwIosPerSecond); err != nil {
return nil, err
}
if err := parseParam(nvmeof.RwMbytesPerSecond, nvmeof.RwMbytesPerSecond, &qos.RwMbytesPerSecond); err != nil {
return nil, err
}
if err := parseParam(nvmeof.RMbytesPerSecond, nvmeof.RMbytesPerSecond, &qos.RMbytesPerSecond); err != nil {
return nil, err
}
if err := parseParam(nvmeof.WMbytesPerSecond, nvmeof.WMbytesPerSecond, &qos.WMbytesPerSecond); err != nil {
return nil, err
}

return &csi.ControllerModifyVolumeResponse{}, nil
if !hasAnyQoS {
return nil, nil
}

return qos, nil
}

// modifyNVMeoFQoS applies the requested QoS limits to the NVMe-oF namespace
// of the volume identified by req.
//
// The gateway session is obtained through withGatewayConnection; this method
// only supplies the operation to run against it. The returned error is a gRPC
// status error:
//   - codes.InvalidArgument when the gateway reports that RBD QoS is already
//     configured on the volume (nvmeoferrors.ErrRbdQoSExists),
//   - codes.Internal for any other failure while setting the limits,
//   - whatever withGatewayConnection itself returns when it fails.
//
// qos is dereferenced when the limits are sent, so it must not be nil.
func (cs *Server) modifyNVMeoFQoS(
	ctx context.Context,
	req *csi.ControllerModifyVolumeRequest,
	qos *nvmeof.NVMeoFQosVolume,
) error {
	volumeID := req.GetVolumeId()

	// applyQoS is the operation executed once the gateway connection is available.
	applyQoS := func(
		ctx context.Context,
		gateway *nvmeof.GatewayRpcClient,
		nvmeofData *nvmeof.NVMeoFVolumeData,
	) error {
		subsystemNQN, nsid := nvmeofData.SubsystemNQN, nvmeofData.NamespaceID
		log.DebugLog(ctx, "Setting QoS for subsystem=%s, nsid=%d", subsystemNQN, nsid)

		setErr := gateway.SetQoSLimitsForNamespace(ctx, subsystemNQN, nsid, *qos)

		switch {
		case setErr == nil:
			log.DebugLog(ctx, "Successfully modified NVMe-oF QoS for volume %s", volumeID)

			return nil
		case errors.Is(setErr, nvmeoferrors.ErrRbdQoSExists):
			// EEXIST from the gateway: RBD QoS is already set on this volume.
			log.ErrorLog(ctx, "RBD QoS already configured on volume")

			return status.Error(codes.InvalidArgument,
				"RBD QoS already configured on this volume, cannot set NVMe-oF gateway QoS")
		default:
			log.ErrorLog(ctx, "Failed to set QoS limits: %v", setErr)

			return status.Errorf(codes.Internal, "failed to set QoS limits: %v", setErr)
		}
	}

	return cs.withGatewayConnection(ctx, req, volumeID, applyQoS)
}

// ensureSubsystem checks if the subsystem exists, and creates it if not.
Expand Down Expand Up @@ -743,15 +818,16 @@ func (cs *Server) createNVMeoFResources(
req *csi.CreateVolumeRequest,
rbdPoolName,
rbdRadosNameSpace,
rbdImageName string,
rbdImageName,
volumeID string,
) (*nvmeof.NVMeoFVolumeData, error) {
// Step 1: Extract parameters (already validated)
params := req.GetParameters()

networkMask := params["networkMask"]
nvmeofData := &nvmeof.NVMeoFVolumeData{}

if err := nvmeofData.SetFromParameters(params); err != nil {
if err := nvmeofData.SetFromParameters(params, volumeID); err != nil {
return nil, fmt.Errorf("failed to set NVMe-oF volume data: %w", err)
}

Expand All @@ -765,7 +841,15 @@ func (cs *Server) createNVMeoFResources(

return nil, fmt.Errorf("failed to parse QoS parameters: %w", err)
}
// If a VAC with a hosts list is given (for external clients), parse the list here
// so that the hosts can be passed to the gateway, which creates the host entries
// and adds them to the subsystem.
hosts, err := parseHostsParameters(mutableParams)
if err != nil {
log.ErrorLog(ctx, "failed to parse NVMe-oF hosts parameters: %v", err)

return nil, fmt.Errorf("failed to parse hosts parameters: %w", err)
}
// Step 2: Connect to gateway
config, err := getGatewayConfigFromRequest(params)
if err != nil {
Expand Down Expand Up @@ -813,7 +897,16 @@ func (cs *Server) createNVMeoFResources(
return nvmeofData, fmt.Errorf("setting QoS limits failed: %w", err)
}
}

if hosts != nil {
log.DebugLog(ctx, "Adding hosts to subsystem: %v", hosts)
for _, host := range hosts {
// TODO: for now hosts are created with empty DH-CHAP keys; if needed in the future,
// extend the VAC parameters to allow passing DH-CHAP keys for each host.
if err := gateway.AddHost(ctx, nvmeofData.SubsystemNQN, host, nvmeof.DHCHAPKeys{}); err != nil {
return nvmeofData, fmt.Errorf("adding host %s to subsystem failed: %w", host, err)
}
}
}
// Step 6: If using auto-listeners, query them back for storing in metadata
if networkMask != "" {
autoListeners, err := gateway.ListListeners(ctx, nvmeofData.SubsystemNQN)
Expand Down Expand Up @@ -1013,6 +1106,15 @@ func getHostNQNFromNodeID(nodeID string) (string, error) {
return prefix + nodeID, nil
}

// AllowHostNQNs is the VolumeAttributesClass mutable parameter key for specifying
// a YAML list of host NQNs to allow access to a volume. Use "*" to allow any host.
// Example:
//
// allowHostNQNs: |
// - nqn.2014-08.org.nvmexpress:host1
// - nqn.2014-08.org.nvmexpress:host2
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sure this example is included in the StorageClass too

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it 🤔
The PVC may have a VAC (with allowHostNQNs), but we don't add this param to the StorageClass.

const AllowHostNQNs = "allowHostNQNs"

// VolumeContext metadata keys.
const (
// NVMe-oF resource info.
Expand Down
Loading
Loading