-
Notifications
You must be signed in to change notification settings - Fork 606
nvmeof: Add external client support #6251
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: devel
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ import ( | |
| "strconv" | ||
|
|
||
| "github.com/container-storage-interface/spec/lib/go/csi" | ||
| "github.com/ghodss/yaml" | ||
| "google.golang.org/grpc/codes" | ||
| "google.golang.org/grpc/status" | ||
|
|
||
|
|
@@ -167,7 +168,7 @@ func (cs *Server) CreateVolume( | |
| } | ||
| }() | ||
|
|
||
| nvmeofData, err = cs.createNVMeoFResources(ctx, req, rbdPoolName, rbdRadosNameSpace, rbdImageName) | ||
| nvmeofData, err = cs.createNVMeoFResources(ctx, req, rbdPoolName, rbdRadosNameSpace, rbdImageName, volumeID) | ||
| if err != nil { | ||
| log.ErrorLog(ctx, "NVMe-oF resource setup failed for volumeID %s: %v", volumeID, err) | ||
|
|
||
|
|
@@ -353,8 +354,21 @@ func (cs *Server) ControllerModifyVolume( | |
|
|
||
| return nil, status.Errorf(codes.InvalidArgument, "failed to parse QoS parameters: %v", err) | ||
| } | ||
| hostsList, err := parseHostsParameters(params) | ||
| if err != nil { | ||
| log.ErrorLog(ctx, "failed to parse NVMe-oF hosts parameters: %v", err) | ||
|
|
||
| return nil, status.Errorf(codes.InvalidArgument, "failed to parse hosts parameters: %v", err) | ||
| } | ||
| if nvmeofQoS != nil { | ||
| return cs.modifyNVMeoFQoS(ctx, req, nvmeofQoS) | ||
| if err := cs.modifyNVMeoFQoS(ctx, req, nvmeofQoS); err != nil { | ||
| return nil, err | ||
| } | ||
| } | ||
| if hostsList != nil { | ||
| if err := cs.modifyNVMeoFHosts(ctx, req, hostsList); err != nil { | ||
| return nil, err | ||
| } | ||
| } | ||
|
|
||
| return &csi.ControllerModifyVolumeResponse{}, nil | ||
|
|
@@ -423,14 +437,10 @@ func validateDHCHAPParameter(dhchapMode string) error { | |
| func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { | ||
| // Validate required parameters | ||
| params := req.GetParameters() | ||
| requiredParams := []string{ | ||
| "subsystemNQN", "nvmeofGatewayAddress", | ||
| } | ||
| for _, param := range requiredParams { | ||
| if params[param] == "" { | ||
| return fmt.Errorf("missing required parameter: %s", param) | ||
| } | ||
| if params["nvmeofGatewayAddress"] == "" { | ||
| return errors.New("missing required parameter nvmeofGatewayAddress") | ||
| } | ||
|
|
||
| // Validate listeners JSON if provided | ||
| countOfListeners, err := validateListenersParameter(params["listeners"]) | ||
| if err != nil { | ||
|
|
@@ -465,6 +475,11 @@ func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { | |
| if err != nil { | ||
| return fmt.Errorf("invalid NVMe-oF QoS parameters: %w", err) | ||
| } | ||
|
|
||
| _, err = parseHostsParameters(mutableParams) | ||
| if err != nil { | ||
| return fmt.Errorf("invalid NVMe-oF hosts parameters (for external clients): %w", err) | ||
| } | ||
| err = validateDHCHAPParameter(params["dhchapMode"]) | ||
| if err != nil { | ||
| return err | ||
|
|
@@ -531,72 +546,52 @@ func validateNetworkMask(networkMask string) error { | |
| return nil | ||
| } | ||
|
|
||
| // parseQoSParameters extracts and parses QoS parameters from the given map. | ||
| func parseQoSParameters(params map[string]string) (*nvmeof.NVMeoFQosVolume, error) { | ||
| qos := &nvmeof.NVMeoFQosVolume{} | ||
| hasAnyQoS := false | ||
|
|
||
| parseParam := func(key, name string, dest **uint64) error { | ||
| if val, exists := params[key]; exists && val != "" { | ||
| parsed, err := strconv.ParseUint(val, 10, 64) | ||
| if err != nil { | ||
| return fmt.Errorf("invalid %s: %w", name, err) | ||
| } | ||
| *dest = &parsed | ||
| hasAnyQoS = true | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| if err := parseParam(nvmeof.RwIosPerSecond, nvmeof.RwIosPerSecond, &qos.RwIosPerSecond); err != nil { | ||
| return nil, err | ||
| // parseHostsParameters parses the AllowHostNQNs parameter, a YAML list of host NQNs. | ||
| // It returns the list of host NQNs, or an error if the value is not a valid YAML list of strings. | ||
| // It returns nil if the key is absent (the caller should not modify hosts). | ||
| // It returns an empty slice if the key is present but empty (the caller should remove all hosts). | ||
| func parseHostsParameters(params map[string]string) ([]string, error) { | ||
| allowHostNQNs, exists := params[AllowHostNQNs] | ||
| if !exists { | ||
| return nil, nil // Key absent: don't modify existing hosts | ||
| } | ||
| if err := parseParam(nvmeof.RwMbytesPerSecond, nvmeof.RwMbytesPerSecond, &qos.RwMbytesPerSecond); err != nil { | ||
| return nil, err | ||
| if allowHostNQNs == "" { | ||
| return []string{}, nil // Key present but empty: remove all hosts | ||
| } | ||
| if err := parseParam(nvmeof.RMbytesPerSecond, nvmeof.RMbytesPerSecond, &qos.RMbytesPerSecond); err != nil { | ||
| return nil, err | ||
| } | ||
| if err := parseParam(nvmeof.WMbytesPerSecond, nvmeof.WMbytesPerSecond, &qos.WMbytesPerSecond); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| if !hasAnyQoS { | ||
| return nil, nil | ||
| var allowHostsList []string | ||
| if err := yaml.Unmarshal([]byte(allowHostNQNs), &allowHostsList); err != nil { | ||
| return nil, fmt.Errorf("invalid %s: must be a YAML list of strings: %w", AllowHostNQNs, err) | ||
| } | ||
|
|
||
| return qos, nil | ||
| return allowHostsList, nil | ||
|
Comment on lines
+549
to
+566
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I will add
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
| } | ||
|
|
||
| // modifyNVMeoFQoS handles NVMe-oF gateway QoS modification. | ||
| func (cs *Server) modifyNVMeoFQoS( | ||
| // withGatewayConnection is a helper that manages the common pattern of: | ||
| // 1. Getting secrets (with fallback to k8s secret) | ||
| // 2. Getting NVMe-oF metadata | ||
| // 3. Connecting to gateway with proper cleanup | ||
| // 4. Executing the provided operation function. | ||
| func (cs *Server) withGatewayConnection( | ||
| ctx context.Context, | ||
| req *csi.ControllerModifyVolumeRequest, | ||
| qos *nvmeof.NVMeoFQosVolume, | ||
| ) (*csi.ControllerModifyVolumeResponse, error) { | ||
| volumeID := req.GetVolumeId() | ||
|
|
||
| volumeID string, | ||
| fn func(context.Context, *nvmeof.GatewayRpcClient, *nvmeof.NVMeoFVolumeData) error, | ||
| ) error { | ||
| // Step 1: Get secrets | ||
|
|
||
| // ControllerModifyVolume receives no volume context, and the request may carry no secrets because | ||
| // the StorageClass has no "csi.storage.k8s.io/controller-modify-secret-name" field. | ||
| // The complete solution would be to use GetControllerExpandSecretRef, but no such function exists yet. | ||
| // TODO: change the call to GetControllerExpandSecretRef once it is implemented. | ||
| secrets := req.GetSecrets() | ||
| if secrets == nil { | ||
| secretName, secretNamespace, err := util.GetControllerPublishSecretRef(volumeID, util.RBDType) | ||
| if err != nil { | ||
| log.ErrorLog(ctx, "Failed to get secret reference: %v", err) | ||
|
|
||
| return nil, status.Errorf(codes.Internal, "failed to get secret reference: %v", err) | ||
| return status.Errorf(codes.Internal, "failed to get secret reference: %v", err) | ||
| } | ||
|
|
||
| secrets, err = k8s.GetSecret(secretName, secretNamespace) | ||
| if err != nil { | ||
| log.ErrorLog(ctx, "Failed to get secret from k8s: %v", err) | ||
|
|
||
| return nil, status.Errorf(codes.Internal, "failed to get secret: %v", err) | ||
| return status.Errorf(codes.Internal, "failed to get secret: %v", err) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -605,7 +600,7 @@ func (cs *Server) modifyNVMeoFQoS( | |
| if err != nil { | ||
| log.ErrorLog(ctx, "Failed to get NVMe-oF metadata: %v", err) | ||
|
|
||
| return nil, nvmeoferrors.ToGRPCError(err) | ||
| return nvmeoferrors.ToGRPCError(err) | ||
| } | ||
|
|
||
| // Step 3: Connect to gateway | ||
|
|
@@ -617,35 +612,115 @@ func (cs *Server) modifyNVMeoFQoS( | |
| if err != nil { | ||
| log.ErrorLog(ctx, "Gateway connection failed: %v", err) | ||
|
|
||
| return nil, status.Errorf(codes.Unavailable, "gateway connection failed: %v", err) | ||
| return status.Errorf(codes.Unavailable, "gateway connection failed: %v", err) | ||
| } | ||
| defer func() { | ||
| if closeErr := gateway.Destroy(); closeErr != nil { | ||
| log.ErrorLog(ctx, "Failed to close gateway connection: %v", closeErr) | ||
| } | ||
| }() | ||
|
|
||
| // Step 4: Apply NVMe-oF QoS via gateway | ||
| log.DebugLog(ctx, "Setting QoS for subsystem=%s, nsid=%d", nvmeofData.SubsystemNQN, nvmeofData.NamespaceID) | ||
| // Step 4: Execute the operation | ||
| return fn(ctx, gateway, nvmeofData) | ||
| } | ||
|
|
||
| err = gateway.SetQoSLimitsForNamespace(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID, *qos) | ||
| if err != nil { | ||
| // Check if error is EEXIST (RBD QoS already set) | ||
| if errors.Is(err, nvmeoferrors.ErrRbdQoSExists) { | ||
| log.ErrorLog(ctx, "RBD QoS already configured on volume") | ||
| // modifyNVMeoFHosts handles adding or removing hosts from the subsystem based on the provided list of host NQNs. | ||
| func (cs *Server) modifyNVMeoFHosts(ctx context.Context, req *csi.ControllerModifyVolumeRequest, hosts []string) error { | ||
| volumeID := req.GetVolumeId() | ||
|
|
||
| return cs.withGatewayConnection(ctx, req, volumeID, func( | ||
| ctx context.Context, | ||
| gateway *nvmeof.GatewayRpcClient, | ||
| nvmeofData *nvmeof.NVMeoFVolumeData, | ||
| ) error { | ||
| log.DebugLog(ctx, "Modifying hosts for subsystem=%s, nsid=%d: desired hosts=%v", | ||
| nvmeofData.SubsystemNQN, nvmeofData.NamespaceID, hosts) | ||
|
|
||
| return nil, status.Error(codes.InvalidArgument, | ||
| "RBD QoS already configured on this volume, cannot set NVMe-oF gateway QoS") | ||
| err := gateway.UpdateHostsForSubsystem(ctx, nvmeofData.SubsystemNQN, hosts) | ||
| if err != nil { | ||
| log.ErrorLog(ctx, "Failed to update hosts for subsystem: %v", err) | ||
|
|
||
| return status.Errorf(codes.Internal, "failed to update hosts for subsystem: %v", err) | ||
| } | ||
|
|
||
| log.ErrorLog(ctx, "Failed to set QoS limits: %v", err) | ||
| log.DebugLog(ctx, "Successfully modified hosts for volume %s", volumeID) | ||
|
|
||
| return nil, status.Errorf(codes.Internal, "failed to set QoS limits: %v", err) | ||
| return nil | ||
| }) | ||
| } | ||
|
|
||
| // parseQoSParameters extracts and parses QoS parameters from the given map. | ||
| func parseQoSParameters(params map[string]string) (*nvmeof.NVMeoFQosVolume, error) { | ||
| qos := &nvmeof.NVMeoFQosVolume{} | ||
| hasAnyQoS := false | ||
|
|
||
| parseParam := func(key, name string, dest **uint64) error { | ||
| if val, exists := params[key]; exists && val != "" { | ||
| parsed, err := strconv.ParseUint(val, 10, 64) | ||
| if err != nil { | ||
| return fmt.Errorf("invalid %s: %w", name, err) | ||
| } | ||
| *dest = &parsed | ||
| hasAnyQoS = true | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| log.DebugLog(ctx, "Successfully modified NVMe-oF QoS for volume %s", volumeID) | ||
| if err := parseParam(nvmeof.RwIosPerSecond, nvmeof.RwIosPerSecond, &qos.RwIosPerSecond); err != nil { | ||
| return nil, err | ||
| } | ||
| if err := parseParam(nvmeof.RwMbytesPerSecond, nvmeof.RwMbytesPerSecond, &qos.RwMbytesPerSecond); err != nil { | ||
| return nil, err | ||
| } | ||
| if err := parseParam(nvmeof.RMbytesPerSecond, nvmeof.RMbytesPerSecond, &qos.RMbytesPerSecond); err != nil { | ||
| return nil, err | ||
| } | ||
| if err := parseParam(nvmeof.WMbytesPerSecond, nvmeof.WMbytesPerSecond, &qos.WMbytesPerSecond); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return &csi.ControllerModifyVolumeResponse{}, nil | ||
| if !hasAnyQoS { | ||
| return nil, nil | ||
| } | ||
|
|
||
| return qos, nil | ||
| } | ||
|
|
||
| // modifyNVMeoFQoS handles NVMe-oF gateway QoS modification. | ||
| func (cs *Server) modifyNVMeoFQoS( | ||
| ctx context.Context, | ||
| req *csi.ControllerModifyVolumeRequest, | ||
| qos *nvmeof.NVMeoFQosVolume, | ||
| ) error { | ||
| volumeID := req.GetVolumeId() | ||
|
|
||
| return cs.withGatewayConnection(ctx, req, volumeID, func( | ||
| ctx context.Context, | ||
| gateway *nvmeof.GatewayRpcClient, | ||
| nvmeofData *nvmeof.NVMeoFVolumeData, | ||
| ) error { | ||
| log.DebugLog(ctx, "Setting QoS for subsystem=%s, nsid=%d", nvmeofData.SubsystemNQN, nvmeofData.NamespaceID) | ||
|
|
||
| err := gateway.SetQoSLimitsForNamespace(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID, *qos) | ||
| if err != nil { | ||
| // Check if error is EEXIST (RBD QoS already set) | ||
| if errors.Is(err, nvmeoferrors.ErrRbdQoSExists) { | ||
| log.ErrorLog(ctx, "RBD QoS already configured on volume") | ||
|
|
||
| return status.Error(codes.InvalidArgument, | ||
| "RBD QoS already configured on this volume, cannot set NVMe-oF gateway QoS") | ||
| } | ||
|
|
||
| log.ErrorLog(ctx, "Failed to set QoS limits: %v", err) | ||
|
|
||
| return status.Errorf(codes.Internal, "failed to set QoS limits: %v", err) | ||
| } | ||
|
|
||
| log.DebugLog(ctx, "Successfully modified NVMe-oF QoS for volume %s", volumeID) | ||
|
|
||
| return nil | ||
| }) | ||
| } | ||
|
|
||
| // ensureSubsystem checks if the subsystem exists, and creates it if not. | ||
|
|
@@ -743,15 +818,16 @@ func (cs *Server) createNVMeoFResources( | |
| req *csi.CreateVolumeRequest, | ||
| rbdPoolName, | ||
| rbdRadosNameSpace, | ||
| rbdImageName string, | ||
| rbdImageName, | ||
| volumeID string, | ||
| ) (*nvmeof.NVMeoFVolumeData, error) { | ||
| // Step 1: Extract parameters (already validated) | ||
| params := req.GetParameters() | ||
|
|
||
| networkMask := params["networkMask"] | ||
| nvmeofData := &nvmeof.NVMeoFVolumeData{} | ||
|
|
||
| if err := nvmeofData.SetFromParameters(params); err != nil { | ||
| if err := nvmeofData.SetFromParameters(params, volumeID); err != nil { | ||
| return nil, fmt.Errorf("failed to set NVMe-oF volume data: %w", err) | ||
| } | ||
|
|
||
|
|
@@ -765,7 +841,15 @@ func (cs *Server) createNVMeoFResources( | |
|
|
||
| return nil, fmt.Errorf("failed to parse QoS parameters: %w", err) | ||
| } | ||
| // If a VAC with a hosts list is given (for external clients), | ||
| // we need to parse the hosts list and pass it to the gateway, which creates the host entries | ||
| // and adds them to the subsystem. | ||
| hosts, err := parseHostsParameters(mutableParams) | ||
| if err != nil { | ||
| log.ErrorLog(ctx, "failed to parse NVMe-oF hosts parameters: %v", err) | ||
|
|
||
| return nil, fmt.Errorf("failed to parse hosts parameters: %w", err) | ||
| } | ||
| // Step 2: Connect to gateway | ||
| config, err := getGatewayConfigFromRequest(params) | ||
| if err != nil { | ||
|
|
@@ -813,7 +897,16 @@ func (cs *Server) createNVMeoFResources( | |
| return nvmeofData, fmt.Errorf("setting QoS limits failed: %w", err) | ||
| } | ||
| } | ||
|
|
||
| if hosts != nil { | ||
| log.DebugLog(ctx, "Adding hosts to subsystem: %v", hosts) | ||
| for _, host := range hosts { | ||
| // TODO: for now, hosts are created with empty DH-CHAP keys. | ||
| // In the future, the VAC parameters could be extended to accept DH-CHAP keys for each host, if needed. | ||
| if err := gateway.AddHost(ctx, nvmeofData.SubsystemNQN, host, nvmeof.DHCHAPKeys{}); err != nil { | ||
| return nvmeofData, fmt.Errorf("adding host %s to subsystem failed: %w", host, err) | ||
| } | ||
| } | ||
| } | ||
| // Step 6: If using auto-listeners, query them back for storing in metadata | ||
| if networkMask != "" { | ||
| autoListeners, err := gateway.ListListeners(ctx, nvmeofData.SubsystemNQN) | ||
|
|
@@ -1013,6 +1106,15 @@ func getHostNQNFromNodeID(nodeID string) (string, error) { | |
| return prefix + nodeID, nil | ||
| } | ||
|
|
||
| // AllowHostNQNs is the VolumeAttributesClass mutable parameter key for specifying | ||
| // a YAML list of host NQNs to allow access to a volume. Use "*" to allow any host. | ||
| // Example: | ||
| // | ||
| // allowHostNQNs: | | ||
| // - nqn.2014-08.org.nvmexpress:host1 | ||
| // - nqn.2014-08.org.nvmexpress:host2 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Make sure this example is included in the StorageClass too.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't get it 🤔 |
||
| const AllowHostNQNs = "allowHostNQNs" | ||
|
|
||
| // VolumeContext metadata keys. | ||
| const ( | ||
| // NVMe-oF resource info. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.