Skip to content

Commit ed405ba

Browse files
RHINENG-26119: add support for publishing advisory update events
1 parent cf25d8a commit ed405ba

5 files changed

Lines changed: 222 additions & 2 deletions

File tree

base/utils/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ func initKafkaFromClowder() {
260260
translateTopic(&CoreCfg.NotificationsTopic)
261261
translateTopic(&CoreCfg.TemplateTopic)
262262
translateTopic(&CoreCfg.InventoryViewsTopic)
263+
translateTopic(&CoreCfg.AdvisoryUpdateTopic)
263264
}
264265
}
265266

@@ -412,6 +413,7 @@ func printKafkaParams() {
412413
fmt.Printf("NOTIFICATIONS_TOPIC=%s\n", CoreCfg.NotificationsTopic)
413414
fmt.Printf("TEMPLATE_TOPIC=%s\n", CoreCfg.TemplateTopic)
414415
fmt.Printf("INVENTORY_VIEWS_TOPIC=%s\n", CoreCfg.InventoryViewsTopic)
416+
fmt.Printf("ADVISORY_UPDATE_TOPIC=%s\n", CoreCfg.AdvisoryUpdateTopic)
415417
}
416418

417419
func printServicesParams() {

deploy/clowdapp.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ objects:
205205
- {name: REMEDIATIONS_UPDATE_TOPIC, value: 'platform.remediation-updates.patch'}
206206
- {name: NOTIFICATIONS_TOPIC, value: 'platform.notifications.ingress'}
207207
- {name: INVENTORY_VIEWS_TOPIC, value: '${INVENTORY_VIEWS_TOPIC}'}
208+
- {name: ADVISORY_UPDATE_TOPIC, value: 'patchman.advisory.update'}
208209
- {name: SSL_CERT_DIR, value: '${SSL_CERT_DIR}'}
209210
- {name: GOGC, value: '${GOGC}'} # set garbage collection limit for go 1.18
210211
- {name: ENABLE_PROFILER, value: '${ENABLE_PROFILER_EVALUATOR_UPLOAD}'}
@@ -252,6 +253,7 @@ objects:
252253
- {name: REMEDIATIONS_UPDATE_TOPIC, value: 'platform.remediation-updates.patch'}
253254
- {name: NOTIFICATIONS_TOPIC, value: 'platform.notifications.ingress'}
254255
- {name: INVENTORY_VIEWS_TOPIC, value: '${INVENTORY_VIEWS_TOPIC}'}
256+
- {name: ADVISORY_UPDATE_TOPIC, value: 'patchman.advisory.update'}
255257
- {name: SSL_CERT_DIR, value: '${SSL_CERT_DIR}'}
256258
- {name: GOGC, value: '${GOGC}'} # set garbage collection limit for go 1.18
257259
- {name: ENABLE_PROFILER, value: '${ENABLE_PROFILER_EVALUATOR_RECALC}'}
@@ -299,6 +301,7 @@ objects:
299301
- {name: REMEDIATIONS_UPDATE_TOPIC, value: 'platform.remediation-updates.patch'}
300302
- {name: NOTIFICATIONS_TOPIC, value: 'platform.notifications.ingress'}
301303
- {name: INVENTORY_VIEWS_TOPIC, value: '${INVENTORY_VIEWS_TOPIC}'}
304+
- {name: ADVISORY_UPDATE_TOPIC, value: 'patchman.advisory.update'}
302305
- {name: SSL_CERT_DIR, value: '${SSL_CERT_DIR}'}
303306
- {name: GOGC, value: '${GOGC}'} # set garbage collection limit for go 1.18
304307
- {name: ENABLE_PROFILER, value: '${ENABLE_PROFILER_EVALUATOR_USER_EVALUATION}'}

evaluator/advisory_update.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package evaluator
2+
3+
import (
4+
"app/base"
5+
"app/base/models"
6+
"app/base/mqueue"
7+
"app/base/types"
8+
"app/base/utils"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
"github.com/pkg/errors"
13+
)
14+
15+
var advisoryUpdatePublisher mqueue.Writer
16+
17+
func configureAdvisoryUpdates() {
18+
if topic := utils.CoreCfg.AdvisoryUpdateTopic; topic != "" {
19+
advisoryUpdatePublisher = mqueue.NewKafkaWriterFromEnv(topic)
20+
}
21+
}
22+
23+
// Returns IDs of advisories that changed during evaluation (Add, Update, Remove), excluding unchanged (Keep).
24+
func getChangedAdvisoryIDs(advisoriesByName extendedAdvisoryMap) []int64 {
25+
ids := make([]int64, 0, len(advisoriesByName))
26+
for _, advisory := range advisoriesByName {
27+
if advisory.change != Keep {
28+
ids = append(ids, advisory.AdvisoryID)
29+
}
30+
}
31+
return ids
32+
}
33+
34+
func createAdvisoryUpdateEvent(system *models.SystemPlatformV2, advisoryIDs []int64) mqueue.AdvisoryUpdateEvent {
35+
var workspaceID uuid.UUID
36+
if system.Inventory.Workspaces != nil && len(*system.Inventory.Workspaces) > 0 {
37+
parsed, err := uuid.Parse((*system.Inventory.Workspaces)[0].ID)
38+
if err != nil {
39+
utils.LogWarn("inventoryID", system.GetInventoryID(), "err", err.Error(), "unable to parse workspace ID")
40+
} else {
41+
workspaceID = parsed
42+
}
43+
} else {
44+
utils.LogWarn("inventoryID", system.GetInventoryID(), "no workspaces for system")
45+
}
46+
47+
return mqueue.AdvisoryUpdateEvent{
48+
RhAccountID: system.Inventory.RhAccountID,
49+
WorkspaceID: workspaceID,
50+
AdvisoryIDs: advisoryIDs,
51+
ProducedAt: types.Rfc3339Timestamp(time.Now()),
52+
}
53+
}
54+
55+
func publishAdvisoryUpdates(system *models.SystemPlatformV2, advisoriesByName extendedAdvisoryMap) error {
56+
if advisoryUpdatePublisher == nil {
57+
return nil
58+
}
59+
60+
if len(advisoriesByName) == 0 {
61+
return nil
62+
}
63+
64+
defer utils.ObserveSecondsSince(time.Now(), evaluationPartDuration.WithLabelValues("advisory-update-publish"))
65+
66+
// Extract only the changed advisory IDs (delta) for the aggregator
67+
advisoryIDs := getChangedAdvisoryIDs(advisoriesByName)
68+
if len(advisoryIDs) == 0 {
69+
return nil
70+
}
71+
72+
event := createAdvisoryUpdateEvent(system, advisoryIDs)
73+
if err := mqueue.SendMessages(base.Context, advisoryUpdatePublisher, &mqueue.AdvisoryUpdateEvents{event}); err != nil {
74+
return errors.Wrap(err, "writing advisory update events")
75+
}
76+
77+
utils.LogInfo("inventoryID", system.GetInventoryID(), "advisory update event sent successfully")
78+
return nil
79+
}

evaluator/advisory_update_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package evaluator
2+
3+
import (
4+
"app/base/core"
5+
"app/base/database"
6+
"app/base/inventory"
7+
"app/base/models"
8+
"app/base/mqueue"
9+
"app/base/utils"
10+
"testing"
11+
12+
"github.com/bytedance/sonic"
13+
"github.com/google/uuid"
14+
"github.com/stretchr/testify/assert"
15+
)
16+
17+
func TestGetChangedAdvisoryIDs(t *testing.T) {
18+
advisories := extendedAdvisoryMap{
19+
"RH-1": {change: Add, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 1}},
20+
"RH-2": {change: Update, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 2}},
21+
"RH-3": {change: Remove, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 3}},
22+
"RH-4": {change: Keep, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 4}},
23+
"RH-5": {change: Keep, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 5}},
24+
}
25+
26+
ids := getChangedAdvisoryIDs(advisories)
27+
assert.ElementsMatch(t, []int64{1, 2, 3}, ids)
28+
}
29+
30+
func TestCreateAdvisoryUpdateEvent(t *testing.T) {
31+
wsID := "d964b282-17f6-47ab-b596-a4a34d711f04"
32+
workspaces := inventory.Groups{{ID: wsID, Name: "test-workspace"}}
33+
system := &models.SystemPlatformV2{
34+
Inventory: models.SystemInventory{
35+
ID: 1,
36+
RhAccountID: rhAccountID,
37+
InventoryID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
38+
Workspaces: &workspaces,
39+
},
40+
Patch: models.SystemPatch{},
41+
}
42+
43+
changedAdvisoryIDs := []int64{1, 2}
44+
45+
event := createAdvisoryUpdateEvent(system, changedAdvisoryIDs)
46+
assert.Equal(t, rhAccountID, event.RhAccountID)
47+
assert.Equal(t, uuid.MustParse(wsID), event.WorkspaceID)
48+
assert.ElementsMatch(t, changedAdvisoryIDs, event.AdvisoryIDs)
49+
assert.False(t, event.ProducedAt.Time().IsZero())
50+
}
51+
52+
func TestPublishAdvisoryUpdates(t *testing.T) {
53+
utils.SkipWithoutDB(t)
54+
utils.SkipWithoutPlatform(t)
55+
core.SetupTestEnvironment()
56+
57+
configure()
58+
loadCache()
59+
mockWriter := mqueue.MockKafkaWriter{}
60+
advisoryUpdatePublisher = &mockWriter
61+
62+
enableAdvisoryUpdates = true
63+
defer func() { enableAdvisoryUpdates = false }()
64+
65+
// Remove stale rows that may remain from previous test runs
66+
database.DeleteSystemAdvisories(t, testDBID, []int64{1, 2})
67+
database.DeleteAdvisoryAccountData(t, rhAccountID, []int64{1, 2})
68+
69+
// Pair system with advisories before evaluation
70+
oldAdvisoryIDs := []int64{1, 3, 4}
71+
database.CreateSystemAdvisories(t, rhAccountID, testDBID, oldAdvisoryIDs)
72+
database.CreateAdvisoryAccountData(t, rhAccountID, oldAdvisoryIDs, 1)
73+
database.CheckCachesValid(t)
74+
75+
// Run evaluation
76+
err := evaluateHandler(mqueue.PlatformEvent{
77+
SystemIDs: []uuid.UUID{testInventoryID},
78+
RequestIDs: []string{"request-1"},
79+
OrgID: &orgID,
80+
AccountID: rhAccountID})
81+
assert.NoError(t, err)
82+
83+
// Verify evaluated advisories exist in DB and get their IDs for cleanup
84+
evaluatedAdvisoryNames := []string{"RH-1", "RH-2", "RH-100"}
85+
evaluatedAdvisoryIDs := database.CheckAdvisoriesInDB(t, evaluatedAdvisoryNames)
86+
87+
// Verify published Kafka message contains correct payload
88+
assert.GreaterOrEqual(t, len(mockWriter.Messages), 1)
89+
var event mqueue.AdvisoryUpdateEvent
90+
assert.NoError(t, sonic.Unmarshal(mockWriter.Messages[0].Value, &event))
91+
assert.Equal(t, rhAccountID, event.RhAccountID)
92+
assert.NotEmpty(t, event.AdvisoryIDs) // RH-100 gets a dynamic ID via lazy-save, so we only check delta is non-empty
93+
assert.False(t, event.ProducedAt.Time().IsZero())
94+
95+
database.DeleteSystemAdvisories(t, testDBID, evaluatedAdvisoryIDs)
96+
database.DeleteAdvisoryAccountData(t, rhAccountID, evaluatedAdvisoryIDs)
97+
database.DeleteAdvisoryAccountData(t, rhAccountID, oldAdvisoryIDs)
98+
}
99+
100+
func TestPublishAdvisoryUpdatesNoDelta(t *testing.T) {
101+
mockWriter := mqueue.MockKafkaWriter{}
102+
advisoryUpdatePublisher = &mockWriter
103+
104+
system := &models.SystemPlatformV2{
105+
Inventory: models.SystemInventory{
106+
ID: 1,
107+
RhAccountID: rhAccountID,
108+
InventoryID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
109+
},
110+
Patch: models.SystemPatch{},
111+
}
112+
113+
advisories := extendedAdvisoryMap{
114+
"RH-1": {change: Keep, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 1}},
115+
"RH-2": {change: Keep, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 2}},
116+
}
117+
118+
err := publishAdvisoryUpdates(system, advisories)
119+
assert.NoError(t, err)
120+
assert.Empty(t, mockWriter.Messages)
121+
}

