Skip to content

Commit fa44b28

Browse files
committed
RHINENG-23045: route events based on types + test updates
1 parent 4fc2056 commit fa44b28

6 files changed

Lines changed: 58 additions & 57 deletions

File tree

listener/common_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,10 @@ func getOrCreateTestAccount(t *testing.T) int {
8989
}
9090

9191
// nolint: unparam
92-
func createTestUploadEvent(orgID, inventoryID, reporter string, packages, yum bool) HostEvent {
92+
func createTestUploadEvent(orgID, inventoryID, reporter string, packages, yum bool, eventType string) HostEvent {
9393
now := time.Now()
9494
ev := HostEvent{
95-
Type: "created",
95+
Type: eventType,
9696
Host: Host{
9797
ID: inventoryID,
9898
OrgID: &orgID,

listener/event_buffers.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,6 @@ var createdEventsBuffer = eventBuffer{
3030
Lock: sync.Mutex{},
3131
}
3232

33-
// var flushTimer = time.AfterFunc(87600*time.Hour, func() {
34-
// utils.LogInfo(FlushedTimeoutBuffer)
35-
// updatedEventsBuffer.flushEvalEvents()
36-
// })
37-
3833
func (b *eventBuffer) initFlushTimer(w *mqueue.Writer) {
3934
b.flushTimer = time.AfterFunc(87600*time.Hour, func() {
4035
utils.LogInfo(FlushedTimeoutBuffer)

listener/events_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestUpdateSystem(t *testing.T) {
2727
DisplayName: id,
2828
}).Error)
2929

30-
ev := createTestUploadEvent("1", id, "puptoo", false, false)
30+
ev := createTestUploadEvent("1", id, "puptoo", false, false, "created")
3131
name := "TEST_NAME"
3232
ev.Host.DisplayName = &name
3333
ev.Host.SystemProfile.InstalledPackages = &[]string{"kernel-0:4.18.0-193.1.2.el8_2.x86_64"}
@@ -112,7 +112,7 @@ func TestUploadAfterDelete(t *testing.T) {
112112
assert.NoError(t, err)
113113

114114
// upload will be skipped and system won't be created
115-
uploadEvent := createTestUploadEvent("1", id, "puptoo", true, false)
115+
uploadEvent := createTestUploadEvent("1", id, "puptoo", true, false, "created")
116116
err = HandleUpload(uploadEvent)
117117
assert.NoError(t, err)
118118
assertSystemNotInDB(t)
@@ -126,7 +126,7 @@ func TestCreateDeleteUpload(t *testing.T) {
126126
configure()
127127
deleteData(t)
128128

129-
uploadEvent := createTestUploadEvent("1", id, "puptoo", true, false)
129+
uploadEvent := createTestUploadEvent("1", id, "puptoo", true, false, "created")
130130
originalName := "UPLOADED"
131131
uploadEvent.Host.DisplayName = &originalName
132132
err := HandleUpload(uploadEvent)

listener/listener.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ func configure() {
4040
core.ConfigureApp()
4141
eventsTopic = utils.FailIfEmpty(utils.CoreCfg.EventsTopic, "EVENTS_TOPIC")
4242
evalTopic := utils.FailIfEmpty(utils.CoreCfg.EvalTopic, "EVAL_TOPIC")
43-
// createdTopic := utils.FailIfEmpty(utils.CoreCfg.CreatedSystemsTopic, "CREATED_SYSTEMS_TOPIC")
43+
createdTopic := utils.FailIfEmpty(utils.CoreCfg.CreatedSystemsTopic, "CREATED_SYSTEMS_TOPIC")
4444
ptTopic := utils.FailIfEmpty(utils.CoreCfg.PayloadTrackerTopic, "PAYLOAD_TRACKER_TOPIC")
4545
evalWriter = mqueue.NewKafkaWriterFromEnv(evalTopic)
4646
ptWriter = mqueue.NewKafkaWriterFromEnv(ptTopic)
47-
// createdSystemsWriter = mqueue.NewKafkaWriterFromEnv(createdTopic)
47+
createdSystemsWriter = mqueue.NewKafkaWriterFromEnv(createdTopic)
4848

4949
configureListener()
5050
}
@@ -76,7 +76,7 @@ func configureListener() {
7676
validReporters = loadValidReporters()
7777

7878
updatedEventsBuffer.initFlushTimer(&evalWriter)
79-
// createdEventsBuffer.initFlushTimer(createdSystemsWriter)
79+
createdEventsBuffer.initFlushTimer(&createdSystemsWriter)
8080
}
8181

8282
func loadValidReporters() map[string]int {

listener/upload.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,11 @@ func HandleUpload(event HostEvent) error {
178178
}
179179

180180
ptEvent.StatusMsg = ProcessingStatus
181-
// if event.Type == "created" {
182-
// // TODO: implement
183-
// } else {
184-
updatedEventsBuffer.bufferEvalEvents(sys.InventoryID, sys.RhAccountID, &ptEvent, &evalWriter)
185-
// }
181+
if event.Type == "created" {
182+
createdEventsBuffer.bufferEvalEvents(sys.InventoryID, sys.RhAccountID, &ptEvent, &createdSystemsWriter)
183+
} else {
184+
updatedEventsBuffer.bufferEvalEvents(sys.InventoryID, sys.RhAccountID, &ptEvent, &evalWriter)
185+
}
186186
logAndObserve(UploadSuccess, ReceivedSuccess, &event, &ptEvent, tStart, SuccessStatus, false)
187187
return nil
188188
}

listener/upload_test.go

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -113,47 +113,52 @@ func TestUpdateSystemPlatform(t *testing.T) {
113113
deleteData(t)
114114
}
115115

116-
func TestUploadHandler(t *testing.T) {
117-
utils.SkipWithoutDB(t)
118-
utils.SkipWithoutPlatform(t)
119-
core.SetupTestEnvironment()
120-
configure()
121-
deleteData(t)
122-
123-
_ = getOrCreateTestAccount(t)
124-
event := createTestUploadEvent(id, id, "puptoo", true, false)
125-
126-
event.Host.SystemProfile.OperatingSystem = inventory.OperatingSystem{Major: 8}
127-
repos := append(event.Host.SystemProfile.GetYumRepos(), inventory.YumRepo{ID: "epel", Enabled: true})
128-
event.Host.SystemProfile.YumRepos = &repos
129-
130-
err := HandleUpload(event)
131-
assert.NoError(t, err)
132-
133-
reporterID := 1
134-
assertSystemInDB(t, id, nil, &reporterID)
135-
136-
var sys models.SystemPlatform
137-
assert.NoError(t, database.DB.Where("inventory_id = ?::uuid", id).Find(&sys).Error)
138-
after := time.Now().Add(time.Hour)
139-
sys.LastEvaluation = &after
140-
assert.NoError(t, database.DB.Save(&sys).Error)
141-
// Test that second upload did not cause re-evaluation
142-
logHook := utils.NewTestLogHook()
143-
log.AddHook(logHook)
144-
err = HandleUpload(event)
145-
assert.NoError(t, err)
146-
assertInLogs(t, UploadSuccessNoEval, logHook.LogEntries...)
147-
assertSystemReposInDB(t, sys.ID, []string{"epel-8"})
148-
deleteData(t)
116+
func TestUploadHandlerCreatedSystem(t *testing.T) {
117+
eventTypes := []string{"created", "updated"}
118+
for _, eventType := range eventTypes {
119+
t.Run(eventType, func(t *testing.T) {
120+
utils.SkipWithoutDB(t)
121+
utils.SkipWithoutPlatform(t)
122+
core.SetupTestEnvironment()
123+
configure()
124+
deleteData(t)
125+
126+
_ = getOrCreateTestAccount(t)
127+
event := createTestUploadEvent(id, id, "puptoo", true, false, eventType)
128+
129+
event.Host.SystemProfile.OperatingSystem = inventory.OperatingSystem{Major: 8}
130+
repos := append(event.Host.SystemProfile.GetYumRepos(), inventory.YumRepo{ID: "epel", Enabled: true})
131+
event.Host.SystemProfile.YumRepos = &repos
132+
133+
err := HandleUpload(event)
134+
assert.NoError(t, err)
135+
136+
reporterID := 1
137+
assertSystemInDB(t, id, nil, &reporterID)
138+
139+
var sys models.SystemPlatform
140+
assert.NoError(t, database.DB.Where("inventory_id = ?::uuid", id).Find(&sys).Error)
141+
after := time.Now().Add(time.Hour)
142+
sys.LastEvaluation = &after
143+
assert.NoError(t, database.DB.Save(&sys).Error)
144+
// Test that second upload did not cause re-evaluation
145+
logHook := utils.NewTestLogHook()
146+
log.AddHook(logHook)
147+
err = HandleUpload(event)
148+
assert.NoError(t, err)
149+
assertInLogs(t, UploadSuccessNoEval, logHook.LogEntries...)
150+
assertSystemReposInDB(t, sys.ID, []string{"epel-8"})
151+
deleteData(t)
152+
})
153+
}
149154
}
150155

151156
func TestUploadHandlerWarn(t *testing.T) {
152157
utils.SkipWithoutDB(t)
153158
configure()
154159
logHook := utils.NewTestLogHook()
155160
log.AddHook(logHook)
156-
noPkgsEvent := createTestUploadEvent("1", id, "puptoo", false, false)
161+
noPkgsEvent := createTestUploadEvent("1", id, "puptoo", false, false, "created")
157162
err := HandleUpload(noPkgsEvent)
158163
if assert.Error(t, err) {
159164
assert.ErrorIs(t, err, ErrNoPackages)
@@ -166,7 +171,7 @@ func TestUploadHandlerWarnSkipReporter(t *testing.T) {
166171
configure()
167172
logHook := utils.NewTestLogHook()
168173
log.AddHook(logHook)
169-
noPkgsEvent := createTestUploadEvent("1", id, "yupana", false, false)
174+
noPkgsEvent := createTestUploadEvent("1", id, "yupana", false, false, "created")
170175
err := HandleUpload(noPkgsEvent)
171176
if assert.Error(t, err) {
172177
assert.ErrorIs(t, err, ErrReporter)
@@ -179,7 +184,7 @@ func TestUploadHandlerWarnSkipHostType(t *testing.T) {
179184
configure()
180185
logHook := utils.NewTestLogHook()
181186
log.AddHook(logHook)
182-
event := createTestUploadEvent("1", id, "puptoo", true, false)
187+
event := createTestUploadEvent("1", id, "puptoo", true, false, "created")
183188
event.Host.SystemProfile.HostType = "edge"
184189
err := HandleUpload(event)
185190
if assert.Error(t, err) {
@@ -194,7 +199,7 @@ func TestUploadHandlerError1(t *testing.T) {
194199
configure()
195200
logHook := utils.NewTestLogHook()
196201
log.AddHook(logHook)
197-
event := createTestUploadEvent("1", id, "puptoo", true, false)
202+
event := createTestUploadEvent("1", id, "puptoo", true, false, "created")
198203
*event.Host.OrgID = ""
199204
err := HandleUpload(event)
200205
if assert.Error(t, err) {
@@ -216,10 +221,11 @@ func TestUploadHandlerError2(t *testing.T) {
216221
configure()
217222
deleteData(t)
218223
evalWriter = &erroringWriter{}
224+
createdSystemsWriter = &erroringWriter{}
219225
logHook := utils.NewTestLogHook()
220226
log.AddHook(logHook)
221227
_ = getOrCreateTestAccount(t)
222-
event := createTestUploadEvent("1", id, "puptoo", true, false)
228+
event := createTestUploadEvent("1", id, "puptoo", true, false, "created")
223229
err := HandleUpload(event)
224230
assert.Nil(t, err)
225231
time.Sleep(2 * uploadEvalTimeout)
@@ -316,7 +322,7 @@ func TestUpdateSystemPlatformYumUpdates(t *testing.T) {
316322
HTTPClient: &http.Client{},
317323
Debug: true,
318324
}
319-
hostEvent := createTestUploadEvent("1", id, "puptoo", false, true)
325+
hostEvent := createTestUploadEvent("1", id, "puptoo", false, true, "created")
320326
yumUpdates, err := getYumUpdates(hostEvent, httpClient)
321327
assert.Nil(t, err)
322328

0 commit comments

Comments
 (0)