Skip to content

Commit 82d8068

Browse files
authored
Concurrent core - locks for snapmirror workflow
1 parent 25ef8c9 commit 82d8068

File tree

8 files changed

+2133
-75
lines changed

8 files changed

+2133
-75
lines changed

core/concurrent_core.go

Lines changed: 295 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type ConcurrentTridentOrchestrator struct {
5757

5858
var (
5959
_ Orchestrator = &ConcurrentTridentOrchestrator{}
60-
supportedBackends = []string{"ontap-san", "fake"}
60+
supportedBackends = []string{"ontap-san", "fake", "ontap-nas"}
6161
)
6262

6363
func NewConcurrentTridentOrchestrator(client persistentstore.Client) (Orchestrator, error) {
@@ -1084,17 +1084,11 @@ func (o *ConcurrentTridentOrchestrator) GetBackendByBackendUUID(
10841084

10851085
defer recordTiming("backend_get", &err)()
10861086

1087-
results, unlocker, err := db.Lock(db.Query(db.ReadBackend(backendUUID)))
1088-
defer unlocker()
1087+
backend, err := getInconsistentBackendByUUID(backendUUID)
10891088
if err != nil {
10901089
return nil, err
10911090
}
10921091

1093-
backend := results[0].Backend.Read
1094-
if backend == nil {
1095-
return nil, errors.NotFoundError("backend with UUID %v was not found", backendUUID)
1096-
}
1097-
10981092
backendExternal = backend.ConstructExternalWithPoolMap(ctx,
10991093
o.GetStorageClassPoolMap().StorageClassNamesForBackendName(ctx, backend.Name()))
11001094

@@ -1108,6 +1102,24 @@ func (o *ConcurrentTridentOrchestrator) GetBackendByBackendUUID(
11081102
return backendExternal, nil
11091103
}
11101104

1105+
// NOTE: DO NOT USE THIS FUNCTION INSIDE ANOTHER LOCK CONTEXT
1106+
func getInconsistentBackendByUUID(
1107+
backendUUID string,
1108+
) (storage.Backend, error) {
1109+
results, unlocker, err := db.Lock(db.Query(db.InconsistentReadBackend(backendUUID)))
1110+
defer unlocker()
1111+
if err != nil {
1112+
return nil, err
1113+
}
1114+
1115+
backend := results[0].Backend.Read
1116+
if backend == nil {
1117+
return nil, errors.NotFoundError("backend with UUID %v was not found", backendUUID)
1118+
}
1119+
1120+
return backend, nil
1121+
}
1122+
11111123
func (o *ConcurrentTridentOrchestrator) ListBackends(
11121124
ctx context.Context,
11131125
) (backendExternals []*storage.BackendExternal, err error) {
@@ -5642,44 +5654,302 @@ func (o *ConcurrentTridentOrchestrator) DeleteVolumeTransaction(ctx context.Cont
56425654
return o.storeClient.DeleteVolumeTransaction(ctx, volTxn)
56435655
}
56445656

5645-
func (o *ConcurrentTridentOrchestrator) EstablishMirror(ctx context.Context, backendUUID, localInternalVolumeName, remoteVolumeHandle, replicationPolicy, replicationSchedule string) error {
5646-
return fmt.Errorf("EstablishMirror is not implemented in concurrent core")
5657+
func (o *ConcurrentTridentOrchestrator) EstablishMirror(
5658+
ctx context.Context, backendUUID, volumeName, localInternalVolumeName, remoteVolumeHandle, replicationPolicy,
5659+
replicationSchedule string,
5660+
) error {
5661+
ctx = GenerateRequestContextForLayer(ctx, LogLayerCore)
5662+
5663+
var err error
5664+
5665+
if o.bootstrapError != nil {
5666+
return o.bootstrapError
5667+
}
5668+
defer recordTiming("mirror_establish", &err)()
5669+
5670+
backend, err := getInconsistentBackendByUUID(backendUUID)
5671+
if err != nil {
5672+
return err
5673+
}
5674+
5675+
if !backend.CanMirror() {
5676+
return fmt.Errorf("backend does not support mirroring")
5677+
}
5678+
5679+
_, unlocker, err := db.Lock(db.Query(
5680+
db.UpsertVolume(volumeName, backendUUID),
5681+
))
5682+
defer unlocker()
5683+
if err != nil {
5684+
Logc(ctx).WithField("volume", volumeName).WithError(err).Error(
5685+
"Failed to lock volume for upsert.")
5686+
return err
5687+
}
5688+
5689+
return backend.EstablishMirror(ctx, localInternalVolumeName, remoteVolumeHandle, replicationPolicy,
5690+
replicationSchedule)
56475691
}
56485692

5649-
func (o *ConcurrentTridentOrchestrator) ReestablishMirror(ctx context.Context, backendUUID, localInternalVolumeName, remoteVolumeHandle, replicationPolicy, replicationSchedule string) error {
5650-
return fmt.Errorf("ReestablishMirror is not implemented in concurrent core")
5693+
func (o *ConcurrentTridentOrchestrator) ReestablishMirror(
5694+
ctx context.Context, backendUUID, volumeName, localInternalVolumeName, remoteVolumeHandle, replicationPolicy,
5695+
replicationSchedule string,
5696+
) error {
5697+
ctx = GenerateRequestContextForLayer(ctx, LogLayerCore)
5698+
5699+
var err error
5700+
if o.bootstrapError != nil {
5701+
return o.bootstrapError
5702+
}
5703+
defer recordTiming("mirror_reestablish", &err)()
5704+
5705+
backend, err := getInconsistentBackendByUUID(backendUUID)
5706+
if err != nil {
5707+
return err
5708+
}
5709+
5710+
if !backend.CanMirror() {
5711+
return fmt.Errorf("backend does not support mirroring")
5712+
}
5713+
5714+
_, unlocker, err := db.Lock(db.Query(
5715+
db.UpsertVolume(volumeName, backendUUID),
5716+
))
5717+
defer unlocker()
5718+
if err != nil {
5719+
Logc(ctx).WithField("volume", volumeName).WithError(err).Error(
5720+
"Failed to lock volume for upsert.")
5721+
return err
5722+
}
5723+
5724+
return backend.ReestablishMirror(ctx, localInternalVolumeName, remoteVolumeHandle, replicationPolicy,
5725+
replicationSchedule)
56515726
}
56525727

5653-
func (o *ConcurrentTridentOrchestrator) PromoteMirror(ctx context.Context, backendUUID, localInternalVolumeName, remoteVolumeHandle, snapshotHandle string) (bool, error) {
5654-
return false, fmt.Errorf("PromoteMirror is not implemented in concurrent core")
5728+
func (o *ConcurrentTridentOrchestrator) PromoteMirror(
5729+
ctx context.Context, backendUUID, volumeName, localInternalVolumeName, remoteVolumeHandle, snapshotHandle string,
5730+
) (bool, error) {
5731+
ctx = GenerateRequestContextForLayer(ctx, LogLayerCore)
5732+
5733+
var err error
5734+
if o.bootstrapError != nil {
5735+
return false, o.bootstrapError
5736+
}
5737+
defer recordTiming("mirror_promote", &err)()
5738+
5739+
backend, err := getInconsistentBackendByUUID(backendUUID)
5740+
if err != nil {
5741+
return false, err
5742+
}
5743+
5744+
if !backend.CanMirror() {
5745+
return false, fmt.Errorf("backend does not support mirroring")
5746+
}
5747+
5748+
_, unlocker, err := db.Lock(db.Query(
5749+
db.UpsertVolume(volumeName, backendUUID),
5750+
))
5751+
defer unlocker()
5752+
if err != nil {
5753+
Logc(ctx).WithField("volume", volumeName).WithError(err).Error(
5754+
"Failed to lock volume for upsert.")
5755+
return false, err
5756+
}
5757+
5758+
return backend.PromoteMirror(ctx, localInternalVolumeName, remoteVolumeHandle, snapshotHandle)
56555759
}
56565760

56575761
func (o *ConcurrentTridentOrchestrator) GetMirrorStatus(ctx context.Context, backendUUID, localInternalVolumeName, remoteVolumeHandle string) (string, error) {
5658-
return "", fmt.Errorf("GetMirrorStatus is not implemented in concurrent core")
5762+
var err error
5763+
5764+
ctx = GenerateRequestContextForLayer(ctx, LogLayerCore)
5765+
5766+
if o.bootstrapError != nil {
5767+
return "", o.bootstrapError
5768+
}
5769+
defer recordTiming("mirror_status", &err)()
5770+
5771+
backend, err := getInconsistentBackendByUUID(backendUUID)
5772+
if err != nil {
5773+
return "", err
5774+
}
5775+
5776+
if !backend.CanMirror() {
5777+
return "", fmt.Errorf("backend does not support mirroring")
5778+
}
5779+
return backend.GetMirrorStatus(ctx, localInternalVolumeName, remoteVolumeHandle)
56595780
}
56605781

56615782
func (o *ConcurrentTridentOrchestrator) CanBackendMirror(ctx context.Context, backendUUID string) (bool, error) {
5662-
return false, fmt.Errorf("CanBackendMirror is not implemented in concurrent core")
5783+
ctx = GenerateRequestContextForLayer(ctx, LogLayerCore)
5784+
5785+
var err error
5786+
5787+
if o.bootstrapError != nil {
5788+
return false, o.bootstrapError
5789+
}
5790+
defer recordTiming("mirror_capable", &err)()
5791+
5792+
backend, err := getInconsistentBackendByUUID(backendUUID)
5793+
if err != nil {
5794+
return false, err
5795+
}
5796+
return backend.CanMirror(), nil
56635797
}
56645798

5665-
func (o *ConcurrentTridentOrchestrator) ReleaseMirror(ctx context.Context, backendUUID, localInternalVolumeName string) error {
5666-
return fmt.Errorf("ReleaseMirror is not implemented in concurrent core")
5799+
func (o *ConcurrentTridentOrchestrator) ReleaseMirror(ctx context.Context, backendUUID, volumeName, localInternalVolumeName string) error {
5800+
ctx = GenerateRequestContextForLayer(ctx, LogLayerCore)
5801+
5802+
var err error
5803+
if o.bootstrapError != nil {
5804+
return o.bootstrapError
5805+
}
5806+
defer recordTiming("mirror_release", &err)()
5807+
5808+
backend, err := getInconsistentBackendByUUID(backendUUID)
5809+
if err != nil {
5810+
return err
5811+
}
5812+
5813+
if !backend.CanMirror() {
5814+
return fmt.Errorf("backend does not support mirroring")
5815+
}
5816+
5817+
_, unlocker, err := db.Lock(db.Query(
5818+
db.UpsertVolume(volumeName, backendUUID),
5819+
))
5820+
defer unlocker()
5821+
if err != nil {
5822+
Logc(ctx).WithField("volume", volumeName).WithError(err).Error(
5823+
"Failed to lock volume for upsert.")
5824+
return err
5825+
}
5826+
5827+
return backend.ReleaseMirror(ctx, localInternalVolumeName)
56675828
}
56685829

56695830
func (o *ConcurrentTridentOrchestrator) GetReplicationDetails(ctx context.Context, backendUUID, localInternalVolumeName, remoteVolumeHandle string) (string, string, string, error) {
5670-
return "", "", "", fmt.Errorf("GetReplicationDetails is not implemented in concurrent core")
5831+
ctx = GenerateRequestContextForLayer(ctx, LogLayerCore)
5832+
5833+
var err error
5834+
if o.bootstrapError != nil {
5835+
return "", "", "", o.bootstrapError
5836+
}
5837+
defer recordTiming("replication_details", &err)()
5838+
5839+
backend, err := getInconsistentBackendByUUID(backendUUID)
5840+
if err != nil {
5841+
return "", "", "", err
5842+
}
5843+
5844+
if !backend.CanMirror() {
5845+
return "", "", "", fmt.Errorf("backend does not support mirroring")
5846+
}
5847+
5848+
return backend.GetReplicationDetails(ctx, localInternalVolumeName, remoteVolumeHandle)
56715849
}
56725850

5673-
func (o *ConcurrentTridentOrchestrator) UpdateMirror(ctx context.Context, pvcVolumeName, snapshotName string) error {
5674-
return fmt.Errorf("UpdateMirror is not implemented in concurrent core")
5851+
func (o *ConcurrentTridentOrchestrator) UpdateMirror(ctx context.Context, volumeName, snapshotName string) error {
5852+
ctx = GenerateRequestContextForLayer(ctx, LogLayerCore)
5853+
5854+
var err error
5855+
if o.bootstrapError != nil {
5856+
return o.bootstrapError
5857+
}
5858+
defer recordTiming("update_mirror", &err)()
5859+
5860+
// Get volume
5861+
tridentVolume, err := o.getVolume(ctx, volumeName)
5862+
if err != nil {
5863+
return fmt.Errorf("could not find volume '%v' in Trident; %v", volumeName, err)
5864+
}
5865+
5866+
backendUUID := tridentVolume.BackendUUID
5867+
5868+
backend, err := getInconsistentBackendByUUID(backendUUID)
5869+
if err != nil {
5870+
return fmt.Errorf("backend %s not found", backendUUID)
5871+
}
5872+
5873+
if !backend.CanMirror() {
5874+
return fmt.Errorf("backend does not support mirroring")
5875+
}
5876+
5877+
_, unlocker, err := db.Lock(db.Query(
5878+
db.UpsertVolume(volumeName, backendUUID),
5879+
))
5880+
defer unlocker()
5881+
if err != nil {
5882+
Logc(ctx).WithField("volume", volumeName).WithError(err).Error(
5883+
"Failed to lock volume for upsert.")
5884+
return err
5885+
}
5886+
5887+
logFields := LogFields{
5888+
"volume": volumeName,
5889+
"snapshot": snapshotName,
5890+
}
5891+
5892+
// Mirror update
5893+
Logc(ctx).WithFields(logFields).Info("Mirror update in progress.")
5894+
return backend.UpdateMirror(ctx, tridentVolume.Config.InternalName, snapshotName)
56755895
}
56765896

5677-
func (o *ConcurrentTridentOrchestrator) CheckMirrorTransferState(ctx context.Context, pvcVolumeName string) (*time.Time, error) {
5678-
return nil, fmt.Errorf("CheckMirrorTransferState is not implemented in concurrent core")
5897+
func (o *ConcurrentTridentOrchestrator) CheckMirrorTransferState(ctx context.Context, volumeName string) (*time.Time, error) {
5898+
ctx = GenerateRequestContextForLayer(ctx, LogLayerCore)
5899+
5900+
var err error
5901+
if o.bootstrapError != nil {
5902+
return nil, o.bootstrapError
5903+
}
5904+
defer recordTiming("check_mirror_transfer_state", &err)()
5905+
5906+
// Get volume
5907+
tridentVolume, err := o.getVolume(ctx, volumeName)
5908+
if err != nil {
5909+
return nil, fmt.Errorf("could not find volume '%v' in Trident; %v", volumeName, err)
5910+
}
5911+
5912+
// Get backend to ensure it can mirror
5913+
backend, err := getInconsistentBackendByUUID(tridentVolume.BackendUUID)
5914+
if err != nil {
5915+
return nil, fmt.Errorf("backend %s not found", tridentVolume.BackendUUID)
5916+
}
5917+
5918+
if !backend.CanMirror() {
5919+
return nil, fmt.Errorf("backend does not support mirroring")
5920+
}
5921+
5922+
// Check transfer state of mirror relationship
5923+
return backend.CheckMirrorTransferState(ctx, tridentVolume.Config.InternalName)
56795924
}
56805925

5681-
func (o *ConcurrentTridentOrchestrator) GetMirrorTransferTime(ctx context.Context, pvcVolumeName string) (*time.Time, error) {
5682-
return nil, fmt.Errorf("GetMirrorTransferTime is not implemented in concurrent core")
5926+
func (o *ConcurrentTridentOrchestrator) GetMirrorTransferTime(ctx context.Context, volumeName string) (*time.Time, error) {
5927+
ctx = GenerateRequestContextForLayer(ctx, LogLayerCore)
5928+
5929+
var err error
5930+
if o.bootstrapError != nil {
5931+
return nil, o.bootstrapError
5932+
}
5933+
defer recordTiming("get_mirror_transfer_time", &err)()
5934+
5935+
// Get volume
5936+
tridentVolume, err := o.getVolume(ctx, volumeName)
5937+
if err != nil {
5938+
return nil, fmt.Errorf("could not find volume '%v' in Trident; %v", volumeName, err)
5939+
}
5940+
5941+
// Get backend to ensure it can mirror
5942+
backend, err := getInconsistentBackendByUUID(tridentVolume.BackendUUID)
5943+
if err != nil {
5944+
return nil, fmt.Errorf("backend %s not found", tridentVolume.BackendUUID)
5945+
}
5946+
5947+
if !backend.CanMirror() {
5948+
return nil, fmt.Errorf("backend does not support mirroring")
5949+
}
5950+
5951+
// Get last transfer time of mirror relationship
5952+
return backend.GetMirrorTransferTime(ctx, tridentVolume.Config.InternalName)
56835953
}
56845954

56855955
func (o *ConcurrentTridentOrchestrator) GetCHAP(

0 commit comments

Comments
 (0)