Skip to content

Commit 4fc2056

Browse files
committed
RHINENG-23045: refactor listener to support changing of the queue
1 parent 515671e commit 4fc2056

8 files changed

Lines changed: 124 additions & 78 deletions

File tree

base/utils/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type coreConfig struct {
6262
KafkaWriterMaxAttempts int
6363
EventsTopic string
6464
EvalTopic string
65+
CreatedSystemsTopic string
6566
PayloadTrackerTopic string
6667
RemediationUpdateTopic string
6768
NotificationsTopic string
@@ -169,6 +170,7 @@ func initKafkaFromEnv() {
169170
func initTopicsFromEnv() {
170171
CoreCfg.EventsTopic = Getenv("EVENTS_TOPIC", "")
171172
CoreCfg.EvalTopic = Getenv("EVAL_TOPIC", "")
173+
CoreCfg.CreatedSystemsTopic = Getenv("CREATED_SYSTEMS_TOPIC", "")
172174
CoreCfg.PayloadTrackerTopic = Getenv("PAYLOAD_TRACKER_TOPIC", "")
173175
CoreCfg.RemediationUpdateTopic = Getenv("REMEDIATIONS_UPDATE_TOPIC", "")
174176
CoreCfg.NotificationsTopic = Getenv("NOTIFICATIONS_TOPIC", "")

conf/common.env

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ KAFKA_READY_ADDRESS=http://kafka:9099/
1111
# set if you want to bypass kafka SSL verification
1212
#KAFKA_SSL_SKIP_VERIFY=true
1313

14-
EVAL_TOPIC=patchman.evaluator.user-evaluation
14+
EVAL_TOPIC=patchman.evaluator.upload
15+
CREATED_SYSTEMS_TOPIC=patchman.evaluator.user-evaluation
1516
EVENTS_TOPIC=platform.inventory.events
1617
NOTIFICATIONS_TOPIC=platform.notifications.ingress
1718
PAYLOAD_TRACKER_TOPIC=platform.payload-status

conf/listener.env

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
DB_USER=listener
22
DB_PASSWD=listener
3+
CREATED_SYSTEMS_TOPIC=patchman.evaluator.user-evaluation
34

45
POD_CONFIG=

conf/local.env

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ KAFKA_GROUP=patchman
2121
KAFKA_SSL_CERT=dev/kafka/secrets/ca.crt
2222
PAYLOAD_TRACKER_TOPIC=platform.payload-status
2323
EVENTS_TOPIC=platform.inventory.events
24-
EVAL_TOPIC=patchman.evaluator.user-evaluation
24+
EVAL_TOPIC=patchman.evaluator.upload
2525
TEMPLATE_TOPIC=platform.content-sources.template
2626
INVENTORY_VIEWS_TOPIC=platform.inventory.host-apps
2727

deploy/clowdapp.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ objects:
159159
- {name: KAFKA_WRITER_MAX_ATTEMPTS, value: '${KAFKA_WRITER_MAX_ATTEMPTS}'}
160160
- {name: EVENTS_TOPIC, value: platform.inventory.events}
161161
- {name: EVAL_TOPIC, value: patchman.evaluator.upload}
162+
- {name: CREATED_SYSTEMS_TOPIC, value: patchman.evaluator.user-evaluation}
162163
- {name: PAYLOAD_TRACKER_TOPIC, value: platform.payload-status}
163164
- {name: TEMPLATE_TOPIC, value: platform.content-sources.template}
164165
- {name: ENABLE_PROFILER, value: '${ENABLE_PROFILER_LISTENER}'}

listener/event_buffers.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package listener
2+
3+
import (
4+
"app/base"
5+
"app/base/mqueue"
6+
"app/base/utils"
7+
"sync"
8+
"time"
9+
)
10+
11+
// accumulate events and create group PlatformEvents to save some resources
12+
var eventBufferSize = 5 * mqueue.BatchSize
13+
14+
type eventBuffer struct {
15+
EvalBuffer mqueue.EvalDataSlice
16+
PtBuffer mqueue.PayloadTrackerEvents
17+
Lock sync.Mutex
18+
flushTimer *time.Timer
19+
}
20+
21+
var updatedEventsBuffer = eventBuffer{
22+
EvalBuffer: make(mqueue.EvalDataSlice, 0, eventBufferSize+1),
23+
PtBuffer: make(mqueue.PayloadTrackerEvents, 0, eventBufferSize+1),
24+
Lock: sync.Mutex{},
25+
}
26+
27+
var createdEventsBuffer = eventBuffer{
28+
EvalBuffer: make(mqueue.EvalDataSlice, 0, eventBufferSize+1),
29+
PtBuffer: make(mqueue.PayloadTrackerEvents, 0, eventBufferSize+1),
30+
Lock: sync.Mutex{},
31+
}
32+
33+
// var flushTimer = time.AfterFunc(87600*time.Hour, func() {
34+
// utils.LogInfo(FlushedTimeoutBuffer)
35+
// updatedEventsBuffer.flushEvalEvents()
36+
// })
37+
38+
func (b *eventBuffer) initFlushTimer(w *mqueue.Writer) {
39+
b.flushTimer = time.AfterFunc(87600*time.Hour, func() {
40+
utils.LogInfo(FlushedTimeoutBuffer)
41+
b.flushEvalEvents(w)
42+
})
43+
}
44+
45+
// send events after full buffer or timeout
46+
func (b *eventBuffer) bufferEvalEvents(
47+
inventoryID string,
48+
rhAccountID int,
49+
ptEvent *mqueue.PayloadTrackerEvent,
50+
w *mqueue.Writer,
51+
) {
52+
tStart := time.Now()
53+
defer utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues("buffer-eval-events"))
54+
55+
b.Lock.Lock()
56+
evalData := mqueue.EvalData{
57+
InventoryID: inventoryID,
58+
RhAccountID: rhAccountID,
59+
OrgID: ptEvent.OrgID,
60+
RequestID: *ptEvent.RequestID,
61+
}
62+
b.EvalBuffer = append(b.EvalBuffer, evalData)
63+
b.PtBuffer = append(b.PtBuffer, *ptEvent)
64+
b.Lock.Unlock()
65+
66+
b.flushTimer.Reset(uploadEvalTimeout)
67+
if len(b.EvalBuffer) >= eventBufferSize {
68+
utils.LogInfo(FlushedFullBuffer)
69+
b.flushEvalEvents(w)
70+
}
71+
}
72+
73+
func (b *eventBuffer) flushEvalEvents(w *mqueue.Writer) {
74+
tStart := time.Now()
75+
b.Lock.Lock()
76+
defer b.Lock.Unlock()
77+
err := mqueue.SendMessages(base.Context, *w, b.EvalBuffer)
78+
if err != nil {
79+
utils.LogError("err", err.Error(), ErrorKafkaSend)
80+
}
81+
utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues("buffer-sent-evaluator"))
82+
err = mqueue.SendMessages(base.Context, ptWriter, b.PtBuffer)
83+
if err != nil {
84+
utils.LogWarn("err", err.Error(), WarnPayloadTracker)
85+
}
86+
utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues("buffer-sent-payload-tracker"))
87+
utils.LogDebug("evaluator_messages", len(b.EvalBuffer),
88+
"payload_tracker_messages", len(b.PtBuffer), "flushed buffers")
89+
// empty buffer
90+
b.EvalBuffer = b.EvalBuffer[:0]
91+
b.PtBuffer = b.PtBuffer[:0]
92+
}

