Skip to content

Commit e1ac9dc

Browse files
adiweissGuy Baron
authored andcommitted
Handle empty body messages (#147)
1 parent eef69fb commit e1ac9dc

2 files changed

Lines changed: 201 additions & 69 deletions

File tree

gbus/worker.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,19 +102,13 @@ func (worker *worker) createMessagesChannel(q amqp.Queue, consumerTag string) (<
102102
func (worker *worker) consumeMessages() {
103103

104104
for msg := range worker.messages {
105-
if msg.Body == nil || len(msg.Body) == 0 {
106-
continue
107-
}
108105
worker.processMessage(msg, false)
109106
}
110107
}
111108

112109
func (worker *worker) consumeRPC() {
113110

114111
for msg := range worker.rpcMessages {
115-
if msg.Body == nil || len(msg.Body) == 0 {
116-
continue
117-
}
118112
worker.processMessage(msg, true)
119113
}
120114
}
@@ -311,6 +305,17 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) {
311305
_ = worker.ack(delivery)
312306
return
313307
}
308+
309+
if delivery.Body == nil || len(delivery.Body) == 0 {
310+
worker.log().
311+
WithFields(
312+
logrus.Fields{"message-name": msgName}).
313+
Warn("body is missing for message. Cannot invoke handlers.")
314+
worker.span.LogFields(slog.String("grabbit", "no body found"))
315+
// if there are handlers registered for this type of message, it's a bug and the message must be rejected.
316+
_ = worker.reject(false, delivery)
317+
return
318+
}
314319
/*
315320
extract the bus message only after we are sure there are registered handlers since
316321
it includes deserializing the amqp payload which we want to avoid if no handlers are found

0 commit comments

Comments
 (0)