Skip to content

Commit 9bf1c00

Browse files
committed
RHINENG-23045: refactoring eventBuffer
1 parent fa44b28 commit 9bf1c00

3 files changed

Lines changed: 39 additions & 41 deletions

File tree

listener/event_buffers.go

Lines changed: 29 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -8,80 +8,73 @@ import (
88
"time"
99
)
1010

11-
// accumulate events and create group PlatformEvents to save some resources
12-
var eventBufferSize = 5 * mqueue.BatchSize
13-
1411
type eventBuffer struct {
15-
EvalBuffer mqueue.EvalDataSlice
16-
PtBuffer mqueue.PayloadTrackerEvents
17-
Lock sync.Mutex
12+
evalBuffer mqueue.EvalDataSlice
13+
ptBuffer mqueue.PayloadTrackerEvents
14+
lock sync.Mutex
1815
flushTimer *time.Timer
16+
evalWriter *mqueue.Writer
17+
ptWriter *mqueue.Writer
1918
}
2019

21-
var updatedEventsBuffer = eventBuffer{
22-
EvalBuffer: make(mqueue.EvalDataSlice, 0, eventBufferSize+1),
23-
PtBuffer: make(mqueue.PayloadTrackerEvents, 0, eventBufferSize+1),
24-
Lock: sync.Mutex{},
25-
}
26-
27-
var createdEventsBuffer = eventBuffer{
28-
EvalBuffer: make(mqueue.EvalDataSlice, 0, eventBufferSize+1),
29-
PtBuffer: make(mqueue.PayloadTrackerEvents, 0, eventBufferSize+1),
30-
Lock: sync.Mutex{},
31-
}
32-
33-
func (b *eventBuffer) initFlushTimer(w *mqueue.Writer) {
20+
func (b *eventBuffer) initEventBuffer(evalWriter, ptWriter *mqueue.Writer) {
21+
b.evalBuffer = make(mqueue.EvalDataSlice, 0, eventBufferSize+1)
22+
b.ptBuffer = make(mqueue.PayloadTrackerEvents, 0, eventBufferSize+1)
23+
b.lock = sync.Mutex{}
3424
b.flushTimer = time.AfterFunc(87600*time.Hour, func() {
3525
utils.LogInfo(FlushedTimeoutBuffer)
36-
b.flushEvalEvents(w)
26+
b.flushEvalEvents()
3727
})
28+
b.evalWriter = evalWriter
29+
b.ptWriter = ptWriter
3830
}
3931

4032
// send events after full buffer or timeout
4133
func (b *eventBuffer) bufferEvalEvents(
4234
inventoryID string,
4335
rhAccountID int,
4436
ptEvent *mqueue.PayloadTrackerEvent,
45-
w *mqueue.Writer,
4637
) {
4738
tStart := time.Now()
4839
defer utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues("buffer-eval-events"))
4940

50-
b.Lock.Lock()
41+
b.lock.Lock()
5142
evalData := mqueue.EvalData{
5243
InventoryID: inventoryID,
5344
RhAccountID: rhAccountID,
5445
OrgID: ptEvent.OrgID,
5546
RequestID: *ptEvent.RequestID,
5647
}
57-
b.EvalBuffer = append(b.EvalBuffer, evalData)
58-
b.PtBuffer = append(b.PtBuffer, *ptEvent)
59-
b.Lock.Unlock()
48+
b.evalBuffer = append(b.evalBuffer, evalData)
49+
b.ptBuffer = append(b.ptBuffer, *ptEvent)
6050

6151
b.flushTimer.Reset(uploadEvalTimeout)
62-
if len(b.EvalBuffer) >= eventBufferSize {
52+
shouldFlush := len(b.evalBuffer) >= eventBufferSize
53+
b.lock.Unlock()
54+
55+
if shouldFlush {
6356
utils.LogInfo(FlushedFullBuffer)
64-
b.flushEvalEvents(w)
57+
b.flushEvalEvents()
6558
}
6659
}
6760

68-
func (b *eventBuffer) flushEvalEvents(w *mqueue.Writer) {
61+
func (b *eventBuffer) flushEvalEvents() {
6962
tStart := time.Now()
70-
b.Lock.Lock()
71-
defer b.Lock.Unlock()
72-
err := mqueue.SendMessages(base.Context, *w, b.EvalBuffer)
63+
b.lock.Lock()
64+
defer b.lock.Unlock()
65+
err := mqueue.SendMessages(base.Context, *b.evalWriter, b.evalBuffer)
7366
if err != nil {
7467
utils.LogError("err", err.Error(), ErrorKafkaSend)
7568
}
7669
utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues("buffer-sent-evaluator"))
77-
err = mqueue.SendMessages(base.Context, ptWriter, b.PtBuffer)
70+
err = mqueue.SendMessages(base.Context, *b.ptWriter, b.ptBuffer)
7871
if err != nil {
7972
utils.LogWarn("err", err.Error(), WarnPayloadTracker)
8073
}
8174
utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues("buffer-sent-payload-tracker"))
82-
utils.LogDebug("evaluator_messages", len(b.EvalBuffer),
83-
"payload_tracker_messages", len(b.PtBuffer), "flushed buffers")
75+
utils.LogDebug("evaluator_messages", len(b.evalBuffer),
76+
"payload_tracker_messages", len(b.ptBuffer), "flushed buffers")
8477
// empty buffer
85-
b.EvalBuffer = b.EvalBuffer[:0]
86-
b.PtBuffer = b.PtBuffer[:0]
78+
b.evalBuffer = b.evalBuffer[:0]
79+
b.ptBuffer = b.ptBuffer[:0]
8780
}