listener/listener.go

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,21 @@ import (
1414
)
1515

1616
var (
17-
eventsTopic string
18-
eventsConsumers int
19-
enableTemplates bool
20-
templatesTopic string
21-
templatesConsumers int
22-
evalWriter mqueue.Writer
23-
ptWriter mqueue.Writer
24-
validReporters map[string]int
25-
allowedReporters map[string]bool
26-
excludedHostTypes map[string]bool
27-
enableBypass bool
28-
uploadEvalTimeout time.Duration
29-
deletionThreshold time.Duration
30-
useTraceLevel bool
17+
eventsTopic string
18+
eventsConsumers int
19+
enableTemplates bool
20+
templatesTopic string
21+
templatesConsumers int
22+
evalWriter mqueue.Writer
23+
createdSystemsWriter mqueue.Writer
24+
ptWriter mqueue.Writer
25+
validReporters map[string]int
26+
allowedReporters map[string]bool
27+
excludedHostTypes map[string]bool
28+
enableBypass bool
29+
uploadEvalTimeout time.Duration
30+
deletionThreshold time.Duration
31+
useTraceLevel bool
3132
)
3233

3334
const (
@@ -39,9 +40,11 @@ func configure() {
3940
core.ConfigureApp()
4041
eventsTopic = utils.FailIfEmpty(utils.CoreCfg.EventsTopic, "EVENTS_TOPIC")
4142
evalTopic := utils.FailIfEmpty(utils.CoreCfg.EvalTopic, "EVAL_TOPIC")
43+
// createdTopic := utils.FailIfEmpty(utils.CoreCfg.CreatedSystemsTopic, "CREATED_SYSTEMS_TOPIC")
4244
ptTopic := utils.FailIfEmpty(utils.CoreCfg.PayloadTrackerTopic, "PAYLOAD_TRACKER_TOPIC")
4345
evalWriter = mqueue.NewKafkaWriterFromEnv(evalTopic)
4446
ptWriter = mqueue.NewKafkaWriterFromEnv(ptTopic)
47+
// createdSystemsWriter = mqueue.NewKafkaWriterFromEnv(createdTopic)
4548

4649
configureListener()
4750
}
@@ -71,6 +74,9 @@ func configureListener() {
7174
useTraceLevel = log.IsLevelEnabled(log.TraceLevel)
7275

7376
validReporters = loadValidReporters()
77+
78+
updatedEventsBuffer.initFlushTimer(&evalWriter)
79+
// createdEventsBuffer.initFlushTimer(createdSystemsWriter)
7480
}
7581

7682
func loadValidReporters() map[string]int {

listener/upload.go

Lines changed: 5 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"net/url"
2121
"regexp"
2222
"strings"
23-
"sync"
2423
"time"
2524

2625
stdErrors "errors"
@@ -179,7 +178,11 @@ func HandleUpload(event HostEvent) error {
179178
}
180179

181180
ptEvent.StatusMsg = ProcessingStatus
182-
bufferEvalEvents(sys.InventoryID, sys.RhAccountID, &ptEvent)
181+
// if event.Type == "created" {
182+
// // TODO: implement
183+
// } else {
184+
updatedEventsBuffer.bufferEvalEvents(sys.InventoryID, sys.RhAccountID, &ptEvent, &evalWriter)
185+
// }
183186
logAndObserve(UploadSuccess, ReceivedSuccess, &event, &ptEvent, tStart, SuccessStatus, false)
184187
return nil
185188
}
@@ -261,66 +264,6 @@ func sendPayloadStatus(w mqueue.Writer, event mqueue.PayloadTrackerEvent, status
261264
}
262265
}
263266

