Skip to content

Commit cc13abf

Browse files
authored
Node scalability changes for NVMe
1 parent 3fc41a5 commit cc13abf

File tree

4 files changed

+1055
-141
lines changed

4 files changed

+1055
-141
lines changed

frontend/csi/node_server.go

Lines changed: 110 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ const (
7676
maxNodeStageFCPVolumeOperations = 5
7777
maxNodeUnstageFCPVolumeOperations = 10
7878
maxNodePublishFCPVolumeOperations = 10
79+
maxNodeStageNVMeVolumeOperations = 5
80+
maxNodeUnstageNVMeVolumeOperations = 10
81+
maxNodePublishNVMeVolumeOperations = 10
7982
maxNodeExpandVolumeOperations = 10
8083

8184
NodeStageNFSVolume = "NodeStageNFSVolume"
@@ -95,11 +98,19 @@ const (
9598
NodeUnstageFCPVolume = "NodeUnstageFCPVolume"
9699
NodePublishFCPVolume = "NodePublishFCPVolume"
97100

101+
// NVMe Constants
102+
NodeStageNVMeVolume = "NodeStageNVMeVolume"
103+
NodeUnstageNVMeVolume = "NodeUnstageNVMeVolume"
104+
NodePublishNVMeVolume = "NodePublishNVMeVolume"
105+
98106
NodeUnpublishVolume = "NodeUnpublishVolume"
99107
NodeExpandVolume = "NodeExpandVolume"
100108

101-
// LockID Constant for the self-healing global lock.
109+
// LockID Constants for the self-healing global locks.
102110
iSCSISelfHealingSessionLock = "iSCSISelfHealingSessionLock"
111+
nvmeSelfHealingSessionLock = "nvmeSelfHealingSessionLock"
112+
nvmeSubsystemDisconnectLock = "nvmeSubsystemDisconnectLock"
113+
nvmeFlushRetryMapLock = "nvmeFlushRetryMapLock"
103114
)
104115

105116
var (
@@ -128,6 +139,10 @@ var (
128139
// TODO(pshashan): Once the locks package revamp PR is merged, remove both of these var below.
129140
iSCSISelfHealingLock = sync.RWMutex{}
130141
iSCSINodeOperationWaitingCount atomic.Int32
142+
143+
// NVMe Self-healing lock and counter for parallelism
144+
nvmeSelfHealingLock = sync.RWMutex{}
145+
nvmeNodeOperationWaitingCount atomic.Int32
131146
)
132147

133148
const (
@@ -571,21 +586,9 @@ func (p *Plugin) NodeExpandVolume(
571586
}
572587

573588
{
574-
// TODO(pshashan): Remove this POC once the NVMe protocol is parallelized.
575-
// It currently enables parallelization for the iSCSI and FCP protocol while keeping NVMe serialized.
576-
protocol, err := getVolumeProtocolFromPublishInfo(&trackingInfo.VolumePublishInfo)
577-
if err != nil {
578-
return nil, status.Errorf(codes.FailedPrecondition, "unable to read protocol info from publish info; %s", err)
579-
}
589+
// All protocols (iSCSI, FCP, NVMe) now use limiter-based parallelism
580590
lockID := req.GetVolumeId()
581591
lockContext := "NodeExpandVolume"
582-
if protocol == tridentconfig.Block {
583-
switch trackingInfo.VolumePublishInfo.SANType {
584-
case sa.NVMe:
585-
lockID = nodeLockID
586-
lockContext = "NodeExpandVolume-" + req.GetVolumeId()
587-
}
588-
}
589592
defer locks.Unlock(ctx, lockContext, lockID)
590593
if !attemptLock(ctx, lockContext, lockID, csiNodeLockTimeout) {
591594
return nil, status.Error(codes.Aborted, "request waited too long for the lock")
@@ -2969,12 +2972,16 @@ func (p *Plugin) nodeStageNVMeVolume(
29692972
Logc(ctx).Debug(">>>> nodeStageNVMeVolume")
29702973
defer Logc(ctx).Debug("<<<< nodeStageNVMeVolume")
29712974

2972-
// Serializing all the parallel requests by relying on the constant var.
2973-
lockContext := "NodeStageSANNVMeVolume-" + req.GetVolumeId()
2974-
defer locks.Unlock(ctx, lockContext, nodeLockID)
2975-
if !attemptLock(ctx, lockContext, nodeLockID, csiNodeLockTimeout) {
2976-
return status.Error(codes.Aborted, "request waited too long for the lock")
2975+
if err := p.limiterSharedMap[NodeStageNVMeVolume].Wait(ctx); err != nil {
2976+
return err
29772977
}
2978+
defer p.limiterSharedMap[NodeStageNVMeVolume].Release(ctx)
2979+
2980+
// Acquire self-healing read lock to allow parallel operations
2981+
nvmeNodeOperationWaitingCount.Add(1)
2982+
nvmeSelfHealingLock.RLock()
2983+
defer nvmeSelfHealingLock.RUnlock()
2984+
nvmeNodeOperationWaitingCount.Add(-1)
29782985

29792986
isLUKS := convert.ToBool(req.PublishContext["LUKSEncryption"])
29802987
publishInfo.LUKSEncryption = strconv.FormatBool(isLUKS)
@@ -3030,7 +3037,13 @@ func (p *Plugin) nodeStageNVMeVolume(
30303037
return err
30313038
}
30323039

3040+
lockContext := "nodeStageNVMeVolume.AddSession"
3041+
if !attemptLock(ctx, lockContext, nvmeSelfHealingSessionLock, csiNodeLockTimeout) {
3042+
locks.Unlock(ctx, lockContext, nvmeSelfHealingSessionLock)
3043+
return status.Error(codes.Aborted, "request waited too long for the lock")
3044+
}
30333045
p.nvmeHandler.AddPublishedNVMeSession(&publishedNVMeSessions, publishInfo)
3046+
locks.Unlock(ctx, lockContext, nvmeSelfHealingSessionLock)
30343047
return nil
30353048
}
30363049

@@ -3045,15 +3058,25 @@ func (p *Plugin) nodeUnstageNVMeVolume(
30453058
Logc(ctx).Debug(">>>> nodeUnstageNVMeVolume")
30463059
defer Logc(ctx).Debug("<<<< nodeUnstageNVMeVolume")
30473060

3048-
// Serializing all the parallel requests by relying on the constant var.
3049-
lockContext := "NodeUnstageNVMeVolume-" + req.GetVolumeId()
3050-
defer locks.Unlock(ctx, lockContext, nodeLockID)
3051-
if !attemptLock(ctx, lockContext, nodeLockID, csiNodeLockTimeout) {
3052-
return nil, status.Error(codes.Aborted, "request waited too long for the lock")
3061+
if err := p.limiterSharedMap[NodeUnstageNVMeVolume].Wait(ctx); err != nil {
3062+
return nil, err
30533063
}
3064+
defer p.limiterSharedMap[NodeUnstageNVMeVolume].Release(ctx)
3065+
3066+
// Acquire self-healing read lock to allow parallel operations
3067+
nvmeNodeOperationWaitingCount.Add(1)
3068+
nvmeSelfHealingLock.RLock()
3069+
defer nvmeSelfHealingLock.RUnlock()
3070+
nvmeNodeOperationWaitingCount.Add(-1)
30543071

3072+
lockContext := "nodeUnstageNVMeVolume.RemovePublishedNVMeSession"
3073+
if !attemptLock(ctx, lockContext, nvmeSelfHealingSessionLock, csiNodeLockTimeout) {
3074+
locks.Unlock(ctx, lockContext, nvmeSelfHealingSessionLock)
3075+
return nil, status.Error(codes.Aborted, "request waited too long for the lock")
3076+
}
30553077
disconnect := p.nvmeHandler.RemovePublishedNVMeSession(&publishedNVMeSessions, publishInfo.NVMeSubsystemNQN,
30563078
publishInfo.NVMeNamespaceUUID)
3079+
locks.Unlock(ctx, lockContext, nvmeSelfHealingSessionLock)
30573080

30583081
nvmeSubsys := p.nvmeHandler.NewNVMeSubsystem(ctx, publishInfo.NVMeSubsystemNQN)
30593082
// Get the device using 'nvme-cli' commands. Flush the device IOs.
@@ -3105,13 +3128,17 @@ func (p *Plugin) nodeUnstageNVMeVolume(
31053128
if !nvmeDev.IsNil() {
31063129
// If flush fails, give a grace period of 6 minutes (nvmeMaxFlushWaitDuration) before giving up.
31073130
if err := nvmeDev.FlushDevice(ctx, p.unsafeDetach, force); err != nil {
3131+
locks.Lock(ctx, "nodeUnstageNVMeVolume.FlushRetryCheck", nvmeFlushRetryMapLock)
31083132
if NVMeNamespacesFlushRetry[publishInfo.NVMeNamespaceUUID].IsZero() {
31093133
NVMeNamespacesFlushRetry[publishInfo.NVMeNamespaceUUID] = time.Now()
3134+
locks.Unlock(ctx, "nodeUnstageNVMeVolume.FlushRetryCheck", nvmeFlushRetryMapLock)
31103135
return nil, fmt.Errorf("failed to flush NVMe device; %v", err)
31113136
}
31123137

31133138
// If the max wait time for flush isn't hit yet, fail and let the CSI node agent call again.
31143139
elapsed := time.Since(NVMeNamespacesFlushRetry[publishInfo.NVMeNamespaceUUID])
3140+
locks.Unlock(ctx, "nodeUnstageNVMeVolume.FlushRetryCheck", nvmeFlushRetryMapLock)
3141+
31153142
if elapsed <= nvmeMaxFlushWaitDuration {
31163143
Logc(ctx).WithFields(LogFields{
31173144
"devicePath": devicePath,
@@ -3128,27 +3155,17 @@ func (p *Plugin) nodeUnstageNVMeVolume(
31283155
}).Warn("Could not flush device within expected time period.")
31293156
}
31303157

3158+
locks.Lock(ctx, "nodeUnstageNVMeVolume.FlushRetryDelete", nvmeFlushRetryMapLock)
31313159
delete(NVMeNamespacesFlushRetry, publishInfo.NVMeNamespaceUUID)
3160+
locks.Unlock(ctx, "nodeUnstageNVMeVolume.FlushRetryDelete", nvmeFlushRetryMapLock)
31323161
}
31333162

3134-
// Get the number of namespaces associated with the subsystem
3135-
numNs, err := nvmeSubsys.GetNamespaceCount(ctx)
3136-
if err != nil {
3137-
Logc(ctx).WithField(
3138-
"subsystem", publishInfo.NVMeSubsystemNQN,
3139-
).WithError(err).Debug("Error getting Namespace count.")
3163+
// Disconnect the subsystem if needed (handled under lock to prevent race conditions)
3164+
if err := p.disconnectNVMeSubsystemIfNeeded(ctx, nvmeSubsys, publishInfo, disconnect); err != nil {
3165+
Logc(ctx).WithError(err).Warn("Error during subsystem disconnect check.")
3166+
// Continue with cleanup even if disconnect fails
31403167
}
31413168

3142-
// If number of namespaces is more than 1, don't disconnect the subsystem. If we get any issues while getting the
3143-
// number of namespaces through the CLI, we can rely on the disconnect flag from NVMe self-healing sessions (if
3144-
// NVMe self-healing is enabled), which keeps track of namespaces associated with the subsystem.
3145-
if (err == nil && numNs <= 1) || (p.nvmeSelfHealingInterval > 0 && err != nil && disconnect) {
3146-
if err := nvmeSubsys.Disconnect(ctx); err != nil {
3147-
Logc(ctx).WithField(
3148-
"subsystem", publishInfo.NVMeSubsystemNQN,
3149-
).WithError(err).Debug("Error disconnecting subsystem.")
3150-
}
3151-
}
31523169
if err := afterNvmeDisconnect.Inject(); err != nil {
31533170
return nil, err
31543171
}
@@ -3195,12 +3212,10 @@ func (p *Plugin) nodePublishNVMeVolume(
31953212
Logc(ctx).Debug(">>>> nodePublishNVMeVolume")
31963213
defer Logc(ctx).Debug("<<<< nodePublishNVMeVolume")
31973214

3198-
// Serializing all the parallel requests by relying on the constant var.
3199-
lockContext := "NodePublishNVMeVolume-" + req.GetVolumeId()
3200-
defer locks.Unlock(ctx, lockContext, nodeLockID)
3201-
if !attemptLock(ctx, lockContext, nodeLockID, csiNodeLockTimeout) {
3202-
return nil, status.Error(codes.Aborted, "request waited too long for the lock")
3215+
if err := p.limiterSharedMap[NodePublishNVMeVolume].Wait(ctx); err != nil {
3216+
return nil, err
32033217
}
3218+
defer p.limiterSharedMap[NodePublishNVMeVolume].Release(ctx)
32043219

32053220
var err error
32063221

@@ -3319,8 +3334,15 @@ func (p *Plugin) nodeStageSANVolume(
33193334
// performNVMeSelfHealing inspects the desired state of the NVMe sessions with the current state and accordingly
33203335
// identifies candidate sessions that require remediation. This function is invoked periodically.
33213336
func (p *Plugin) performNVMeSelfHealing(ctx context.Context) {
3322-
locks.Lock(ctx, nvmeSelfHealingLockContext, nodeLockID)
3323-
defer locks.Unlock(ctx, nvmeSelfHealingLockContext, nodeLockID)
3337+
nvmeSelfHealingLock.Lock()
3338+
defer nvmeSelfHealingLock.Unlock()
3339+
3340+
lockContext := "performNVMeSelfHealing.SessionLock"
3341+
defer locks.Unlock(ctx, lockContext, nvmeSelfHealingSessionLock)
3342+
if !attemptLock(ctx, lockContext, nvmeSelfHealingSessionLock, csiNodeLockTimeout) {
3343+
Logc(ctx).WithError(fmt.Errorf("request waited too long for the lock"))
3344+
return
3345+
}
33243346

33253347
defer func() {
33263348
if r := recover(); r != nil {
@@ -3367,13 +3389,52 @@ func (p *Plugin) fixNVMeSessions(ctx context.Context, stopAt time.Time, subsyste
33673389
}
33683390

33693391
// 1. We should fix at least one subsystem in a single self-healing thread.
3370-
// 2. If there's another thread waiting for the node lock and if we have exceeded our 60 secs lock, we should
3392+
// 2. If there's another thread waiting and if we have exceeded our 60 secs lock, we should
33713393
// stop NVMe self-healing.
3372-
if index > 0 && locks.WaitQueueSize(nodeLockID) > 0 && time.Now().After(stopAt) {
3394+
if index > 0 && nvmeNodeOperationWaitingCount.Load() > 0 && time.Now().After(stopAt) {
33733395
Logc(ctx).Info("Self-healing has exceeded maximum runtime; preempting NVMe session self-healing.")
33743396
break
33753397
}
33763398

33773399
p.nvmeHandler.RectifyNVMeSession(ctx, sub, &publishedNVMeSessions)
33783400
}
33793401
}
3402+
3403+
// disconnectNVMeSubsystemIfNeeded checks if the subsystem should be disconnected and performs the disconnect
3404+
// operation under lock to prevent race conditions with concurrent unstage operations.
3405+
// This lock serializes GetNamespaceCount() and Disconnect() operations to ensure accurate namespace counting
3406+
// and prevent race conditions where multiple threads might see the same count simultaneously.
3407+
func (p *Plugin) disconnectNVMeSubsystemIfNeeded(
3408+
ctx context.Context, nvmeSubsys nvme.NVMeSubsystemInterface, publishInfo *models.VolumePublishInfo, disconnect bool,
3409+
) error {
3410+
// Acquire lock specific to disconnect operations to serialize GetNamespaceCount and Disconnect calls.
3411+
// This prevents race conditions where multiple concurrent unstage operations might read the same
3412+
// namespace count and make incorrect disconnect decisions.
3413+
lockContext := "disconnectNVMeSubsystemIfNeeded"
3414+
if !attemptLock(ctx, lockContext, nvmeSubsystemDisconnectLock, csiNodeLockTimeout) {
3415+
locks.Unlock(ctx, lockContext, nvmeSubsystemDisconnectLock)
3416+
return status.Error(codes.Aborted, "request waited too long for the lock")
3417+
}
3418+
defer locks.Unlock(ctx, lockContext, nvmeSubsystemDisconnectLock)
3419+
3420+
// Get the number of namespaces associated with the subsystem (inside lock to avoid race conditions)
3421+
numNs, err := nvmeSubsys.GetNamespaceCount(ctx)
3422+
if err != nil {
3423+
Logc(ctx).WithField(
3424+
"subsystem", publishInfo.NVMeSubsystemNQN,
3425+
).WithError(err).Debug("Error getting Namespace count.")
3426+
}
3427+
3428+
// If number of namespaces is more than 1, don't disconnect the subsystem. If we get any issues while getting the
3429+
// number of namespaces through the CLI, we can rely on the disconnect flag from NVMe self-healing sessions (if
3430+
// NVMe self-healing is enabled), which keeps track of namespaces associated with the subsystem.
3431+
if (err == nil && numNs <= 1) || (p.nvmeSelfHealingInterval > 0 && err != nil && disconnect) {
3432+
if err := nvmeSubsys.Disconnect(ctx); err != nil {
3433+
Logc(ctx).WithField(
3434+
"subsystem", publishInfo.NVMeSubsystemNQN,
3435+
).WithError(err).Debug("Error disconnecting subsystem.")
3436+
return err
3437+
}
3438+
}
3439+
return nil
3440+
}

0 commit comments

Comments
 (0)