@@ -39,8 +39,45 @@ func (l *KafkaMessageLog) Title() string {
3939 }
4040}
4141
42- func (l * KafkaMessageLog ) Metadata () map [string ]string {
43- return nil
42+ type headerIndex struct {
43+ Key string `json:"key"`
44+ Value string `json:"value"`
45+ }
46+
47+ func (l * KafkaMessageLog ) IndexFields () map [string ]any {
48+ m := map [string ]any {
49+ "clientId" : l .ClientId ,
50+ "offset" : l .Offset ,
51+ "messageId" : l .MessageId ,
52+ "partition" : l .Partition ,
53+ }
54+
55+ if l .Key .Value != "" {
56+ m ["key" ] = l .Key .Value
57+ } else {
58+ m ["key" ] = string (l .Key .Binary )
59+ }
60+
61+ if l .Message .Value != "" {
62+ m ["message" ] = l .Message .Value
63+ } else {
64+ m ["message" ] = string (l .Message .Binary )
65+ }
66+
67+ var headers []headerIndex
68+ for k , h := range l .Headers {
69+ val := h .Value
70+ if val == "" {
71+ val = string (h .Binary )
72+ }
73+ headers = append (headers , headerIndex {
74+ Key : k ,
75+ Value : val ,
76+ })
77+ }
78+ m ["headers" ] = headers
79+
80+ return m
4481}
4582
4683func newKafkaLog (record * kafka.Record ) * KafkaMessageLog {
@@ -83,13 +120,8 @@ func (l *KafkaRequestLogEvent) Title() string {
83120 return l .Request .Title ()
84121}
85122
86- func (l * KafkaRequestLogEvent ) Metadata () map [string ]string {
87- return nil
88- }
89-
90123type KafkaRequest interface {
91124 Title () string
92- Metadata () map [string ]string
93125}
94126
95127type KafkaRequestHeader struct {
@@ -120,10 +152,6 @@ func (r *KafkaJoinGroupRequest) Title() string {
120152 return fmt .Sprintf ("JoinGroup %s" , r .GroupName )
121153}
122154
123- func (l * KafkaJoinGroupRequest ) Metadata () map [string ]string {
124- return nil
125- }
126-
127155type KafkaJoinGroupResponse struct {
128156 KafkaResponseError
129157 GenerationId int32 `json:"generationId"`
@@ -151,10 +179,6 @@ func (r *KafkaSyncGroupRequest) Title() string {
151179 return fmt .Sprintf ("SyncGroup %s" , r .GroupName )
152180}
153181
154- func (r * KafkaSyncGroupRequest ) Metadata () map [string ]string {
155- return nil
156- }
157-
158182type KafkaSyncGroupResponse struct {
159183 KafkaResponseError
160184 ProtocolType string `json:"protocolType"`
@@ -170,10 +194,6 @@ func (r *KafkaListOffsetsRequest) Title() string {
170194 return "ListOffsets"
171195}
172196
173- func (r * KafkaListOffsetsRequest ) Metadata () map [string ]string {
174- return nil
175- }
176-
177197type KafkaListOffsetsRequestPartition struct {
178198 Partition int `json:"partition"`
179199 Timestamp int64 `json:"timestamp"`
@@ -204,10 +224,6 @@ func (r *KafkaFindCoordinatorRequest) Title() string {
204224 return "FindCoordinator"
205225}
206226
207- func (r * KafkaFindCoordinatorRequest ) Metadata () map [string ]string {
208- return nil
209- }
210-
211227type KafkaFindCoordinatorResponse struct {
212228 KafkaResponseError
213229 Host string `json:"host"`
@@ -226,10 +242,6 @@ func (r *KafkaInitProducerIdRequest) Title() string {
226242 return "InitProducerId"
227243}
228244
229- func (r * KafkaInitProducerIdRequest ) Metadata () map [string ]string {
230- return nil
231- }
232-
233245type KafkaInitProducerIdResponse struct {
234246 KafkaResponseError
235247 ProducerId int64 `json:"producerId"`
0 commit comments