Skip to content

Commit 7317a1b

Browse files
committed
RHINENG-19806: reimplement repo based recalc
recalc only system which have updated repos and packages
1 parent 2bd35e1 commit 7317a1b

3 files changed

Lines changed: 63 additions & 66 deletions

File tree

tasks/vmaas_sync/repo_based.go

Lines changed: 58 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77
"app/base/utils"
88
"app/base/vmaas"
99
"app/tasks"
10+
"maps"
1011
"net/http"
12+
"slices"
1113
"time"
1214
)
1315

@@ -23,16 +25,15 @@ func getCurrentRepoBasedInventoryIDs() ([]mqueue.EvalData, error) {
2325
}
2426

2527
now := time.Now()
26-
thirdParty := true
27-
repoPackages, repoNoPackages, latestRepoChange, err := getUpdatedRepos(now, lastRepoBaseEval, &thirdParty)
28+
repos, packages, latestRepoChange, err := getUpdatedReposWithPackages(now, lastRepoBaseEval)
2829
if latestRepoChange == nil {
2930
return nil, nil
3031
}
3132
if err != nil {
3233
return nil, err
3334
}
3435

35-
inventoryAIDs, err := getRepoBasedInventoryIDs(repoPackages, repoNoPackages)
36+
inventoryAIDs, err := getRepoBasedInventoryIDs(repos, packages)
3637
if err != nil {
3738
return nil, err
3839
}
@@ -42,61 +43,65 @@ func getCurrentRepoBasedInventoryIDs() ([]mqueue.EvalData, error) {
4243
return inventoryAIDs, nil
4344
}
4445