264-
// accumulate events and create group PlatformEvents to save some resources
265-
var evalBufferSize = 5 * mqueue.BatchSize
266-
var eBuffer = struct {
267-
EvalBuffer mqueue.EvalDataSlice
268-
PtBuffer mqueue.PayloadTrackerEvents
269-
Lock sync.Mutex
270-
}{
271-
EvalBuffer: make(mqueue.EvalDataSlice, 0, evalBufferSize+1),
272-
PtBuffer: make(mqueue.PayloadTrackerEvents, 0, evalBufferSize+1),
273-
Lock: sync.Mutex{},
274-
}
275-
var flushTimer = time.AfterFunc(87600*time.Hour, func() {
276-
utils.LogInfo(FlushedTimeoutBuffer)
277-
flushEvalEvents()
278-
})
279-
280-
// send events after full buffer or timeout
281-
func bufferEvalEvents(inventoryID string, rhAccountID int, ptEvent *mqueue.PayloadTrackerEvent) {
282-
tStart := time.Now()
283-
defer utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues("buffer-eval-events"))
284-
285-
eBuffer.Lock.Lock()
286-
evalData := mqueue.EvalData{
287-
InventoryID: inventoryID,
288-
RhAccountID: rhAccountID,
289-
OrgID: ptEvent.OrgID,
290-
RequestID: *ptEvent.RequestID,
291-
}
292-
eBuffer.EvalBuffer = append(eBuffer.EvalBuffer, evalData)
293-
eBuffer.PtBuffer = append(eBuffer.PtBuffer, *ptEvent)
294-
eBuffer.Lock.Unlock()
295-
296-
flushTimer.Reset(uploadEvalTimeout)
297-
if len(eBuffer.EvalBuffer) >= evalBufferSize {
298-
utils.LogInfo(FlushedFullBuffer)
299-
flushEvalEvents()
300-
}
301-
}
302-
303-
func flushEvalEvents() {
304-
tStart := time.Now()
305-
eBuffer.Lock.Lock()
306-
defer eBuffer.Lock.Unlock()
307-
err := mqueue.SendMessages(base.Context, evalWriter, eBuffer.EvalBuffer)
308-
if err != nil {
309-
utils.LogError("err", err.Error(), ErrorKafkaSend)
310-
}
311-
utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues("buffer-sent-evaluator"))
312-
err = mqueue.SendMessages(base.Context, ptWriter, eBuffer.PtBuffer)
313-
if err != nil {
314-
utils.LogWarn("err", err.Error(), WarnPayloadTracker)
315-
}
316-
utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues("buffer-sent-payload-tracker"))
317-
utils.LogDebug("evaluator_messages", len(eBuffer.EvalBuffer),
318-
"payload_tracker_messages", len(eBuffer.PtBuffer), "flushed buffers")
319-
// empty buffer
320-
eBuffer.EvalBuffer = eBuffer.EvalBuffer[:0]
321-
eBuffer.PtBuffer = eBuffer.PtBuffer[:0]
322-
}
323-
324267
func updateReporterCounter(reporter string) {
325268
if _, ok := validReporters[reporter]; ok {
326269
receivedFromReporter.WithLabelValues(reporter).Inc()

0 commit comments

Comments
 (0)