listener/listener.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ var (
2929
uploadEvalTimeout time.Duration
3030
deletionThreshold time.Duration
3131
useTraceLevel bool
32+
33+
// Event buffers
34+
eventBufferSize = 5 * mqueue.BatchSize
35+
updatedEventsBuffer eventBuffer
36+
createdEventsBuffer eventBuffer
3237
)
3338

3439
const (
@@ -46,6 +51,9 @@ func configure() {
4651
ptWriter = mqueue.NewKafkaWriterFromEnv(ptTopic)
4752
createdSystemsWriter = mqueue.NewKafkaWriterFromEnv(createdTopic)
4853

54+
updatedEventsBuffer.initEventBuffer(&evalWriter, &ptWriter)
55+
createdEventsBuffer.initEventBuffer(&createdSystemsWriter, &ptWriter)
56+
4957
configureListener()
5058
}
5159

@@ -74,9 +82,6 @@ func configureListener() {
7482
useTraceLevel = log.IsLevelEnabled(log.TraceLevel)
7583

7684
validReporters = loadValidReporters()
77-
78-
updatedEventsBuffer.initFlushTimer(&evalWriter)
79-
createdEventsBuffer.initFlushTimer(&createdSystemsWriter)
8085
}
8186

8287
func loadValidReporters() map[string]int {

listener/upload.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,9 @@ func HandleUpload(event HostEvent) error {
179179

180180
ptEvent.StatusMsg = ProcessingStatus
181181
if event.Type == "created" {
182-
createdEventsBuffer.bufferEvalEvents(sys.InventoryID, sys.RhAccountID, &ptEvent, &createdSystemsWriter)
182+
createdEventsBuffer.bufferEvalEvents(sys.InventoryID, sys.RhAccountID, &ptEvent)
183183
} else {
184-
updatedEventsBuffer.bufferEvalEvents(sys.InventoryID, sys.RhAccountID, &ptEvent, &evalWriter)
184+
updatedEventsBuffer.bufferEvalEvents(sys.InventoryID, sys.RhAccountID, &ptEvent)
185185
}
186186
logAndObserve(UploadSuccess, ReceivedSuccess, &event, &ptEvent, tStart, SuccessStatus, false)
187187
return nil

0 commit comments

Comments
 (0)