diff --git a/build.env b/build.env index f25018ad303..a2acc5663b0 100644 --- a/build.env +++ b/build.env @@ -15,7 +15,7 @@ CSI_IMAGE_VERSION=canary CSI_UPGRADE_VERSION=v3.12.1 # Ceph version to use -BASE_IMAGE=quay.io/ceph/ceph:v19 +BASE_IMAGE=quay.ceph.io/ceph-ci/ceph:wip-pkalever-rbd-group-snap-mirror CEPH_VERSION=squid # standard Golang options diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index be0864b6061..61a8a321449 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -29,11 +29,13 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/rbd" corerbd "github.com/ceph/ceph-csi/internal/rbd" + rbd_group "github.com/ceph/ceph-csi/internal/rbd/group" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" librbd "github.com/ceph/go-ceph/rbd" + "github.com/ceph/go-ceph/rbd/admin" "github.com/csi-addons/spec/lib/go/replication" "google.golang.org/grpc" @@ -255,9 +257,9 @@ func validateSchedulingInterval(interval string) error { func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, req *replication.EnableVolumeReplicationRequest, ) (*replication.EnableVolumeReplicationResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -270,24 +272,23 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, err } - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := 
rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) + return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + defer destoryVolumes(ctx, volumes) // extract the mirroring mode mirroringMode, err := getMirroringMode(ctx, req.GetParameters()) @@ -307,17 +308,18 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } if info.GetState() != librbd.MirrorImageEnabled.String() { - err = rbdVol.HandleParentImageExistence(ctx, flattenMode) - if err != nil { - log.ErrorLog(ctx, err.Error()) - - return nil, getGRPCError(err) - } - err = mirror.EnableMirroring(ctx, mirroringMode) - if err != nil { - log.ErrorLog(ctx, err.Error()) + for _, rbdVol := range volumes { + err = rbdVol.HandleParentImageExistence(ctx, flattenMode) + if err != nil { + err = fmt.Errorf("failed to handle parent image for volume group %q: %w", mirror, err) + return nil, getGRPCError(err) + } + err = mirror.EnableMirroring(ctx, mirroringMode) + if err != nil { + log.ErrorLog(ctx, err.Error()) - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } } } @@ -330,9 +332,9 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, 
func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, req *replication.DisableVolumeReplicationRequest, ) (*replication.DisableVolumeReplicationResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -340,24 +342,23 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) + return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + defer destoryVolumes(ctx, volumes) // extract the force option force, err := getForceOption(ctx, req.GetParameters()) @@ -376,7 +377,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, case librbd.MirrorImageDisabled.String(): // image mirroring is still disabling case 
librbd.MirrorImageDisabling.String(): - return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID) + return nil, status.Errorf(codes.Aborted, "%s is in disabling state", reqID) case librbd.MirrorImageEnabled.String(): err = corerbd.DisableVolumeReplication(mirror, ctx, info.IsPrimary(), force) if err != nil { @@ -398,9 +399,9 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, func (rs *ReplicationServer) PromoteVolume(ctx context.Context, req *replication.PromoteVolumeRequest, ) (*replication.PromoteVolumeResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -408,24 +409,23 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) + return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - 
return nil, status.Error(codes.Internal, err.Error()) - } + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -438,7 +438,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, return nil, status.Errorf( codes.InvalidArgument, "mirroring is not enabled on %s, image is in %s Mode", - volumeID, + reqID, info.GetState()) } @@ -475,10 +475,10 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, } log.DebugLog( ctx, - "Added scheduling at interval %s, start time %s for volume %s", + "Added scheduling at interval %s, start time %s for Id %s", interval, startTime, - rbdVol) + reqID) } return &replication.PromoteVolumeResponse{}, nil @@ -491,9 +491,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, func (rs *ReplicationServer) DemoteVolume(ctx context.Context, req *replication.DemoteVolumeRequest, ) (*replication.DemoteVolumeResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -501,31 +501,23 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.driverInstance, 
req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) - if err != nil { - return nil, getGRPCError(err) - } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - creationTime, err := rbdVol.GetCreationTime(ctx) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { - log.ErrorLog(ctx, err.Error()) + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) - return nil, status.Error(codes.Internal, err.Error()) + return nil, getGRPCError(err) } + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -538,24 +530,33 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, return nil, status.Errorf( codes.InvalidArgument, "mirroring is not enabled on %s, image is in %s Mode", - volumeID, + reqID, info.GetState()) } // demote image to secondary if info.IsPrimary() { - // store the image creation time for resync - _, err = rbdVol.GetMetadata(imageCreationTimeKey) - if err != nil && errors.Is(err, librbd.ErrNotFound) { - log.DebugLog(ctx, "setting image creation time %s for %s", creationTime, rbdVol) - err = rbdVol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime)) - } - if err != nil { - log.ErrorLog(ctx, err.Error()) + for _, vol := range volumes { + // store the image creation time for resync + creationTime, cErr := vol.GetCreationTime(ctx) + if cErr != nil { + log.ErrorLog(ctx, cErr.Error()) - return nil, status.Error(codes.Internal, err.Error()) - } + return nil, status.Error(codes.Internal, cErr.Error()) + } + // store the image creation time for resync + _, err = vol.GetMetadata(imageCreationTimeKey) + if err != nil && errors.Is(err, librbd.ErrNotFound) { + log.DebugLog(ctx, "setting image creation time %s for %s", creationTime, vol) + err = vol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime)) + } 
+ if err != nil { + log.ErrorLog(ctx, err.Error()) + + return nil, status.Error(codes.Internal, err.Error()) + } + } err = mirror.Demote(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -602,9 +603,9 @@ func checkRemoteSiteStatus(ctx context.Context, mirrorStatus []types.SiteStatus) func (rs *ReplicationServer) ResyncVolume(ctx context.Context, req *replication.ResyncVolumeRequest, ) (*replication.ResyncVolumeResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -612,23 +613,23 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } defer rs.VolumeLocks.Release(volumeID) + mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) + return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -693,35 +694,40 @@ 
func (rs *ReplicationServer) ResyncVolume(ctx context.Context, ready = checkRemoteSiteStatus(ctx, sts.GetAllSitesStatus()) } - creationTime, err := rbdVol.GetCreationTime(ctx) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", rbdVol, err.Error()) - } - - // image creation time is stored in the image metadata. it looks like - // `"seconds:1692879841 nanos:631526669"` - // If the image gets resynced the local image creation time will be - // lost, if the keys is not present in the image metadata then we can - // assume that the image is already resynced. - savedImageTime, err := rbdVol.GetMetadata(imageCreationTimeKey) - if err != nil && !errors.Is(err, librbd.ErrNotFound) { - return nil, status.Errorf(codes.Internal, - "failed to get %s key from image metadata for %s: %s", - imageCreationTimeKey, - rbdVol, - err.Error()) - } + for _, vol := range volumes { + creationTime, tErr := vol.GetCreationTime(ctx) + if tErr != nil { + return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", vol, tErr.Error()) + } - if savedImageTime != "" { - st, sErr := timestampFromString(savedImageTime) - if sErr != nil { - return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", sErr.Error()) + // image creation time is stored in the image metadata. it looks like + // `"seconds:1692879841 nanos:631526669"` + // If the image gets resynced the local image creation time will be + // lost, if the keys is not present in the image metadata then we can + // assume that the image is already resynced. 
+ var savedImageTime string + savedImageTime, err = vol.GetMetadata(imageCreationTimeKey) + if err != nil && !errors.Is(err, librbd.ErrNotFound) { + return nil, status.Errorf(codes.Internal, + "failed to get %s key from image metadata for %s: %s", + imageCreationTimeKey, + vol, + err.Error()) } - log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime) - if req.GetForce() && st.Equal(*creationTime) { - err = mirror.Resync(ctx) - if err != nil { - return nil, getGRPCError(err) + + if savedImageTime != "" { + st, sErr := timestampFromString(savedImageTime) + if sErr != nil { + return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", sErr.Error()) + } + log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", vol, st, creationTime) + if req.GetForce() && st.Equal(*creationTime) { + err = mirror.Resync(ctx) + if err != nil { + return nil, getGRPCError(err) + } + // Break the loop as we need to issue resync only once for the image or for the group. 
+ break } } } @@ -733,9 +739,12 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } } - err = rbdVol.RepairResyncedImageID(ctx, ready) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error()) + // update imageID for all the volumes + for _, vol := range volumes { + err = vol.RepairResyncedImageID(ctx, ready) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error()) + } } resp := &replication.ResyncVolumeResponse{ @@ -785,13 +794,14 @@ func getGRPCError(err error) error { } errorStatusMap := map[error]codes.Code{ - corerbd.ErrImageNotFound: codes.NotFound, - util.ErrPoolNotFound: codes.NotFound, - corerbd.ErrInvalidArgument: codes.InvalidArgument, - corerbd.ErrFlattenInProgress: codes.Aborted, - corerbd.ErrAborted: codes.Aborted, - corerbd.ErrFailedPrecondition: codes.FailedPrecondition, - corerbd.ErrUnavailable: codes.Unavailable, + corerbd.ErrImageNotFound: codes.NotFound, + util.ErrPoolNotFound: codes.NotFound, + corerbd.ErrInvalidArgument: codes.InvalidArgument, + corerbd.ErrFlattenInProgress: codes.Aborted, + corerbd.ErrAborted: codes.Aborted, + corerbd.ErrFailedPrecondition: codes.FailedPrecondition, + corerbd.ErrUnavailable: codes.Unavailable, + rbd_group.ErrRBDGroupUnAvailable: codes.Unavailable, } for e, code := range errorStatusMap { @@ -809,9 +819,9 @@ func getGRPCError(err error) error { func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, req *replication.GetVolumeReplicationInfoRequest, ) (*replication.GetVolumeReplicationInfoResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -821,36 
+831,23 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) + mgr := rbd.NewManager(rs.driverInstance, nil, req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) - if err != nil { - log.ErrorLog(ctx, "failed to get volume with id %q: %v", volumeID, err) - - switch { - case errors.Is(err, corerbd.ErrImageNotFound): - err = status.Error(codes.NotFound, err.Error()) - case errors.Is(err, util.ErrPoolNotFound): - err = status.Error(codes.NotFound, err.Error()) - default: - err = status.Error(codes.Internal, err.Error()) - } - - return nil, err - } - mirror, err := rbdVol.ToMirror() + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { - log.ErrorLog(ctx, "failed to convert volume %q to mirror type: %v", rbdVol, err) + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) - return nil, status.Error(codes.Internal, err.Error()) + return nil, getGRPCError(err) } + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -879,6 +876,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } + log.DebugLog(ctx, "mirrorStatus: %+v", mirrorStatus) remoteStatus, err := mirrorStatus.GetRemoteSiteStatus(ctx) if err != nil { log.ErrorLog(ctx, "failed to get remote site status for mirror %q: 
%v", mirror, err) @@ -902,6 +900,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Errorf(codes.Internal, "failed to get last sync info: %v", err) } + log.DebugLog(ctx, "Madhu the response is %v", resp) return resp, nil } @@ -926,7 +925,7 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV if description == "" { return nil, fmt.Errorf("empty description: %w", corerbd.ErrLastSyncTimeNotFound) } - log.DebugLog(ctx, "description: %s", description) + log.DebugLog(ctx, "Madhu description: %s", description) splittedString := strings.SplitN(description, ",", 2) if len(splittedString) == 1 { return nil, fmt.Errorf("no snapshot details: %w", corerbd.ErrLastSyncTimeNotFound) @@ -943,6 +942,7 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV return nil, fmt.Errorf("failed to unmarshal local snapshot info: %w", err) } + log.DebugLog(ctx, "Madhu the description after unmarshalling is %v", localSnapInfo) // If the json unmarsal is successful but the local snapshot time is 0, we // need to consider it as an error as the LastSyncTime is required. if localSnapInfo.LocalSnapshotTime == 0 { @@ -968,6 +968,7 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV response.LastSyncTime = lastSyncTime response.LastSyncBytes = localSnapInfo.LastSnapshotBytes + log.DebugLog(ctx, "Madhu the return response is %v", response) return &response, nil } @@ -987,3 +988,10 @@ func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus) return nil } + +// destoryVolumes destroys the volume connections. 
+func destoryVolumes(ctx context.Context, volumes []types.Volume) { + for _, vol := range volumes { + vol.Destroy(ctx) + } +} diff --git a/internal/csi-addons/rbd/volumegroup.go b/internal/csi-addons/rbd/volumegroup.go index 4727e548b80..e373132b60c 100644 --- a/internal/csi-addons/rbd/volumegroup.go +++ b/internal/csi-addons/rbd/volumegroup.go @@ -27,6 +27,7 @@ import ( "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util/log" + "github.com/csi-addons/spec/lib/go/replication" "github.com/csi-addons/spec/lib/go/volumegroup" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -90,11 +91,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup( // resolve all volumes volumes := make([]types.Volume, len(req.GetVolumeIds())) - defer func() { - for _, vol := range volumes { - vol.Destroy(ctx) - } - }() + defer destoryVolumes(ctx, volumes) for i, id := range req.GetVolumeIds() { vol, err := mgr.GetVolumeByID(ctx, id) if err != nil { @@ -210,27 +207,41 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup( log.DebugLog(ctx, "VolumeGroup %q has been found", req.GetVolumeGroupId()) - // verify that the volume group is empty - volumes, err := vg.ListVolumes(ctx) + volumes, mirror, err := mgr.GetMirrorSource(ctx, req.GetVolumeGroupId(), &replication.ReplicationSource{ + Type: &replication.ReplicationSource_Volumegroup{ + Volumegroup: &replication.ReplicationSource_VolumeGroupSource{ + VolumeGroupId: req.GetVolumeGroupId(), + }, + }, + }) if err != nil { - return nil, status.Errorf( - codes.NotFound, - "could not list volumes for voluem group %q: %s", - req.GetVolumeGroupId(), - err.Error()) + return nil, getGRPCError(err) } + defer destoryVolumes(ctx, volumes) - log.DebugLog(ctx, "VolumeGroup %q contains %d volumes", req.GetVolumeGroupId(), len(volumes)) + vgrMirrorInfo, err := mirror.GetMirroringInfo(ctx) - if len(volumes) != 0 { - return nil, status.Errorf( - codes.FailedPrecondition, - "rejecting to delete non-empty volume group %q", - 
req.GetVolumeGroupId()) + // verify that the volume group is empty, if the group is primary + if vgrMirrorInfo.IsPrimary() { + volumes, err := vg.ListVolumes(ctx) + if err != nil { + return nil, status.Errorf( + codes.NotFound, + "could not list volumes for volume group %q: %s", + req.GetVolumeGroupId(), + err.Error()) + } + log.DebugLog(ctx, "VolumeGroup %q contains %d volumes", req.GetVolumeGroupId(), len(volumes)) + if len(volumes) != 0 { + return nil, status.Errorf( + codes.FailedPrecondition, + "rejecting to delete non-empty volume group %q", + req.GetVolumeGroupId()) + } } // delete the volume group - err = vg.Delete(ctx) + err = vg.Delete(ctx, vgrMirrorInfo, mirror) if err != nil { return nil, status.Errorf(codes.Internal, "failed to delete volume group %q: %s", @@ -340,10 +351,27 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership( } } + vol, mirror, err := mgr.GetMirrorSource(ctx, req.GetVolumeGroupId(), &replication.ReplicationSource{ + Type: &replication.ReplicationSource_Volumegroup{ + Volumegroup: &replication.ReplicationSource_VolumeGroupSource{ + VolumeGroupId: req.GetVolumeGroupId(), + }, + }, + }) + if err != nil { + return nil, getGRPCError(err) + } + defer destoryVolumes(ctx, vol) + + vgrMirrorInfo, err := mirror.GetMirroringInfo(ctx) + + // Skip removing images from group if the group is secondary + removeImageFromGroup := vgrMirrorInfo.IsPrimary() + // remove the volume that should not be part of the group for _, id := range toRemove { vol := beforeIDs[id] - err = vg.RemoveVolume(ctx, vol) + err = vg.RemoveVolume(ctx, vol, removeImageFromGroup) if err != nil { return nil, status.Errorf( codes.Internal, @@ -356,11 +384,7 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership( // resolve all volumes volumes := make([]types.Volume, len(toAdd)) - defer func() { - for _, vol := range volumes { - vol.Destroy(ctx) - } - }() + defer destoryVolumes(ctx, volumes) for i, id := range toAdd { var vol types.Volume vol, err = 
mgr.GetVolumeByID(ctx, id) diff --git a/internal/rbd/group/group_mirror.go b/internal/rbd/group/group_mirror.go new file mode 100644 index 00000000000..751fd4d67ca --- /dev/null +++ b/internal/rbd/group/group_mirror.go @@ -0,0 +1,392 @@ +/* +Copyright 2024 The Ceph-CSI Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package group + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "github.com/ceph/go-ceph/rados" + librbd "github.com/ceph/go-ceph/rbd" + "github.com/ceph/go-ceph/rbd/admin" + + "github.com/ceph/ceph-csi/internal/rbd/types" + "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" +) + +var ErrRBDGroupUnAvailable = errors.New("RBD group is unavailable") + +type volumeGroupMirror struct { + *volumeGroup +} + +func (vg volumeGroupMirror) EnableMirroring(ctx context.Context, mode librbd.ImageMirrorMode) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupEnable(ioctx, name, mode) + if err != nil { + return fmt.Errorf("failed to enable mirroring on volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "mirroring is enabled on the volume group %q", vg) + + return nil +} + +func (vg volumeGroupMirror) DisableMirroring(ctx context.Context, force bool) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + 
return err + } + + err = librbd.MirrorGroupDisable(ioctx, name, force) + if err != nil && !errors.Is(err, rados.ErrNotFound) { + return fmt.Errorf("failed to disable mirroring on volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "mirroring is disabled on the volume group %q", vg) + + return nil +} + +func (vg volumeGroupMirror) Promote(ctx context.Context, force bool) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupPromote(ioctx, name, force) + if err != nil { + return fmt.Errorf("failed to promote volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "volume group %q has been promoted", vg) + + return nil +} + +func (vg volumeGroupMirror) ForcePromote(ctx context.Context, cr *util.Credentials) error { + promoteArgs := []string{ + "mirror", "group", "promote", + vg.String(), + "--force", + "--id", cr.ID, + "-m", vg.monitors, + "--keyfile=" + cr.KeyFile, + } + _, stderr, err := util.ExecCommandWithTimeout( + ctx, + // 2 minutes timeout as the Replication RPC timeout is 2.5 minutes.
+ 2*time.Minute, + "rbd", + promoteArgs..., + ) + if err != nil { + return fmt.Errorf("failed to promote group %q with error: %w", vg, err) + } + + if stderr != "" { + return fmt.Errorf("failed to promote group %q with stderror: %s", vg, stderr) + } + + log.DebugLog(ctx, "volume group %q has been force promoted", vg) + + return nil +} + +func (vg volumeGroupMirror) Demote(ctx context.Context) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupDemote(ioctx, name) + if err != nil { + return fmt.Errorf("failed to demote volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "volume group %q has been demoted", vg) + + return nil +} + +func (vg volumeGroupMirror) Resync(ctx context.Context) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupResync(ioctx, name) + if err != nil { + return fmt.Errorf("failed to resync volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "issued resync on volume group %q", vg) + // If we issued a resync, return a non-final error as image needs to be recreated + // locally. Caller retries till RBD syncs an initial version of the image to + // report its status in the resync request. 
+ return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrRBDGroupUnAvailable) +} + +func (vg volumeGroupMirror) GetMirroringInfo(ctx context.Context) (types.MirrorInfo, error) { + name, err := vg.GetName(ctx) + if err != nil { + return nil, err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return nil, err + } + + info, err := librbd.GetMirrorGroupInfo(ioctx, name) + if err != nil { + return nil, fmt.Errorf("failed to get volume group mirroring info %q: %w", vg, err) + } + + return &groupInfo{MirrorGroupInfo: info}, nil +} + +func (vg volumeGroupMirror) GetGlobalMirroringStatus(ctx context.Context) (types.GlobalStatus, error) { + name, err := vg.GetName(ctx) + if err != nil { + return nil, err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return nil, err + } + statusInfo, err := librbd.GetGlobalMirrorGroupStatus(ioctx, name) + if err != nil { + return nil, fmt.Errorf("failed to get volume group mirroring status %q: %w", vg, err) + } + + return globalMirrorGroupStatus{GlobalMirrorGroupStatus: &statusInfo}, nil +} + +func (vg volumeGroupMirror) AddSnapshotScheduling(interval admin.Interval, startTime admin.StartTime) error { + ls := admin.NewLevelSpec(vg.pool, vg.namespace, "") + ra, err := vg.conn.GetRBDAdmin() + if err != nil { + return err + } + adminConn := ra.MirrorSnashotSchedule() + err = adminConn.Add(ls, interval, startTime) + if err != nil { + return err + } + + return nil +} + +// groupInfo is a wrapper around librbd.MirrorGroupInfo that contains the +// group mirror info. +type groupInfo struct { + *librbd.MirrorGroupInfo +} + +func (info *groupInfo) GetState() string { + return info.State.String() +} + +func (info *groupInfo) IsPrimary() bool { + return info.Primary +} + +// globalMirrorGroupStatus is a wrapper around librbd.GlobalGroupMirrorImageStatus that contains the +// global mirror group status. 
+type globalMirrorGroupStatus struct { + *librbd.GlobalMirrorGroupStatus +} + +func (status globalMirrorGroupStatus) GetState() string { + return status.GlobalMirrorGroupStatus.Info.State.String() +} + +func (status globalMirrorGroupStatus) IsPrimary() bool { + return status.GlobalMirrorGroupStatus.Info.Primary +} + +func (status globalMirrorGroupStatus) GetLocalSiteStatus() (types.SiteStatus, error) { + s, err := status.GlobalMirrorGroupStatus.LocalStatus() + if err != nil { + err = fmt.Errorf("failed to get local site status: %w", err) + } + + return siteMirrorGroupStatus{ + SiteMirrorGroupStatus: &s, + }, err +} + +func (status globalMirrorGroupStatus) GetAllSitesStatus() []types.SiteStatus { + var siteStatuses []types.SiteStatus + for i := range status.SiteStatuses { + siteStatuses = append(siteStatuses, siteMirrorGroupStatus{SiteMirrorGroupStatus: &status.SiteStatuses[i]}) + } + + return siteStatuses +} + +// RemoteStatus returns one SiteMirrorGroupStatus item from the SiteStatuses +// slice that corresponds to the remote site's status. If the remote status +// is not found than the error ErrNotExist will be returned. 
+func (status globalMirrorGroupStatus) GetRemoteSiteStatus(ctx context.Context) (types.SiteStatus, error) { + var ( + ss librbd.SiteMirrorGroupStatus + err error = librbd.ErrNotExist + ) + + type localStatus struct { + LocalSnapshotTime int64 `json:"local_snapshot_timestamp"` + LastSnapshotBytes int64 `json:"last_snapshot_bytes"` + LastSnapshotDuration *int64 `json:"last_snapshot_sync_seconds"` + } + + for i := range status.SiteStatuses { + log.DebugLog( + ctx, + "Site status of MirrorUUID: %s, state: %s, description: %s, lastUpdate: %v, up: %t", + status.SiteStatuses[i].MirrorUUID, + status.SiteStatuses[i].State, + status.SiteStatuses[i].Description, + status.SiteStatuses[i].LastUpdate, + status.SiteStatuses[i].Up) + + if status.SiteStatuses[i].MirrorUUID != "" { + ss = status.SiteStatuses[i] + + images := status.SiteStatuses[i].MirrorImages + + totalSnpshotTime := int64(0) + totalSnapshotBytes := int64(0) + totalSnapshotDuration := int64(0) + totalImages := len(images) + for _, image := range images { + log.DebugLog(ctx, "Madhu image: %s, state: %s, description: %s, lastUpdate: %v, up: %t", + image.MirrorUUID, + image.State, + image.Description, + image.LastUpdate, + image.Up) + description := image.Description + log.DebugLog(ctx, "[Madhu] description: %s", description) + splittedString := strings.SplitN(description, ",", 2) + if len(splittedString) == 1 { + log.DebugLog(ctx, "no snapshot details", splittedString[0]) + continue + } + var localSnapInfo localStatus + err := json.Unmarshal([]byte(splittedString[1]), &localSnapInfo) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal local snapshot info: %w", err) + } + log.DebugLog(ctx, "Madhu localStatus", localSnapInfo) + totalSnpshotTime += localSnapInfo.LocalSnapshotTime + totalSnapshotBytes += localSnapInfo.LastSnapshotBytes + totalSnapshotDuration += *localSnapInfo.LastSnapshotDuration + } + err = nil + totalDuration := int64(0) + if totalSnapshotDuration > 0 { + totalDuration = 
int64(totalSnapshotDuration / int64(totalImages)) + } + totalTime := int64(0) + if totalSnpshotTime > 0 { + totalTime = int64(totalSnpshotTime / int64(totalImages)) + } + totalBytes := int64(0) + if totalSnapshotBytes > 0 { + totalBytes = int64(totalSnapshotBytes / int64(totalImages)) + } + // write the total snapshot time, bytes and duration to the description + d := localStatus{ + LocalSnapshotTime: totalTime, + LastSnapshotBytes: totalBytes, + LastSnapshotDuration: &totalDuration, + } + description, err := json.Marshal(d) + log.DebugLog(ctx, "description: %s", description) + log.DebugLog(ctx, "description: %v", d) + if err != nil { + + return nil, fmt.Errorf("failed to marshal local snapshot info: %w", err) + } + ss.Description = fmt.Sprintf("%s, %s", ss.Description, description) + break + } + } + + return siteMirrorGroupStatus{SiteMirrorGroupStatus: &ss}, err +} + +// siteMirrorGroupStatus is a wrapper around librbd.SiteMirrorGroupStatus that contains the +// site mirror group status. 
+type siteMirrorGroupStatus struct { + *librbd.SiteMirrorGroupStatus +} + +func (status siteMirrorGroupStatus) GetMirrorUUID() string { + return status.MirrorUUID +} + +func (status siteMirrorGroupStatus) GetState() string { + return status.State.String() +} + +func (status siteMirrorGroupStatus) GetDescription() string { + return status.Description +} + +func (status siteMirrorGroupStatus) IsUP() bool { + return status.Up +} + +func (status siteMirrorGroupStatus) GetLastUpdate() time.Time { + // convert the last update time to UTC + return time.Unix(status.LastUpdate, 0).UTC() +} diff --git a/internal/rbd/group/volume_group.go b/internal/rbd/group/volume_group.go index e8414f3ddf9..55cd78201d8 100644 --- a/internal/rbd/group/volume_group.go +++ b/internal/rbd/group/volume_group.go @@ -26,6 +26,8 @@ import ( librbd "github.com/ceph/go-ceph/rbd" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/csi-addons/spec/lib/go/volumegroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" @@ -184,7 +186,7 @@ func (vg *volumeGroup) Create(ctx context.Context) error { return nil } -func (vg *volumeGroup) Delete(ctx context.Context) error { +func (vg *volumeGroup) Delete(ctx context.Context, vgrMirrorInfo types.MirrorInfo, mirror types.Mirror) error { name, err := vg.GetName(ctx) if err != nil { return err @@ -195,6 +197,41 @@ func (vg *volumeGroup) Delete(ctx context.Context) error { return err } + // Cleanup only omap data if the following condition is met + // Mirroring is enabled on the group + // Local group is secondary + // Local group is in up+replaying state + log.DebugLog(ctx, "volume group %v is in %v state and is primary %v", vg, vgrMirrorInfo.GetState, vgrMirrorInfo.IsPrimary()) + if vgrMirrorInfo != nil && vgrMirrorInfo.GetState() == librbd.MirrorGroupEnabled.String() && !vgrMirrorInfo.IsPrimary() { + // If the group is in a secondary state and 
its up+replaying means its + // an healthy secondary and the group is primary somewhere in the + // remote cluster and the local group is getting replayed. Delete the + // OMAP data generated as we cannot delete the secondary group. When + // the group on the primary cluster gets deleted/mirroring disabled, + // the group on all the remote (secondary) clusters will get + // auto-deleted. This helps in garbage collecting the OMAP, VR, VGR, + // VGRC, PVC and PV objects after failback operation. + if mirror != nil { + sts, rErr := mirror.GetGlobalMirroringStatus(ctx) + if rErr != nil { + return status.Error(codes.Internal, rErr.Error()) + } + localStatus, rErr := sts.GetLocalSiteStatus() + if rErr != nil { + log.ErrorLog(ctx, "failed to get local status for volume group%s: %w", name, rErr) + return status.Error(codes.Internal, rErr.Error()) + } + log.DebugLog(ctx, "local status is %v and local state is %v", localStatus.IsUP(), localStatus.GetState()) + if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorGroupStatusStateReplaying.String() { + return vg.commonVolumeGroup.Delete(ctx) + } + log.ErrorLog(ctx, + "secondary group status is up=%t and state=%s", + localStatus.IsUP(), + localStatus.GetState()) + } + } + err = librbd.GroupRemove(ioctx, name) if err != nil && !errors.Is(err, rados.ErrNotFound) { return fmt.Errorf("failed to remove volume group %q: %w", vg, err) @@ -252,21 +289,23 @@ func (vg *volumeGroup) AddVolume(ctx context.Context, vol types.Volume) error { return nil } -func (vg *volumeGroup) RemoveVolume(ctx context.Context, vol types.Volume) error { +func (vg *volumeGroup) RemoveVolume(ctx context.Context, vol types.Volume, removeImageFromGroup bool) error { // volume was already removed from the group if len(vg.volumes) == 0 { return nil } - err := vol.RemoveFromGroup(ctx, vg) - if err != nil { - if errors.Is(err, librbd.ErrNotExist) { - return nil - } + if removeImageFromGroup { + log.DebugLog(ctx, "removing image %v from group %v", vol, vg) 
+ err := vol.RemoveFromGroup(ctx, vg) + if err != nil { + if errors.Is(err, librbd.ErrNotExist) { + return nil + } - return fmt.Errorf("failed to remove volume %q from volume group %q: %w", vol, vg, err) + return fmt.Errorf("failed to remove volume %q from volume group %q: %w", vol, vg, err) + } } - // toRemove contain the ID of the volume that is removed from the group toRemove, err := vol.GetID(ctx) if err != nil { @@ -415,3 +454,7 @@ func (vg *volumeGroup) CreateSnapshots( return snapshots, nil } + +func (vg *volumeGroup) ToMirror() (types.Mirror, error) { + return volumeGroupMirror{vg}, nil +} diff --git a/internal/rbd/group_controllerserver.go b/internal/rbd/group_controllerserver.go index a4b313ea7cd..c8f2e9181b6 100644 --- a/internal/rbd/group_controllerserver.go +++ b/internal/rbd/group_controllerserver.go @@ -72,7 +72,7 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot( for _, volume := range volumes { if vg != nil { // 'normal' cleanup, remove all images from the group - vgErr := vg.RemoveVolume(ctx, volume) + vgErr := vg.RemoveVolume(ctx, volume, true) if vgErr != nil { log.ErrorLog( ctx, @@ -91,7 +91,7 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot( // the VG should always be deleted, volumes can only belong to a single VG log.DebugLog(ctx, "removing temporary volume group %q", vg) - vgErr := vg.Delete(ctx) + vgErr := vg.Delete(ctx, nil, nil) if vgErr != nil { log.ErrorLog(ctx, "failed to remove temporary volume group %q: %v", vg, vgErr) } diff --git a/internal/rbd/manager.go b/internal/rbd/manager.go index 61fcfdcaadf..53385aee8d7 100644 --- a/internal/rbd/manager.go +++ b/internal/rbd/manager.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" + "github.com/csi-addons/spec/lib/go/replication" + "github.com/ceph/ceph-csi/internal/journal" rbd_group "github.com/ceph/ceph-csi/internal/rbd/group" "github.com/ceph/ceph-csi/internal/rbd/types" @@ -504,6 +506,7 @@ func (mgr *rbdManager) CreateVolumeGroupSnapshot( return vgs, nil } +<<<<<<< HEAD // 
RegenerateVolumeGroupJournal regenerate the omap data for the volume group. // This performs the following operations: // - extracts clusterID and Mons from the cluster mapping @@ -633,4 +636,79 @@ func (mgr *rbdManager) RegenerateVolumeGroupJournal( groupHandle, vgName, requestName) return groupHandle, nil +======= +func (mgr *rbdManager) GetMirrorSource(ctx context.Context, reqID string, + rep *replication.ReplicationSource, +) ([]types.Volume, types.Mirror, error) { + switch { + // Backward compatibility: if rep is nil, we assume that the sidecar is still old and + // setting only volumeId not the replication source. + case rep == nil || rep.GetVolume() != nil: + rbdVol, err := mgr.GetVolumeByID(ctx, reqID) + if err != nil { + return nil, nil, fmt.Errorf("failed to get volume by id %q: %w", reqID, err) + } + defer func() { + if err != nil { + rbdVol.Destroy(ctx) + } + }() + var mir types.Mirror + mir, err = rbdVol.ToMirror() + if err != nil { + return nil, nil, fmt.Errorf("failed to convert volume %s to mirror: %w", rbdVol, err) + } + + return []types.Volume{rbdVol}, mir, nil + case rep.GetVolumegroup() != nil: + rbdGroup, err := mgr.GetVolumeGroupByID(ctx, reqID) + if err != nil { + return nil, nil, fmt.Errorf("failed to get volume group by id %q: %w", reqID, err) + } + defer func() { + if err != nil { + rbdGroup.Destroy(ctx) + } + }() + var mir types.Mirror + mir, err = rbdGroup.ToMirror() + if err != nil { + return nil, nil, fmt.Errorf("failed to convert volume group %s to mirror: %w", rbdGroup, err) + } + var vols []types.Volume + vols, err = rbdGroup.ListVolumes(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to list volumes in volume group %q: %w", rbdGroup, err) + } + // Get all the volume with connection and return it + volumes := make([]types.Volume, len(vols)) + // Destroy connections if there is any error + defer func() { + if err != nil { + for _, vol := range vols { + vol.Destroy(ctx) + } + } + }() + + for i, vol := range vols { + var 
id string + id, err = vol.GetID(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to get id for volume %q in group %q: %w", vol, rbdGroup, err) + } + var v types.Volume + v, err = mgr.GetVolumeByID(ctx, id) + if err != nil { + return nil, nil, fmt.Errorf("failed to get volume by id %q in group %q: %w", id, rbdGroup, err) + } + volumes[i] = v + } + + return volumes, mir, nil + + default: + return nil, nil, errors.New("replication source is not set") + } +>>>>>>> 71b235ff1 (rbd: implement GetMirrorSource in manager) } diff --git a/internal/rbd/types/group.go b/internal/rbd/types/group.go index 08183f9fb63..c83f951d4e4 100644 --- a/internal/rbd/types/group.go +++ b/internal/rbd/types/group.go @@ -58,13 +58,13 @@ type VolumeGroup interface { Create(ctx context.Context) error // Delete removes the VolumeGroup from the backend storage. - Delete(ctx context.Context) error + Delete(ctx context.Context, vgMirrorInfo MirrorInfo, mirror Mirror) error // AddVolume adds the Volume to the VolumeGroup. AddVolume(ctx context.Context, volume Volume) error // RemoveVolume removes the Volume from the VolumeGroup. - RemoveVolume(ctx context.Context, volume Volume) error + RemoveVolume(ctx context.Context, volume Volume, removeImageFromGroup bool) error // ListVolumes returns a slice with all Volumes in the VolumeGroup. ListVolumes(ctx context.Context) ([]Volume, error) @@ -73,4 +73,7 @@ type VolumeGroup interface { // The Snapshots are crash consistent, and created as a consistency // group. CreateSnapshots(ctx context.Context, cr *util.Credentials, name string) ([]Snapshot, error) + + // ToMirror converts the VolumeGroup to a Mirror. 
+ ToMirror() (Mirror, error) } diff --git a/internal/rbd/types/manager.go b/internal/rbd/types/manager.go index 458bc93dc0d..ca3bf63a456 100644 --- a/internal/rbd/types/manager.go +++ b/internal/rbd/types/manager.go @@ -18,6 +18,8 @@ package types import ( "context" + + "github.com/csi-addons/spec/lib/go/replication" ) // VolumeResolver can be used to construct a Volume from a CSI VolumeId. @@ -71,4 +73,8 @@ type Manager interface { // RegenerateVolumeGroupJournal regenerate the omap data for the volume group. // returns the volume group handle RegenerateVolumeGroupJournal(ctx context.Context, groupID, requestName string, volumeIds []string) (string, error) + + // GetMirrorSource returns the source of the mirror for the given volume or group. + GetMirrorSource(ctx context.Context, volumeID string, + rep *replication.ReplicationSource) ([]Volume, Mirror, error) } diff --git a/scripts/Dockerfile.test b/scripts/Dockerfile.test index def6a3325eb..2ed34367faa 100644 --- a/scripts/Dockerfile.test +++ b/scripts/Dockerfile.test @@ -8,7 +8,7 @@ # little different. 
# -FROM registry.fedoraproject.org/fedora:latest +FROM quay.ceph.io/ceph-ci/ceph:wip-pkalever-rbd-group-snap-mirror ARG GOPATH=/go ARG GOROOT=/usr/local/go diff --git a/vendor/github.com/ceph/go-ceph/rbd/group.go b/vendor/github.com/ceph/go-ceph/rbd/group.go index 654d15e3e33..b36647bd927 100644 --- a/vendor/github.com/ceph/go-ceph/rbd/group.go +++ b/vendor/github.com/ceph/go-ceph/rbd/group.go @@ -110,7 +110,8 @@ func GroupImageAdd(groupIoctx *rados.IOContext, groupName string, cephIoctx(groupIoctx), cGroupName, cephIoctx(imageIoctx), - cImageName) + cImageName, + C.uint32_t(0)) return getError(ret) } @@ -135,7 +136,8 @@ func GroupImageRemove(groupIoctx *rados.IOContext, groupName string, cephIoctx(groupIoctx), cGroupName, cephIoctx(imageIoctx), - cImageName) + cImageName, + C.uint32_t(0)) return getError(ret) } @@ -160,7 +162,8 @@ func GroupImageRemoveByID(groupIoctx *rados.IOContext, groupName string, cephIoctx(groupIoctx), cGroupName, cephIoctx(imageIoctx), - cid) + cid, + C.uint32_t(0)) return getError(ret) } diff --git a/vendor/github.com/ceph/go-ceph/rbd/mirror_group.go b/vendor/github.com/ceph/go-ceph/rbd/mirror_group.go new file mode 100644 index 00000000000..a33fd6ee273 --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/rbd/mirror_group.go @@ -0,0 +1,373 @@ +//go:build ceph_preview + +package rbd + +// #cgo LDFLAGS: -lrbd +// #include +// #include +import "C" +import ( + "errors" + "fmt" + "unsafe" + + "github.com/ceph/go-ceph/internal/cutil" + "github.com/ceph/go-ceph/rados" +) + +// MirrorGroupEnable will enable mirroring for a group using the specified mode. 
+// +// Implements: +// +// int rbd_mirror_group_enable(rados_ioctx_t p, const char *name, +// rbd_mirror_image_mode_t mirror_image_mode, +// uint32_t flags); +func MirrorGroupEnable(groupIoctx *rados.IOContext, groupName string, mode ImageMirrorMode) error { + cGroupName := C.CString(groupName) + defer C.free(unsafe.Pointer(cGroupName)) + ret := C.rbd_mirror_group_enable( + cephIoctx(groupIoctx), + cGroupName, + C.rbd_mirror_image_mode_t(mode), + (C.uint32_t)(2), + ) + return getError(ret) +} + +// MirrorGroupDisable will disabling mirroring for a group +// +// Implements: +// +// int rbd_mirror_group_disable(rados_ioctx_t p, const char *name, +// bool force) +func MirrorGroupDisable(groupIoctx *rados.IOContext, groupName string, force bool) error { + cGroupName := C.CString(groupName) + defer C.free(unsafe.Pointer(cGroupName)) + ret := C.rbd_mirror_group_disable( + cephIoctx(groupIoctx), + cGroupName, + C.bool(force)) + return getError(ret) +} + +// MirrorGroupPromote will promote the mirrored group to primary status +// +// Implements: +// +// int rbd_mirror_group_promote(rados_ioctx_t p, const char *name, +// uint32_t flags, bool force) +func MirrorGroupPromote(groupIoctx *rados.IOContext, groupName string, force bool) error { + cGroupName := C.CString(groupName) + defer C.free(unsafe.Pointer(cGroupName)) + ret := C.rbd_mirror_group_promote( + cephIoctx(groupIoctx), + cGroupName, + (C.uint32_t)(0), + C.bool(force)) + return getError(ret) +} + +// MirrorGroupDemote will demote the mirrored group to primary status +// +// Implements: +// +// int rbd_mirror_group_demote(rados_ioctx_t p, const char *name, +// uint32_t flags) +func MirrorGroupDemote(groupIoctx *rados.IOContext, groupName string) error { + cGroupName := C.CString(groupName) + defer C.free(unsafe.Pointer(cGroupName)) + ret := C.rbd_mirror_group_demote( + cephIoctx(groupIoctx), + cGroupName, + (C.uint32_t)(0)) + return getError(ret) +} + +// MirrorGroupResync is used to manually resolve split-brain 
status by triggering +resynchronization +// +// Implements: +// +// int rbd_mirror_group_resync(rados_ioctx_t p, const char *name) +func MirrorGroupResync(groupIoctx *rados.IOContext, groupName string) error { + cGroupName := C.CString(groupName) + defer C.free(unsafe.Pointer(cGroupName)) + ret := C.rbd_mirror_group_resync( + cephIoctx(groupIoctx), + cGroupName) + return getError(ret) +} + +// MirrorGroupState represents the current state of the mirrored group +type MirrorGroupState C.rbd_mirror_group_state_t + +// String representation of MirrorGroupState. +func (mgs MirrorGroupState) String() string { + switch mgs { + case MirrorGroupEnabled: + return "enabled" + case MirrorGroupDisabled: + return "disabled" + case MirrorGroupEnabling: + return "enabling" + case MirrorGrpupDisabling: + return "disabling" + default: + return "" + } +} + +const ( + // MirrorGrpupDisabling is the representation of + // RBD_MIRROR_GROUP_DISABLING from librbd. + MirrorGrpupDisabling = MirrorGroupState(C.RBD_MIRROR_GROUP_DISABLING) + // MirrorGroupEnabling is the representation of + // RBD_MIRROR_GROUP_ENABLING from librbd + MirrorGroupEnabling = MirrorGroupState(C.RBD_MIRROR_GROUP_ENABLING) + // MirrorGroupEnabled is the representation of + // RBD_MIRROR_IMAGE_ENABLED from librbd. + MirrorGroupEnabled = MirrorGroupState(C.RBD_MIRROR_GROUP_ENABLED) + // MirrorGroupDisabled is the representation of + // RBD_MIRROR_GROUP_DISABLED from librbd. + MirrorGroupDisabled = MirrorGroupState(C.RBD_MIRROR_GROUP_DISABLED) +) + +// MirrorGroupInfo represents the mirroring status information of group. 
+type MirrorGroupInfo struct { + GlobalID string + State MirrorGroupState + MirrorImageMode ImageMirrorMode + Primary bool +} + +// GetMirrorGroupInfo returns the mirroring status information of the mirrored group +// +// Implements: +// +// int rbd_mirror_group_get_info(rados_ioctx_t p, const char *name, +// rbd_mirror_group_info_t *mirror_group_info, +// size_t info_size) +func GetMirrorGroupInfo(groupIoctx *rados.IOContext, groupName string) (*MirrorGroupInfo, error) { + var cgInfo C.rbd_mirror_group_info_t + cGroupName := C.CString(groupName) + defer C.free(unsafe.Pointer(cGroupName)) + + ret := C.rbd_mirror_group_get_info( + cephIoctx(groupIoctx), + cGroupName, + &cgInfo, + C.sizeof_rbd_mirror_group_info_t) + var err error + if ret < 0 { + err = getError(ret) + } + if err != nil { + if errors.Is(err, ErrNotFound) || errors.Is(err, rados.RadosErrorNotFound) { + return &MirrorGroupInfo{ + State: MirrorGroupDisabled, + MirrorImageMode: ImageMirrorModeSnapshot, + Primary: false, + }, nil + } + // propagate any other failure instead of reading the + // uninitialized cgInfo below + return nil, err + } + info := convertMirrorGroupInfo(&cgInfo) + + // free C memory allocated by C.rbd_mirror_group_get_info call + C.rbd_mirror_group_get_info_cleanup(&cgInfo) + return &info, nil + +} + +func convertMirrorGroupInfo(cgInfo *C.rbd_mirror_group_info_t) MirrorGroupInfo { + return MirrorGroupInfo{ + GlobalID: C.GoString(cgInfo.global_id), + MirrorImageMode: ImageMirrorMode(cgInfo.mirror_image_mode), + State: MirrorGroupState(cgInfo.state), + Primary: bool(cgInfo.primary), + } +} + +// MirrorGroupStatusState is used to indicate the state of a mirrored group +// within the site status info. 
+type MirrorGroupStatusState int64 + +const ( + // MirrorGroupStatusStateUnknown is equivalent to MIRROR_GROUP_STATUS_STATE_UNKNOWN + MirrorGroupStatusStateUnknown = MirrorGroupStatusState(C.MIRROR_GROUP_STATUS_STATE_UNKNOWN) + // MirrorGroupStatusStateError is equivalent to MIRROR_GROUP_STATUS_STATE_ERROR + MirrorGroupStatusStateError = MirrorGroupStatusState(C.MIRROR_GROUP_STATUS_STATE_ERROR) + // MirrorGroupStatusStateStartingReplay is equivalent to MIRROR_GROUP_STATUS_STATE_STARTING_REPLAY + MirrorGroupStatusStateStartingReplay = MirrorGroupStatusState(C.MIRROR_GROUP_STATUS_STATE_STARTING_REPLAY) + // MirrorGroupStatusStateReplaying is equivalent to MIRROR_GROUP_STATUS_STATE_REPLAYING + MirrorGroupStatusStateReplaying = MirrorGroupStatusState(C.MIRROR_GROUP_STATUS_STATE_REPLAYING) + // MirrorGroupStatusStateStoppingReplay is equivalent to MIRROR_GROUP_STATUS_STATE_STOPPING_REPLAY + MirrorGroupStatusStateStoppingReplay = MirrorGroupStatusState(C.MIRROR_GROUP_STATUS_STATE_STOPPING_REPLAY) + // MirrorGroupStatusStateStopped is equivalent to MIRROR_IMAGE_GROUP_STATUS_STATE_STOPPED + MirrorGroupStatusStateStopped = MirrorGroupStatusState(C.MIRROR_GROUP_STATUS_STATE_STOPPED) +) + +// String represents the MirrorImageStatusState as a short string. +func (state MirrorGroupStatusState) String() (s string) { + switch state { + case MirrorGroupStatusStateUnknown: + s = "unknown" + case MirrorGroupStatusStateError: + s = "error" + case MirrorGroupStatusStateStartingReplay: + s = "starting_replay" + case MirrorGroupStatusStateReplaying: + s = "replaying" + case MirrorGroupStatusStateStoppingReplay: + s = "stopping_replay" + case MirrorGroupStatusStateStopped: + s = "stopped" + default: + s = fmt.Sprintf("unknown(%d)", state) + } + return s +} + +// SiteMirrorGroupStatus contains information pertaining to the status of +// a mirrored group within a site. 
+type SiteMirrorGroupStatus struct { + MirrorUUID string + State MirrorGroupStatusState + MirrorImageCount int + MirrorImagePoolIDs int64 + MirrorImageGlobalIDs string + MirrorImages []SiteMirrorImageStatus + Description string + LastUpdate int64 + Up bool +} + +// GlobalMirrorGroupStatus contains information pertaining to the global +// status of a mirrored group. It contains general information as well +// as per-site information stored in the SiteStatuses slice. +type GlobalMirrorGroupStatus struct { + Name string + Info MirrorGroupInfo + SiteStatusesCount int + SiteStatuses []SiteMirrorGroupStatus +} + +// LocalStatus returns one SiteMirrorGroupStatus item from the SiteStatuses +// slice that corresponds to the local site's status. If the local status +// is not found than the error ErrNotExist will be returned. +func (gmis GlobalMirrorGroupStatus) LocalStatus() (SiteMirrorGroupStatus, error) { + var ( + ss SiteMirrorGroupStatus + err error = ErrNotExist + ) + for i := range gmis.SiteStatuses { + // I couldn't find it explicitly documented, but a site mirror uuid + // of an empty string indicates that this is the local site. + // This pattern occurs in both the pybind code and ceph c++. + if gmis.SiteStatuses[i].MirrorUUID == "" { + ss = gmis.SiteStatuses[i] + err = nil + break + } + } + return ss, err +} + +type groupSiteArray [cutil.MaxIdx]C.rbd_mirror_group_site_status_t + +// GetGlobalMirrorGroupStatus returns status information pertaining to the state +// of a groups's mirroring. 
+// +// Implements: +// +// int rbd_mirror_group_get_global_status( +// IoCtx& io_ctx, +// const char *group_name +// mirror_group_global_status_t *mirror_group_status, +// size_t status_size); +func GetGlobalMirrorGroupStatus(ioctx *rados.IOContext, groupName string) (GlobalMirrorGroupStatus, error) { + s := C.rbd_mirror_group_global_status_t{} + cGroupName := C.CString(groupName) + defer C.free(unsafe.Pointer(cGroupName)) + ret := C.rbd_mirror_group_get_global_status( + cephIoctx(ioctx), + (*C.char)(cGroupName), + &s, + C.sizeof_rbd_mirror_group_global_status_t) + if err := getError(ret); err != nil { + return GlobalMirrorGroupStatus{}, err + } + + status := newGlobalMirrorGroupStatus(&s) + return status, nil +} + +func newGlobalMirrorGroupStatus(s *C.rbd_mirror_group_global_status_t) GlobalMirrorGroupStatus { + // Initializing the status object + status := GlobalMirrorGroupStatus{ + Name: C.GoString(s.name), + Info: convertMirrorGroupInfo(&s.info), + SiteStatusesCount: int(s.site_statuses_count), + SiteStatuses: make([]SiteMirrorGroupStatus, s.site_statuses_count), + } + + // Print the count of site statuses for debugging + fmt.Println("status.SiteStatusesCount: ", s.site_statuses_count) + + fmt.Printf("s.site_statuses: %+v\n", s.site_statuses) + // Check if site statuses are not null before using them + if s.site_statuses != nil && s.site_statuses_count > 0 { + gsscs := (*groupSiteArray)(unsafe.Pointer(s.site_statuses))[:s.site_statuses_count:s.site_statuses_count] + for i := C.uint32_t(0); i < s.site_statuses_count; i++ { + gss := gsscs[i] + // Ensure that fields are valid before using them + if gss.mirror_uuid != nil && gss.mirror_image_global_ids != nil { + status.SiteStatuses[i] = SiteMirrorGroupStatus{ + MirrorUUID: C.GoString(gss.mirror_uuid), + MirrorImageGlobalIDs: C.GoString(*gss.mirror_image_global_ids), + MirrorImagePoolIDs: int64(*gss.mirror_image_pool_ids), + State: MirrorGroupStatusState(gss.state), + Description: C.GoString(gss.description), + 
MirrorImageCount: int(gss.mirror_image_count), + LastUpdate: int64(gss.last_update), + MirrorImages: make([]SiteMirrorImageStatus, gss.mirror_image_count), + Up: bool(gss.up), + } + + // Check if the mirror_images pointer is valid + if gss.mirror_images != nil && gss.mirror_image_count > 0 { + + sscs := (*siteArray)(unsafe.Pointer(gss.mirror_images))[:gss.mirror_image_count:gss.mirror_image_count] + fmt.Printf("sscs: siteArray %+v\n", sscs) + for j := C.uint32_t(0); j < gss.mirror_image_count; j++ { + ss := sscs[j] + + // Ensure that fields are valid before using them + if ss.mirror_uuid != nil { + status.SiteStatuses[i].MirrorImages[j] = SiteMirrorImageStatus{ + MirrorUUID: C.GoString(ss.mirror_uuid), + State: MirrorImageStatusState(ss.state), + Description: C.GoString(ss.description), + LastUpdate: int64(ss.last_update), + Up: bool(ss.up), + } + } else { + // Log if a field is invalid + fmt.Println("Warning: mirror_uuid is nil at index", i, j) + } + } + } else { + // Handle case where mirror_images is nil or mirror_image_count is 0 + fmt.Println("Warning: mirror_images is nil or mirror_image_count is 0 at index", i) + } + } else { + // Handle case where mirror_uuid or mirror_image_global_ids is nil + fmt.Println("Warning: mirror_uuid or mirror_image_global_ids is nil at index", i) + } + } + } else { + // Handle case where site statuses are nil or count is 0 + fmt.Println("Warning: site_statuses is nil or site_statuses_count is 0") + } + // Return the populated status + return status +}