evaluator/evaluate.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ var (
6161
nEvalGoroutines int
6262
enableInstantNotifications bool
6363
enableInventoryViews bool
64+
enableAdvisoryUpdates bool
6465
enableSatelliteFunctionality bool
6566
errVmaasBadRequest = errors.New("vmaas bad request")
6667
)
@@ -69,7 +70,7 @@ const WarnPayloadTracker = "unable to send message to payload tracker"
6970

7071
func configure() {
7172
core.ConfigureApp()
72-
confugureEvaluator()
73+
configureEvaluator()
7374
evalTopic = utils.FailIfEmpty(utils.CoreCfg.EvalTopic, "EVAL_TOPIC")
7475
ptTopic = utils.FailIfEmpty(utils.CoreCfg.PayloadTrackerTopic, "PAYLOAD_TRACKER_TOPIC")
7576
ptWriter = mqueue.NewKafkaWriterFromEnv(ptTopic)
@@ -82,10 +83,11 @@ func configure() {
8283
configureRemediations()
8384
configureNotifications()
8485
configureInventoryViews()
86+
configureAdvisoryUpdates()
8587
configureStatus()
8688
}
8789

88-
func confugureEvaluator() {
90+
func configureEvaluator() {
8991
evalLabel = utils.FailIfEmpty(utils.PodConfig.GetString("label", ""), "label")
9092
// Number of kafka readers for upload topic
9193
consumerCount = utils.PodConfig.GetInt("consumer_count", 1)
@@ -131,6 +133,8 @@ func confugureEvaluator() {
131133
enableInstantNotifications = utils.PodConfig.GetBool("instant_notifications", true)
132134
// Send inventory views events
133135
enableInventoryViews = utils.PodConfig.GetBool("inventory_views", true)
136+
// Send advisory update events
137+
enableAdvisoryUpdates = utils.PodConfig.GetBool("advisory_updates", false)
134138
// Ignore templates for satellite managed systems
135139
enableSatelliteFunctionality = utils.PodConfig.GetBool("satellite_functionality", true)
136140
}
@@ -456,6 +460,8 @@ func commitWithObserve(tx *gorm.DB) error {
456460

457461
// EvaluateAndStore first loads advisories and packages (including change evaluation)
458462
// and then executes all deletions, updates, and insertions in a single transaction.
463+
464+
//nolint:funlen
459465
func evaluateAndStore(system *models.SystemPlatformV2,
460466
vmaasData *vmaas.UpdatesV3Response, event *mqueue.PlatformEvent) error {
461467
advisoriesByName, err := lazySaveAndLoadAdvisories(system, vmaasData)
@@ -509,6 +515,15 @@ func evaluateAndStore(system *models.SystemPlatformV2,
509515
}
510516
}
511517

518+
if enableAdvisoryUpdates {
519+
err = publishAdvisoryUpdates(system, advisoriesByName)
520+
if err != nil {
521+
evaluationCnt.WithLabelValues("error-advisory-update-publish").Inc()
522+
utils.LogError("orgID", event.GetOrgID(), "inventoryID", system.GetInventoryID(), "err", err.Error(),
523+
"publishing advisory update event failed")
524+
}
525+
}
526+
512527
err = commitWithObserve(tx)
513528
if err != nil {
514529
evaluationCnt.WithLabelValues("error-database-commit").Inc()

0 commit comments

Comments
 (0)