From 312e979fa79f5c4926eb14b6c61e301039f18cfa Mon Sep 17 00:00:00 2001 From: Kate Zaprazna Date: Thu, 4 Jun 2026 00:56:56 +0200 Subject: [PATCH] RHINENG-26119: add support for publishing advisory update events --- base/utils/config.go | 2 + deploy/clowdapp.yaml | 3 + evaluator/advisory_update.go | 78 +++++++++++++++++++ evaluator/advisory_update_test.go | 121 ++++++++++++++++++++++++++++++ evaluator/evaluate.go | 19 ++++- 5 files changed, 221 insertions(+), 2 deletions(-) create mode 100644 evaluator/advisory_update.go create mode 100644 evaluator/advisory_update_test.go diff --git a/base/utils/config.go b/base/utils/config.go index 447ddcca9..696dded77 100644 --- a/base/utils/config.go +++ b/base/utils/config.go @@ -260,6 +260,7 @@ func initKafkaFromClowder() { translateTopic(&CoreCfg.NotificationsTopic) translateTopic(&CoreCfg.TemplateTopic) translateTopic(&CoreCfg.InventoryViewsTopic) + translateTopic(&CoreCfg.AdvisoryUpdateTopic) } } @@ -412,6 +413,7 @@ func printKafkaParams() { fmt.Printf("NOTIFICATIONS_TOPIC=%s\n", CoreCfg.NotificationsTopic) fmt.Printf("TEMPLATE_TOPIC=%s\n", CoreCfg.TemplateTopic) fmt.Printf("INVENTORY_VIEWS_TOPIC=%s\n", CoreCfg.InventoryViewsTopic) + fmt.Printf("ADVISORY_UPDATE_TOPIC=%s\n", CoreCfg.AdvisoryUpdateTopic) } func printServicesParams() { diff --git a/deploy/clowdapp.yaml b/deploy/clowdapp.yaml index 364a2272e..7d87a2d3c 100644 --- a/deploy/clowdapp.yaml +++ b/deploy/clowdapp.yaml @@ -205,6 +205,7 @@ objects: - {name: REMEDIATIONS_UPDATE_TOPIC, value: 'platform.remediation-updates.patch'} - {name: NOTIFICATIONS_TOPIC, value: 'platform.notifications.ingress'} - {name: INVENTORY_VIEWS_TOPIC, value: '${INVENTORY_VIEWS_TOPIC}'} + - {name: ADVISORY_UPDATE_TOPIC, value: 'patchman.advisory.update'} - {name: SSL_CERT_DIR, value: '${SSL_CERT_DIR}'} - {name: GOGC, value: '${GOGC}'} # set garbage collection limit for go 1.18 - {name: ENABLE_PROFILER, value: '${ENABLE_PROFILER_EVALUATOR_UPLOAD}'} @@ -252,6 +253,7 @@ objects: - {name: REMEDIATIONS_UPDATE_TOPIC, value: 'platform.remediation-updates.patch'} - {name: NOTIFICATIONS_TOPIC, value: 'platform.notifications.ingress'} - {name: INVENTORY_VIEWS_TOPIC, value: '${INVENTORY_VIEWS_TOPIC}'} + - {name: ADVISORY_UPDATE_TOPIC, value: 'patchman.advisory.update'} - {name: SSL_CERT_DIR, value: '${SSL_CERT_DIR}'} - {name: GOGC, value: '${GOGC}'} # set garbage collection limit for go 1.18 - {name: ENABLE_PROFILER, value: '${ENABLE_PROFILER_EVALUATOR_RECALC}'} @@ -299,6 +301,7 @@ objects: - {name: REMEDIATIONS_UPDATE_TOPIC, value: 'platform.remediation-updates.patch'} - {name: NOTIFICATIONS_TOPIC, value: 'platform.notifications.ingress'} - {name: INVENTORY_VIEWS_TOPIC, value: '${INVENTORY_VIEWS_TOPIC}'} + - {name: ADVISORY_UPDATE_TOPIC, value: 'patchman.advisory.update'} - {name: SSL_CERT_DIR, value: '${SSL_CERT_DIR}'} - {name: GOGC, value: '${GOGC}'} # set garbage collection limit for go 1.18 - {name: ENABLE_PROFILER, value: '${ENABLE_PROFILER_EVALUATOR_USER_EVALUATION}'} diff --git a/evaluator/advisory_update.go b/evaluator/advisory_update.go new file mode 100644 index 000000000..5d411b59f --- /dev/null +++ b/evaluator/advisory_update.go @@ -0,0 +1,78 @@ +package evaluator + +import ( + "app/base" + "app/base/models" + "app/base/mqueue" + "app/base/types" + "app/base/utils" + "time" + + "github.com/google/uuid" + "github.com/pkg/errors" +) + +var advisoryUpdatePublisher mqueue.Writer + +func configureAdvisoryUpdates() { + if topic := utils.CoreCfg.AdvisoryUpdateTopic; topic != "" { + advisoryUpdatePublisher = mqueue.NewKafkaWriterFromEnv(topic) + } +} + +func getChangedAdvisoryIDs(advisoriesByName extendedAdvisoryMap) []int64 { + ids := make([]int64, 0, len(advisoriesByName)) + for _, advisory := range advisoriesByName { + if advisory.change != Keep { + ids = append(ids, advisory.AdvisoryID) + } + } + return ids +} + +func createAdvisoryUpdateEvent(system *models.SystemPlatformV2, advisoryIDs []int64) mqueue.AdvisoryUpdateEvent { + var workspaceID uuid.UUID + if system.Inventory.Workspaces != nil && len(*system.Inventory.Workspaces) > 0 { + parsed, err := uuid.Parse((*system.Inventory.Workspaces)[0].ID) + if err != nil { + utils.LogWarn("inventoryID", system.GetInventoryID(), "err", err.Error(), "unable to parse workspace ID") + } else { + workspaceID = parsed + } + } else { + utils.LogWarn("inventoryID", system.GetInventoryID(), "no workspaces for system") + } + + return mqueue.AdvisoryUpdateEvent{ + RhAccountID: system.Inventory.RhAccountID, + WorkspaceID: workspaceID, + AdvisoryIDs: advisoryIDs, + ProducedAt: types.Rfc3339Timestamp(time.Now()), + } +} + +func publishAdvisoryUpdates(system *models.SystemPlatformV2, advisoriesByName extendedAdvisoryMap) error { + if advisoryUpdatePublisher == nil { + return nil + } + + if len(advisoriesByName) == 0 { + return nil + } + + defer utils.ObserveSecondsSince(time.Now(), evaluationPartDuration.WithLabelValues("advisory-update-publish")) + + // Extract only the changed advisory IDs (delta) for the aggregator + advisoryIDs := getChangedAdvisoryIDs(advisoriesByName) + if len(advisoryIDs) == 0 { + return nil + } + + event := createAdvisoryUpdateEvent(system, advisoryIDs) + if err := mqueue.SendMessages(base.Context, advisoryUpdatePublisher, &mqueue.AdvisoryUpdateEvents{event}); err != nil { + return errors.Wrap(err, "writing advisory update events") + } + + utils.LogInfo("inventoryID", system.GetInventoryID(), "advisory update event sent successfully") + return nil +} diff --git a/evaluator/advisory_update_test.go b/evaluator/advisory_update_test.go new file mode 100644 index 000000000..6e77862ff --- /dev/null +++ b/evaluator/advisory_update_test.go @@ -0,0 +1,121 @@ +package evaluator + +import ( + "app/base/core" + "app/base/database" + "app/base/inventory" + "app/base/models" + "app/base/mqueue" + "app/base/utils" + "testing" + + "github.com/bytedance/sonic" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +func TestGetChangedAdvisoryIDs(t *testing.T) { + advisories := extendedAdvisoryMap{ + "RH-1": {change: Add, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 1}}, + "RH-2": {change: Update, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 2}}, + "RH-3": {change: Remove, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 3}}, + "RH-4": {change: Keep, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 4}}, + "RH-5": {change: Keep, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 5}}, + } + + ids := getChangedAdvisoryIDs(advisories) + assert.ElementsMatch(t, []int64{1, 2, 3}, ids) +} + +func TestCreateAdvisoryUpdateEvent(t *testing.T) { + wsID := "d964b282-17f6-47ab-b596-a4a34d711f04" + workspaces := inventory.Groups{{ID: wsID, Name: "test-workspace"}} + system := &models.SystemPlatformV2{ + Inventory: models.SystemInventory{ + ID: 1, + RhAccountID: rhAccountID, + InventoryID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + Workspaces: &workspaces, + }, + Patch: models.SystemPatch{}, + } + + changedAdvisoryIDs := []int64{1, 2} + + event := createAdvisoryUpdateEvent(system, changedAdvisoryIDs) + assert.Equal(t, rhAccountID, event.RhAccountID) + assert.Equal(t, uuid.MustParse(wsID), event.WorkspaceID) + assert.ElementsMatch(t, changedAdvisoryIDs, event.AdvisoryIDs) + assert.False(t, event.ProducedAt.Time().IsZero()) +} + +func TestPublishAdvisoryUpdates(t *testing.T) { + utils.SkipWithoutDB(t) + utils.SkipWithoutPlatform(t) + core.SetupTestEnvironment() + + configure() + loadCache() + mockWriter := mqueue.MockKafkaWriter{} + advisoryUpdatePublisher = &mockWriter + + enableAdvisoryUpdates = true + defer func() { enableAdvisoryUpdates = false }() + + // Remove stale rows that may remain from previous test runs + database.DeleteSystemAdvisories(t, testDBID, []int64{1, 2}) + database.DeleteAdvisoryAccountData(t, rhAccountID, []int64{1, 2}) + + // Pair system with advisories before evaluation + oldAdvisoryIDs := []int64{1, 3, 4} + database.CreateSystemAdvisories(t, rhAccountID, testDBID, oldAdvisoryIDs) + database.CreateAdvisoryAccountData(t, rhAccountID, oldAdvisoryIDs, 1) + database.CheckCachesValid(t) + + // Run evaluation + err := evaluateHandler(mqueue.PlatformEvent{ + SystemIDs: []uuid.UUID{testInventoryID}, + RequestIDs: []string{"request-1"}, + OrgID: &orgID, + AccountID: rhAccountID}) + assert.NoError(t, err) + + // Verify evaluated advisories exist in DB and get their IDs for cleanup + evaluatedAdvisoryNames := []string{"RH-1", "RH-2", "RH-100"} + evaluatedAdvisoryIDs := database.CheckAdvisoriesInDB(t, evaluatedAdvisoryNames) + + // Verify published Kafka message contains correct payload + assert.GreaterOrEqual(t, len(mockWriter.Messages), 1) + var event mqueue.AdvisoryUpdateEvent + assert.NoError(t, sonic.Unmarshal(mockWriter.Messages[0].Value, &event)) + assert.Equal(t, rhAccountID, event.RhAccountID) + assert.NotEmpty(t, event.AdvisoryIDs) // RH-100 gets a dynamic ID via lazy-save, so we only check delta is non-empty + assert.False(t, event.ProducedAt.Time().IsZero()) + + database.DeleteSystemAdvisories(t, testDBID, evaluatedAdvisoryIDs) + database.DeleteAdvisoryAccountData(t, rhAccountID, evaluatedAdvisoryIDs) + database.DeleteAdvisoryAccountData(t, rhAccountID, oldAdvisoryIDs) +} + +func TestPublishAdvisoryUpdatesNoDelta(t *testing.T) { + mockWriter := mqueue.MockKafkaWriter{} + advisoryUpdatePublisher = &mockWriter + + system := &models.SystemPlatformV2{ + Inventory: models.SystemInventory{ + ID: 1, + RhAccountID: rhAccountID, + InventoryID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + }, + Patch: models.SystemPatch{}, + } + + advisories := extendedAdvisoryMap{ + "RH-1": {change: Keep, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 1}}, + "RH-2": {change: Keep, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 2}}, + } + + err := publishAdvisoryUpdates(system, advisories) + assert.NoError(t, err) + assert.Empty(t, mockWriter.Messages) +} diff --git a/evaluator/evaluate.go b/evaluator/evaluate.go index b64f3ddb1..08853b0e9 100644 --- a/evaluator/evaluate.go +++ b/evaluator/evaluate.go @@ -61,6 +61,7 @@ var ( nEvalGoroutines int enableInstantNotifications bool enableInventoryViews bool + enableAdvisoryUpdates bool enableSatelliteFunctionality bool errVmaasBadRequest = errors.New("vmaas bad request") ) @@ -69,7 +70,7 @@ const WarnPayloadTracker = "unable to send message to payload tracker" func configure() { core.ConfigureApp() - confugureEvaluator() + configureEvaluator() evalTopic = utils.FailIfEmpty(utils.CoreCfg.EvalTopic, "EVAL_TOPIC") ptTopic = utils.FailIfEmpty(utils.CoreCfg.PayloadTrackerTopic, "PAYLOAD_TRACKER_TOPIC") ptWriter = mqueue.NewKafkaWriterFromEnv(ptTopic) @@ -82,10 +83,11 @@ func configure() { configureRemediations() configureNotifications() configureInventoryViews() + configureAdvisoryUpdates() configureStatus() } -func confugureEvaluator() { +func configureEvaluator() { evalLabel = utils.FailIfEmpty(utils.PodConfig.GetString("label", ""), "label") // Number of kafka readers for upload topic consumerCount = utils.PodConfig.GetInt("consumer_count", 1) @@ -131,6 +133,8 @@ func confugureEvaluator() { enableInstantNotifications = utils.PodConfig.GetBool("instant_notifications", true) // Send inventory views events enableInventoryViews = utils.PodConfig.GetBool("inventory_views", true) + // Send advisory update events + enableAdvisoryUpdates = utils.PodConfig.GetBool("advisory_updates", false) // Ignore templates for satellite managed systems enableSatelliteFunctionality = utils.PodConfig.GetBool("satellite_functionality", true) } @@ -456,6 +460,8 @@ func commitWithObserve(tx *gorm.DB) error { // EvaluateAndStore first loads advisories and packages (including change evaluation) // and then executes all deletions, updates, and insertions in a single transaction. + +//nolint:funlen func evaluateAndStore(system *models.SystemPlatformV2, vmaasData *vmaas.UpdatesV3Response, event *mqueue.PlatformEvent) error { advisoriesByName, err := lazySaveAndLoadAdvisories(system, vmaasData) @@ -509,6 +515,15 @@ func evaluateAndStore(system *models.SystemPlatformV2, } } + if enableAdvisoryUpdates { + err = publishAdvisoryUpdates(system, advisoriesByName) + if err != nil { + evaluationCnt.WithLabelValues("error-advisory-update-publish").Inc() + utils.LogError("orgID", event.GetOrgID(), "inventoryID", system.GetInventoryID(), "err", err.Error(), + "publishing advisory update event failed") + } + } + err = commitWithObserve(tx) if err != nil { evaluationCnt.WithLabelValues("error-database-commit").Inc()