Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions base/utils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func initKafkaFromClowder() {
translateTopic(&CoreCfg.NotificationsTopic)
translateTopic(&CoreCfg.TemplateTopic)
translateTopic(&CoreCfg.InventoryViewsTopic)
translateTopic(&CoreCfg.AdvisoryUpdateTopic)
}
}

Expand Down Expand Up @@ -412,6 +413,7 @@ func printKafkaParams() {
fmt.Printf("NOTIFICATIONS_TOPIC=%s\n", CoreCfg.NotificationsTopic)
fmt.Printf("TEMPLATE_TOPIC=%s\n", CoreCfg.TemplateTopic)
fmt.Printf("INVENTORY_VIEWS_TOPIC=%s\n", CoreCfg.InventoryViewsTopic)
fmt.Printf("ADVISORY_UPDATE_TOPIC=%s\n", CoreCfg.AdvisoryUpdateTopic)
}

func printServicesParams() {
Expand Down
3 changes: 3 additions & 0 deletions deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ objects:
- {name: REMEDIATIONS_UPDATE_TOPIC, value: 'platform.remediation-updates.patch'}
- {name: NOTIFICATIONS_TOPIC, value: 'platform.notifications.ingress'}
- {name: INVENTORY_VIEWS_TOPIC, value: '${INVENTORY_VIEWS_TOPIC}'}
- {name: ADVISORY_UPDATE_TOPIC, value: 'patchman.advisory.update'}
- {name: SSL_CERT_DIR, value: '${SSL_CERT_DIR}'}
- {name: GOGC, value: '${GOGC}'} # set garbage collection limit for go 1.18
- {name: ENABLE_PROFILER, value: '${ENABLE_PROFILER_EVALUATOR_UPLOAD}'}
Expand Down Expand Up @@ -252,6 +253,7 @@ objects:
- {name: REMEDIATIONS_UPDATE_TOPIC, value: 'platform.remediation-updates.patch'}
- {name: NOTIFICATIONS_TOPIC, value: 'platform.notifications.ingress'}
- {name: INVENTORY_VIEWS_TOPIC, value: '${INVENTORY_VIEWS_TOPIC}'}
- {name: ADVISORY_UPDATE_TOPIC, value: 'patchman.advisory.update'}
- {name: SSL_CERT_DIR, value: '${SSL_CERT_DIR}'}
- {name: GOGC, value: '${GOGC}'} # set garbage collection limit for go 1.18
- {name: ENABLE_PROFILER, value: '${ENABLE_PROFILER_EVALUATOR_RECALC}'}
Expand Down Expand Up @@ -299,6 +301,7 @@ objects:
- {name: REMEDIATIONS_UPDATE_TOPIC, value: 'platform.remediation-updates.patch'}
- {name: NOTIFICATIONS_TOPIC, value: 'platform.notifications.ingress'}
- {name: INVENTORY_VIEWS_TOPIC, value: '${INVENTORY_VIEWS_TOPIC}'}
- {name: ADVISORY_UPDATE_TOPIC, value: 'patchman.advisory.update'}
- {name: SSL_CERT_DIR, value: '${SSL_CERT_DIR}'}
- {name: GOGC, value: '${GOGC}'} # set garbage collection limit for go 1.18
- {name: ENABLE_PROFILER, value: '${ENABLE_PROFILER_EVALUATOR_USER_EVALUATION}'}
Expand Down
78 changes: 78 additions & 0 deletions evaluator/advisory_update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package evaluator

import (
"app/base"
"app/base/models"
"app/base/mqueue"
"app/base/types"
"app/base/utils"
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
)

var advisoryUpdatePublisher mqueue.Writer

func configureAdvisoryUpdates() {
if topic := utils.CoreCfg.AdvisoryUpdateTopic; topic != "" {
advisoryUpdatePublisher = mqueue.NewKafkaWriterFromEnv(topic)
}
}

func getChangedAdvisoryIDs(advisoriesByName extendedAdvisoryMap) []int64 {
ids := make([]int64, 0, len(advisoriesByName))
for _, advisory := range advisoriesByName {
if advisory.change != Keep {
ids = append(ids, advisory.AdvisoryID)
}
}
return ids
}

func createAdvisoryUpdateEvent(system *models.SystemPlatformV2, advisoryIDs []int64) mqueue.AdvisoryUpdateEvent {
var workspaceID uuid.UUID
if system.Inventory.Workspaces != nil && len(*system.Inventory.Workspaces) > 0 {
parsed, err := uuid.Parse((*system.Inventory.Workspaces)[0].ID)
if err != nil {
utils.LogWarn("inventoryID", system.GetInventoryID(), "err", err.Error(), "unable to parse workspace ID")
} else {
workspaceID = parsed
}
} else {
utils.LogWarn("inventoryID", system.GetInventoryID(), "no workspaces for system")
}

return mqueue.AdvisoryUpdateEvent{
RhAccountID: system.Inventory.RhAccountID,
WorkspaceID: workspaceID,
AdvisoryIDs: advisoryIDs,
ProducedAt: types.Rfc3339Timestamp(time.Now()),
}
}

func publishAdvisoryUpdates(system *models.SystemPlatformV2, advisoriesByName extendedAdvisoryMap) error {
if advisoryUpdatePublisher == nil {
return nil
}

if len(advisoriesByName) == 0 {
return nil
}

defer utils.ObserveSecondsSince(time.Now(), evaluationPartDuration.WithLabelValues("advisory-update-publish"))

// Extract only the changed advisory IDs (delta) for the aggregator
advisoryIDs := getChangedAdvisoryIDs(advisoriesByName)
if len(advisoryIDs) == 0 {
return nil
}

event := createAdvisoryUpdateEvent(system, advisoryIDs)
if err := mqueue.SendMessages(base.Context, advisoryUpdatePublisher, &mqueue.AdvisoryUpdateEvents{event}); err != nil {
return errors.Wrap(err, "writing advisory update events")
}

utils.LogInfo("inventoryID", system.GetInventoryID(), "advisory update event sent successfully")
return nil
}
121 changes: 121 additions & 0 deletions evaluator/advisory_update_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package evaluator

import (
"app/base/core"
"app/base/database"
"app/base/inventory"
"app/base/models"
"app/base/mqueue"
"app/base/utils"
"testing"

"github.com/bytedance/sonic"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)

func TestGetChangedAdvisoryIDs(t *testing.T) {
advisories := extendedAdvisoryMap{
"RH-1": {change: Add, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 1}},
"RH-2": {change: Update, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 2}},
"RH-3": {change: Remove, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 3}},
"RH-4": {change: Keep, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 4}},
"RH-5": {change: Keep, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 5}},
}

