Skip to content

Commit 7d43431

Browse files
committed
RHINENG-22325: extend KafkaMessage with Headers
1 parent f5bc928 commit 7d43431

6 files changed

Lines changed: 15 additions & 9 deletions

File tree

base/mqueue/message.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package mqueue
22

3-
import "github.com/bytedance/sonic"
3+
import (
4+
"github.com/bytedance/sonic"
5+
"github.com/segmentio/kafka-go"
6+
)
47

5-
func MessageFromJSON(k string, v interface{}) (KafkaMessage, error) {
8+
func MessageFromJSON(k string, v interface{}, h []kafka.Header) (KafkaMessage, error) {
69
var m KafkaMessage
710
var err error
811

912
m.Key = []byte(k)
13+
m.Headers = h
1014
m.Value, err = sonic.Marshal(v)
1115
return m, err
1216
}

base/mqueue/mqueue.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"sync"
1313

1414
"github.com/lestrrat-go/backoff/v2"
15+
"github.com/segmentio/kafka-go"
1516
)
1617

1718
const errContextCanceled = "context canceled"
@@ -42,8 +43,9 @@ func createLoggerFunc(counter Counter) func(fmt string, args ...interface{}) {
4243
}
4344

4445
type KafkaMessage struct {
45-
Key []byte
46-
Value []byte
46+
Key []byte
47+
Value []byte
48+
Headers []kafka.Header
4749
}
4850

4951
type MessageHandler func(message KafkaMessage) error

base/mqueue/mqueue_impl_gokafka.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (t *kafkaGoReaderImpl) HandleMessages(handler MessageHandler) {
3939
}
4040
}
4141
// At this level, all errors are fatal
42-
kafkaMessage := KafkaMessage{Key: m.Key, Value: m.Value}
42+
kafkaMessage := KafkaMessage{Key: m.Key, Value: m.Value, Headers: m.Headers}
4343
if err = handler(kafkaMessage); err != nil {
4444
utils.LogPanic("err", err.Error(), "Handler failed")
4545
}
@@ -61,7 +61,7 @@ type kafkaGoWriterImpl struct {
6161
func (t *kafkaGoWriterImpl) WriteMessages(ctx context.Context, msgs ...KafkaMessage) error {
6262
kafkaGoMessages := make([]kafka.Message, len(msgs))
6363
for i, m := range msgs {
64-
kafkaGoMessages[i] = kafka.Message{Key: m.Key, Value: m.Value}
64+
kafkaGoMessages[i] = kafka.Message{Key: m.Key, Value: m.Value, Headers: m.Headers}
6565
}
6666
err := t.Writer.WriteMessages(ctx, kafkaGoMessages...)
6767
return err

evaluator/notifications.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func publishNewAdvisoriesNotification(tx *gorm.DB, system *models.SystemPlatform
120120
return errors.Wrap(err, "creating notification failed")
121121
}
122122

123-
msg, err := mqueue.MessageFromJSON(system.InventoryID, notif)
123+
msg, err := mqueue.MessageFromJSON(system.InventoryID, notif, nil)
124124
if err != nil {
125125
return errors.Wrap(err, "creating message from notification failed")
126126
}

evaluator/notifications_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func TestAdvisoriesNotificationMessage(t *testing.T) {
114114
assert.Equal(t, displayName, notification.Context.DisplayName)
115115
assert.Equal(t, tags, notification.Context.Tags)
116116

117-
msg, err := mqueue.MessageFromJSON(inventoryID, notification)
117+
msg, err := mqueue.MessageFromJSON(inventoryID, notification, nil)
118118
assert.Nil(t, err)
119119
assert.Equal(t, inventoryID, string(msg.Key))
120120

evaluator/remediations.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func publishRemediationsState(system *models.SystemPlatform, response *vmaas.Upd
8787
}
8888

8989
state := createRemediationsStateMsg(system.InventoryID, response)
90-
msg, err := mqueue.MessageFromJSON(system.InventoryID, state)
90+
msg, err := mqueue.MessageFromJSON(system.InventoryID, state, nil)
9191
if err != nil {
9292
return errors.Wrap(err, "formatting message")
9393
}

0 commit comments

Comments
 (0)