Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 75 additions & 31 deletions pkg/tasks/delete_repository_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ type DeleteRepositorySnapshots struct {

// This org may or may not have a domain created in pulp, so make sure the domain exists and if not, return a nil pulpClient
func lookupOptionalPulpClient(ctx context.Context, globalClient pulp_client.PulpGlobalClient, task *models.TaskInfo, daoReg *dao.DaoRegistry) (*pulp_client.PulpClient, error) {
logger := LogForTask(task.Id.String(), task.Typename, task.RequestID)
if !config.PulpConfigured() {
logger.Debug().Msg("pulp not configured, skipping pulp client setup")
return nil, nil
}
domainName, err := daoReg.Domain.FetchOrCreateDomain(ctx, task.OrgId)
Expand All @@ -51,6 +53,10 @@ func lookupOptionalPulpClient(ctx context.Context, globalClient pulp_client.Pulp
client := pulp_client.GetPulpClientWithDomain(domainName)
return &client, nil
}
logger.Debug().
Str("org_id", task.OrgId).
Str("domain_name", domainName).
Msg("no pulp domain found for org, org has never snapshotted, skipping pulp cleanup")
return nil, nil
}

Expand Down Expand Up @@ -100,13 +106,13 @@ func (d *DeleteRepositorySnapshots) Run() error {
// Do not throw an error if not found
latestDistro, err := d.getPulpClient().FindDistributionByPath(d.ctx, latestPathIdent)
if err != nil {
return err
return fmt.Errorf("failed to find latest distribution by path %v: %w", latestPathIdent, err)
}

if latestDistro != nil {
_, err := d.deleteRpmDistribution(*latestDistro.PulpHref)
if err != nil {
return err
return fmt.Errorf("failed to delete latest distribution %v: %w", *latestDistro.PulpHref, err)
}
}

Expand All @@ -115,20 +121,25 @@ func (d *DeleteRepositorySnapshots) Run() error {
return err
}

var snapErrs []error
for _, snap := range snaps {
_, err = d.deleteRpmDistribution(snap.DistributionHref)
if err != nil {
return err
snapErrs = append(snapErrs, err)
continue
}
err = d.deleteTemplateSnapshot(snap.UUID)
if err != nil {
return err
if err = d.deleteTemplateSnapshot(snap.UUID); err != nil {
snapErrs = append(snapErrs, err)
continue
}
err = d.deleteSnapshot(snap.UUID)
if err != nil {
return err
if err = d.deleteSnapshot(snap.UUID); err != nil {
snapErrs = append(snapErrs, err)
}
}
if err = errors.Join(snapErrs...); err != nil {
return err
}
Comment on lines +124 to +141
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If deleteTemplateSnapshot errors, let's also continue. I think if you try to delete the snapshot before deleting the template snapshot, I think that would cause an error or at least leave things in an odd state.

// only runs if all snaps deletes succeed
_, _, err = d.deleteRpmRepoAndRemote()
if err != nil {
return err
Expand Down Expand Up @@ -160,60 +171,80 @@ func (d *DeleteRepositorySnapshots) fetchSnapshots() ([]models.Snapshot, error)
}

func (d *DeleteRepositorySnapshots) deleteRpmDistribution(snapDistributionHref string) (*zest.TaskResponse, error) {
logger := LogForTask(d.task.Id.String(), d.task.Typename, d.task.RequestID)
deleteDistributionHref, err := d.getPulpClient().DeleteRpmDistribution(d.ctx, snapDistributionHref)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to delete rpm distribution %v: %w", snapDistributionHref, err)
}
if deleteDistributionHref == nil {
logger.Debug().
Str("distribution_href", snapDistributionHref).
Msg("no task href returned for distribution deletion, distribution may have already been deleted")
return nil, nil
}
task, err := d.getPulpClient().PollTask(d.ctx, *deleteDistributionHref)
if err != nil {
return task, err
return task, fmt.Errorf("error polling distribution deletion task for %v: %w", snapDistributionHref, err)
}
return task, nil
}

func (d *DeleteRepositorySnapshots) deleteRpmRepoAndRemote() (taskRepo, taskRemote *zest.TaskResponse, err error) {
logger := LogForTask(d.task.Id.String(), d.task.Typename, d.task.RequestID)

remoteResp, err := d.getPulpClient().GetRpmRemoteByName(d.ctx, d.payload.RepoConfigUUID)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to look up rpm remote %v: %w", d.payload.RepoConfigUUID, err)
}
if remoteResp == nil {
logger.Debug().
Str("repo_config_uuid", d.payload.RepoConfigUUID).
Msg("no rpm remote found, skipping remote deletion")
}
if remoteResp != nil {
remoteHref := remoteResp.PulpHref
deleteRemoteHref, err := d.getPulpClient().DeleteRpmRemote(d.ctx, *remoteHref)
if err != nil {
return taskRepo, nil, err
return taskRepo, nil, fmt.Errorf("failed to delete rpm remote %v: %w", *remoteHref, err)
}
taskRemote, err = d.getPulpClient().PollTask(d.ctx, deleteRemoteHref)
if err != nil {
return taskRepo, taskRemote, err
return taskRepo, taskRemote, fmt.Errorf("error polling rpm remote deletion task for %v: %w", *remoteHref, err)
}
}

repoResp, err := d.getPulpClient().GetRpmRepositoryByName(d.ctx, d.payload.RepoConfigUUID)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to look up rpm repository %v: %w", d.payload.RepoConfigUUID, err)
}
if repoResp == nil {
logger.Debug().
Str("repo_config_uuid", d.payload.RepoConfigUUID).
Msg("no rpm repository found, skipping repository deletion")
}
if repoResp != nil {
repoHref := repoResp.PulpHref
deleteRepoHref, err := d.getPulpClient().DeleteRpmRepository(d.ctx, *repoHref)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to delete rpm repository %v: %w", *repoHref, err)
}
taskRepo, err = d.getPulpClient().PollTask(d.ctx, deleteRepoHref)
if err != nil {
return taskRepo, nil, err
return taskRepo, nil, fmt.Errorf("error polling rpm repository deletion task for %v: %w", *repoHref, err)
}
}
return taskRepo, taskRemote, nil
}