ids := getChangedAdvisoryIDs(advisories)
assert.ElementsMatch(t, []int64{1, 2, 3}, ids)
}

func TestCreateAdvisoryUpdateEvent(t *testing.T) {
wsID := "d964b282-17f6-47ab-b596-a4a34d711f04"
workspaces := inventory.Groups{{ID: wsID, Name: "test-workspace"}}
system := &models.SystemPlatformV2{
Inventory: models.SystemInventory{
ID: 1,
RhAccountID: rhAccountID,
InventoryID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
Workspaces: &workspaces,
},
Patch: models.SystemPatch{},
}

changedAdvisoryIDs := []int64{1, 2}

event := createAdvisoryUpdateEvent(system, changedAdvisoryIDs)
assert.Equal(t, rhAccountID, event.RhAccountID)
assert.Equal(t, uuid.MustParse(wsID), event.WorkspaceID)
assert.ElementsMatch(t, changedAdvisoryIDs, event.AdvisoryIDs)
assert.False(t, event.ProducedAt.Time().IsZero())
}

func TestPublishAdvisoryUpdates(t *testing.T) {
utils.SkipWithoutDB(t)
utils.SkipWithoutPlatform(t)
core.SetupTestEnvironment()

configure()
loadCache()
mockWriter := mqueue.MockKafkaWriter{}
advisoryUpdatePublisher = &mockWriter

enableAdvisoryUpdates = true
defer func() { enableAdvisoryUpdates = false }()

// Remove stale rows that may remain from previous test runs
database.DeleteSystemAdvisories(t, testDBID, []int64{1, 2})
database.DeleteAdvisoryAccountData(t, rhAccountID, []int64{1, 2})

// Pair system with advisories before evaluation
oldAdvisoryIDs := []int64{1, 3, 4}
database.CreateSystemAdvisories(t, rhAccountID, testDBID, oldAdvisoryIDs)
database.CreateAdvisoryAccountData(t, rhAccountID, oldAdvisoryIDs, 1)
database.CheckCachesValid(t)

// Run evaluation
err := evaluateHandler(mqueue.PlatformEvent{
SystemIDs: []uuid.UUID{testInventoryID},
RequestIDs: []string{"request-1"},
OrgID: &orgID,
AccountID: rhAccountID})
assert.NoError(t, err)

// Verify evaluated advisories exist in DB and get their IDs for cleanup
evaluatedAdvisoryNames := []string{"RH-1", "RH-2", "RH-100"}
evaluatedAdvisoryIDs := database.CheckAdvisoriesInDB(t, evaluatedAdvisoryNames)

// Verify published Kafka message contains correct payload
assert.GreaterOrEqual(t, len(mockWriter.Messages), 1)
var event mqueue.AdvisoryUpdateEvent
assert.NoError(t, sonic.Unmarshal(mockWriter.Messages[0].Value, &event))
assert.Equal(t, rhAccountID, event.RhAccountID)
assert.NotEmpty(t, event.AdvisoryIDs) // RH-100 gets a dynamic ID via lazy-save, so we only check delta is non-empty
assert.False(t, event.ProducedAt.Time().IsZero())

database.DeleteSystemAdvisories(t, testDBID, evaluatedAdvisoryIDs)
database.DeleteAdvisoryAccountData(t, rhAccountID, evaluatedAdvisoryIDs)
database.DeleteAdvisoryAccountData(t, rhAccountID, oldAdvisoryIDs)
}