45-
func getRepoBasedInventoryIDs(repoPackages [][]string, repos []string) ([]mqueue.EvalData, error) {
46+
func getRepoBasedInventoryIDs(repos []string, packages []string) ([]mqueue.EvalData, error) {
4647
var ids []mqueue.EvalData
47-
if len(repoPackages) == 0 && len(repos) == 0 {
48+
if len(repos) == 0 || len(packages) == 0 {
4849
return ids, nil
4950
}
5051

51-
// unique repo names from both repoPackages and repos
52-
uniqRepos := make(map[string]bool)
53-
for _, r := range repoPackages {
54-
uniqRepos[r[0]] = true
55-
}
56-
for _, r := range repos {
57-
uniqRepos[r] = true
58-
}
59-
uniqRepoList := make([]string, 0, len(uniqRepos))
60-
for k := range uniqRepos {
61-
uniqRepoList = append(uniqRepoList, k)
62-
}
63-
64-
innerQuery := database.DB.Table("system_repo sr").
52+
query := tasks.CancelableDB().Table("system_platform sp").
53+
Joins("JOIN system_repo sr ON sp.rh_account_id = sr.rh_account_id AND sp.id = sr.system_id").
6554
Joins("JOIN repo ON repo.id = sr.repo_id").
66-
Joins("JOIN system_platform sp ON sp.rh_account_id = sr.rh_account_id AND sp.id = sr.system_id").
6755
Joins("JOIN rh_account ra ON ra.id = sp.rh_account_id").
68-
Select("distinct sp.id, sp.inventory_id, sp.rh_account_id, ra.org_id, repo.name").
69-
Where("repo.name IN (?)", uniqRepoList)
70-
query := tasks.CancelableDB().Table("(?) as rb", innerQuery).
71-
Order("rb.rh_account_id").
72-
Select("distinct rb.inventory_id, rb.rh_account_id, rb.org_id")
73-
whereQ := database.DB
74-
75-
if len(repoPackages) > 0 && len(repoPackages) < tasks.MaxChangedPackages {
76-
query = query.
77-
Joins("JOIN system_package2 spkg ON spkg.rh_account_id = rb.rh_account_id AND spkg.system_id = rb.id").
78-
Joins("JOIN package_name pn ON pn.id = spkg.name_id")
79-
whereQ = whereQ.Where("(rb.name, pn.name) IN (?)", repoPackages)
80-
} else {
81-
whereQ = whereQ.Where("rb.name IN (?)", uniqRepoList)
56+
Joins("JOIN system_package2 spkg ON spkg.rh_account_id = sp.rh_account_id AND spkg.system_id = sp.id").
57+
Joins("JOIN package_name pn ON pn.id = spkg.name_id").
58+
Select("distinct sp.inventory_id, sp.rh_account_id, ra.org_id").
59+
Where("repo.name IN (?)", repos).
60+
Where("pn.name IN (?)", packages).
61+
Order("sp.rh_account_id")
62+
if err := query.Scan(&ids).Error; err != nil {
63+
return nil, err
64+
}
65+
return ids, nil
66+
}
67+
68+
func getUpdatedRepos(syncStart time.Time) ([]string, error) {
69+
repoMap, _, err := getVmaasRepos(syncStart, nil, false)
70+
if err != nil {
71+
return []string{}, err
8272
}
73+
return slices.Collect(maps.Keys(repoMap)), nil
74+
}
8375

84-
if len(repos) > 0 {
85-
whereQ = whereQ.Or("rb.name IN (?)", repos)
76+
func getUpdatedReposWithPackages(syncStart time.Time, modifiedSince *string) ([]string, []string, *time.Time, error) {
77+
repoMap, lastChange, err := getVmaasRepos(syncStart, modifiedSince, true)
78+
if err != nil {
79+
return nil, nil, nil, err
8680
}
81+
var affectedRepos = make([]string, 0, len(repoMap))
82+
var affectedPackages []string
8783

88-
if err := query.Where(whereQ).Scan(&ids).Error; err != nil {
89-
return nil, err
84+
included := make(map[string]bool) // remember packages already in list
85+
for repoName, packageList := range repoMap {
86+
if len(packageList) == 0 {
87+
continue
88+
}
89+
affectedRepos = append(affectedRepos, repoName)
90+
for _, packageName := range packageList {
91+
if !included[packageName] {
92+
included[packageName] = true
93+
affectedPackages = append(affectedPackages, packageName)
94+
}
95+
}
9096
}
91-
return ids, nil
97+
return affectedRepos, affectedPackages, lastChange, nil
9298
}
9399

94100
// nolint: funlen
95-
func getUpdatedRepos(syncStart time.Time, modifiedSince *string, thirdParty *bool,
96-
) ([][]string, []string, *time.Time, error) {
101+
func getVmaasRepos(syncStart time.Time, modifiedSince *string, thirdParty bool,
102+
) (map[string][]string, *time.Time, error) {
97103
page := 1
98-
var repoPackages [][]string
99-
var repoNoPackages []string
104+
var repoPackages = make(map[string][]string)
100105
var latestRepoChange *time.Time
101106
var nReposRedhat int
102107
var nReposThirdParty int
@@ -106,7 +111,7 @@ func getUpdatedRepos(syncStart time.Time, modifiedSince *string, thirdParty *boo
106111
Page: page,
107112
RepositoryList: []string{".*"},
108113
PageSize: tasks.AdvisoryPageSize,
109-
ThirdParty: thirdParty,
114+
ThirdParty: &thirdParty,
110115
ModifiedSince: modifiedSince,
111116
ShowPackages: true,
112117
}
@@ -119,7 +124,7 @@ func getUpdatedRepos(syncStart time.Time, modifiedSince *string, thirdParty *boo
119124

120125
vmaasDataPtr, err := utils.HTTPCallRetry(vmaasCallFunc, tasks.VmaasCallExpRetry, tasks.VmaasCallMaxRetries)
121126
if err != nil {
122-
return nil, nil, nil, err
127+
return nil, nil, err
123128
}
124129
vmaasCallCnt.WithLabelValues("success").Inc()
125130
repos := vmaasDataPtr.(*vmaas.ReposResponse)
@@ -148,15 +153,12 @@ func getUpdatedRepos(syncStart time.Time, modifiedSince *string, thirdParty *boo
148153

149154
for k, contentSet := range repos.RepositoryList {
150155
thirdParty := false
151-
packages := make(map[string]bool)
156+
repoPackages[k] = make([]string, 0)
152157
for _, repo := range contentSet {
153158
if repo["third_party"] == (interface{})(true) {
154159
thirdParty = true
155160
}
156-
repoPackages = append(repoPackages, getRepoUpdatedPackages(k, repo, packages)...)
157-
}
158-
if len(packages) == 0 {
159-
repoNoPackages = append(repoNoPackages, k)
161+
repoPackages[k] = getRepoUpdatedPackages(repo)
160162
}
161163

162164
if thirdParty {
@@ -173,17 +175,17 @@ func getUpdatedRepos(syncStart time.Time, modifiedSince *string, thirdParty *boo
173175
}
174176

175177
utils.LogInfo("redhat", nReposRedhat, "thirdparty", nReposThirdParty, "Repos downloading complete")
176-
return repoPackages, repoNoPackages, latestRepoChange, nil
178+
return repoPackages, latestRepoChange, nil
177179
}
178180

179-
func getRepoUpdatedPackages(contentSetName string, repo map[string]interface{}, packages map[string]bool) [][]string {
180-
var repoPackages [][]string
181+
func getRepoUpdatedPackages(repo map[string]interface{}) []string {
182+
var repoPackages []string
181183
if value, ok := repo["updated_package_names"]; ok {
182184
if updatedPkgs, ok := value.([]interface{}); ok {
185+
repoPackages = make([]string, 0, len(updatedPkgs))
183186
for _, p := range updatedPkgs {
184-
if pkg, ok := p.(string); ok && !packages[pkg] {
185-
packages[pkg] = true
186-
repoPackages = append(repoPackages, []string{contentSetName, pkg})
187+
if pkg, ok := p.(string); ok {
188+
repoPackages = append(repoPackages, pkg)
187189
}
188190
}
189191
}

tasks/vmaas_sync/repo_sync.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,11 @@ import (
99

1010
func syncRepos(syncStart time.Time) error {
1111
// mark non-thirdparty repos known to vmaas
12-
thirdParty := false
13-
repoPackages, repoNoPackages, _, err := getUpdatedRepos(syncStart, nil, &thirdParty)
12+
redhatRepos, err := getUpdatedRepos(syncStart)
1413
if err != nil {
1514
return err
1615
}
1716

18-
redhatRepos := repoNoPackages
19-
for _, repoPkg := range repoPackages {
20-
redhatRepos = append(redhatRepos, repoPkg[0])
21-
}
22-
2317
if len(redhatRepos) == 0 {
2418
return nil
2519
}

tasks/vmaas_sync/vmaas_sync.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,16 +104,17 @@ func SyncData(lastModifiedTS *types.Rfc3339TimestampWithZ, vmaasExportedTS *type
104104
}
105105
}
106106

107-
// refresh caches
108-
caches.RefreshAdvisoryCaches()
109-
110107
database.UpdateTimestampKVValue(LastSync, syncStart)
111108
if lastModified == nil {
112109
database.UpdateTimestampKVValue(LastFullSync, syncStart)
113110
}
114111
if vmaasExportedTS != nil {
115112
database.UpdateTimestampKVValue(VmaasExported, *vmaasExportedTS.Time())
116113
}
114+
115+
// refresh caches
116+
caches.RefreshAdvisoryCaches()
117+
117118
utils.LogInfo("Data sync finished successfully")
118119
return nil
119120
}

0 commit comments

Comments
 (0)