func (d *DeleteRepositorySnapshots) deleteSnapshot(snapUUID string) error {
logger := LogForTask(d.task.Id.String(), d.task.Typename, d.task.RequestID)
err := d.daoReg.Snapshot.Delete(d.ctx, snapUUID)
if err != nil {
var daoErr *ce.DaoError
if errors.As(err, &daoErr) && daoErr.NotFound {
logger.Warn().
Str("snapshot_uuid", snapUUID).
Msg("snapshot not found during deletion, already deleted")
return nil
}
return fmt.Errorf("error deleting snapshot %v: %w", snapUUID, err)
Expand All @@ -222,13 +253,17 @@ func (d *DeleteRepositorySnapshots) deleteSnapshot(snapUUID string) error {
}

func (d *DeleteRepositorySnapshots) deleteRepoConfig() error {
logger := LogForTask(d.task.Id.String(), d.task.Typename, d.task.RequestID)
err := d.daoReg.RepositoryConfig.Delete(d.ctx, d.task.OrgId, d.payload.RepoConfigUUID)
if err != nil {
var daoErr *ce.DaoError
if errors.As(err, &daoErr) && daoErr.NotFound {
logger.Warn().
Str("repo_config_uuid", d.payload.RepoConfigUUID).
Msg("repo config not found during deletion, already deleted")
return nil
}
return fmt.Errorf("error deleting repository configuration: %w", err)
return fmt.Errorf("error deleting repository configuration %v: %w", d.payload.RepoConfigUUID, err)
}
return nil
}
Expand All @@ -249,13 +284,15 @@ func (d *DeleteRepositorySnapshots) candlepinRHContentId(templateOrgId string, r
}

func (d *DeleteRepositorySnapshots) deleteCandlepinContent() error {
logger := LogForTask(d.task.Id.String(), d.task.Typename, d.task.RequestID)
if !config.CandlepinConfigured() {
logger.Debug().Msg("candlepin not configured, skipping candlepin content deletion")
return nil
}
if d.task.OrgId == config.RedHatOrg {
templates, err := d.daoReg.Template.InternalOnlyGetTemplatesForRepoConfig(d.ctx, d.payload.RepoConfigUUID, false)
if err != nil {
return fmt.Errorf("couldn't get templates for repo config")
return fmt.Errorf("couldn't get templates for repo config %v: %w", d.payload.RepoConfigUUID, err)
}
for _, template := range templates {
// We have to lookup the content ID for RH content, as its based on the repo label
Expand All @@ -265,45 +302,48 @@ func (d *DeleteRepositorySnapshots) deleteCandlepinContent() error {
}
err = d.cpClient.DemoteContentFromEnvironment(d.ctx, template.UUID, []string{contentId})
if err != nil {
return fmt.Errorf("couldn't demote content from environment, %v", err)
return fmt.Errorf("couldn't demote content from environment for template %v: %w", template.UUID, err)
}
}
} else {
err := d.cpClient.RemoveContentFromProduct(d.ctx, d.task.OrgId, d.payload.RepoConfigUUID)
if err != nil {
return err
return fmt.Errorf("failed to remove content from candlepin product for repo %v: %w", d.payload.RepoConfigUUID, err)
}

err = d.cpClient.DeleteContent(d.ctx, d.task.OrgId, d.payload.RepoConfigUUID)
if err != nil {
return err
return fmt.Errorf("error deleting candlepin content for repo %v: %w", d.payload.RepoConfigUUID, err)
}
}

return nil
}

func (d *DeleteRepositorySnapshots) deleteTemplateRepoDistributions() (err error) {
func (d *DeleteRepositorySnapshots) deleteTemplateRepoDistributions() error {
logger := LogForTask(d.task.Id.String(), d.task.Typename, d.task.RequestID)
var errs []error

var templates []api.TemplateResponse
if d.task.OrgId == config.RedHatOrg {
var err error
templates, err = d.daoReg.Template.InternalOnlyGetTemplatesForRepoConfig(d.ctx, d.payload.RepoConfigUUID, false)
if err != nil {
return err
return fmt.Errorf("failed to get templates for repo config %v: %w", d.payload.RepoConfigUUID, err)
}
} else {
templateResponse, _, err := d.daoReg.Template.List(d.ctx, d.task.OrgId, true, api.PaginationData{Limit: -1}, api.TemplateFilterData{RepositoryUUIDs: []string{d.payload.RepoConfigUUID}})
if err != nil {
return err
return fmt.Errorf("failed to get templates for repo config %v: %w", d.payload.RepoConfigUUID, err)
}
templates = templateResponse.Data
}

for _, template := range templates {
distHref, err := d.daoReg.Template.GetDistributionHref(d.ctx, template.UUID, d.payload.RepoConfigUUID)
if err != nil {
return err
errs = append(errs, fmt.Errorf("failed to get distribution href for template %v: %w", template.UUID, err))
continue
}

if distHref == nil {
Expand All @@ -315,25 +355,29 @@ func (d *DeleteRepositorySnapshots) deleteTemplateRepoDistributions() (err error

taskHref, err := d.getPulpClient().DeleteRpmDistribution(d.ctx, *distHref)
if err != nil {
return err
errs = append(errs, fmt.Errorf("failed to delete rpm distribution %v for template %v: %w", *distHref, template.UUID, err))
continue
}

if taskHref != nil {
_, err = d.getPulpClient().PollTask(d.ctx, *taskHref)
if err != nil {
return err
if _, err = d.getPulpClient().PollTask(d.ctx, *taskHref); err != nil {
errs = append(errs, fmt.Errorf("error polling distribution deletion task for template %v: %w", template.UUID, err))
}
}
}

return nil
return errors.Join(errs...)
}

func (d *DeleteRepositorySnapshots) deleteTemplateSnapshot(snapshotUUID string) error {
logger := LogForTask(d.task.Id.String(), d.task.Typename, d.task.RequestID)
err := d.daoReg.Template.DeleteTemplateSnapshot(d.ctx, snapshotUUID)
if err != nil {
var daoErr *ce.DaoError
if errors.As(err, &daoErr) && daoErr.NotFound {
logger.Warn().
Str("snapshot_uuid", snapshotUUID).
Msg("template snapshot association not found during deletion, already deleted")
return nil
}
return fmt.Errorf("error deleting template snapshot %v: %w", snapshotUUID, err)
Expand Down
Loading
Loading