func TestPublishAdvisoryUpdatesNoDelta(t *testing.T) {
mockWriter := mqueue.MockKafkaWriter{}
advisoryUpdatePublisher = &mockWriter

system := &models.SystemPlatformV2{
Inventory: models.SystemInventory{
ID: 1,
RhAccountID: rhAccountID,
InventoryID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
},
Patch: models.SystemPatch{},
}

advisories := extendedAdvisoryMap{
"RH-1": {change: Keep, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 1}},
"RH-2": {change: Keep, SystemAdvisories: models.SystemAdvisories{AdvisoryID: 2}},
}

err := publishAdvisoryUpdates(system, advisories)
assert.NoError(t, err)
assert.Empty(t, mockWriter.Messages)
}
19 changes: 17 additions & 2 deletions evaluator/evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var (
nEvalGoroutines int
enableInstantNotifications bool
enableInventoryViews bool
enableAdvisoryUpdates bool
enableSatelliteFunctionality bool
errVmaasBadRequest = errors.New("vmaas bad request")
)
Expand All @@ -69,7 +70,7 @@ const WarnPayloadTracker = "unable to send message to payload tracker"

func configure() {
core.ConfigureApp()
confugureEvaluator()
configureEvaluator()
evalTopic = utils.FailIfEmpty(utils.CoreCfg.EvalTopic, "EVAL_TOPIC")
ptTopic = utils.FailIfEmpty(utils.CoreCfg.PayloadTrackerTopic, "PAYLOAD_TRACKER_TOPIC")
ptWriter = mqueue.NewKafkaWriterFromEnv(ptTopic)
Expand All @@ -82,10 +83,11 @@ func configure() {
configureRemediations()
configureNotifications()
configureInventoryViews()
configureAdvisoryUpdates()
configureStatus()
}

func confugureEvaluator() {
func configureEvaluator() {
evalLabel = utils.FailIfEmpty(utils.PodConfig.GetString("label", ""), "label")
// Number of kafka readers for upload topic
consumerCount = utils.PodConfig.GetInt("consumer_count", 1)
Expand Down Expand Up @@ -131,6 +133,8 @@ func confugureEvaluator() {
enableInstantNotifications = utils.PodConfig.GetBool("instant_notifications", true)
// Send inventory views events
enableInventoryViews = utils.PodConfig.GetBool("inventory_views", true)
// Send advisory update events
enableAdvisoryUpdates = utils.PodConfig.GetBool("advisory_updates", false)
// Ignore templates for satellite managed systems
enableSatelliteFunctionality = utils.PodConfig.GetBool("satellite_functionality", true)
}
Expand Down Expand Up @@ -456,6 +460,8 @@ func commitWithObserve(tx *gorm.DB) error {

// EvaluateAndStore first loads advisories and packages (including change evaluation)
// and then executes all deletions, updates, and insertions in a single transaction.

//nolint:funlen
func evaluateAndStore(system *models.SystemPlatformV2,
vmaasData *vmaas.UpdatesV3Response, event *mqueue.PlatformEvent) error {
advisoriesByName, err := lazySaveAndLoadAdvisories(system, vmaasData)
Expand Down Expand Up @@ -509,6 +515,15 @@ func evaluateAndStore(system *models.SystemPlatformV2,
}
}

if enableAdvisoryUpdates {
err = publishAdvisoryUpdates(system, advisoriesByName)
if err != nil {
evaluationCnt.WithLabelValues("error-advisory-update-publish").Inc()
utils.LogError("orgID", event.GetOrgID(), "inventoryID", system.GetInventoryID(), "err", err.Error(),
"publishing advisory update event failed")
}
}

err = commitWithObserve(tx)
if err != nil {
evaluationCnt.WithLabelValues("error-database-commit").Inc()
Expand Down
Loading