Skip to content

Commit eef69fb

Browse files
author
Guy Baron
authored
calling channel.Cancel when worker is stopped (#149)
1 parent 4ad01a1 commit eef69fb

3 files changed

Lines changed: 43 additions & 39 deletions

File tree

gbus/metrics/message_metrics.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,13 @@ var (
1010
rejectedMessages = newRejectedMessagesCounter()
1111
)
1212

13+
//ResetRejectedMessagesCounter resets the counter intended to be used in tests only
14+
func ResetRejectedMessagesCounter() {
15+
16+
prometheus.Unregister(rejectedMessages)
17+
rejectedMessages = newRejectedMessagesCounter()
18+
}
19+
1320
//ReportRejectedMessage reports a message being rejected to the metrics counter
1421
func ReportRejectedMessage() {
1522
rejectedMessages.Inc()
@@ -28,6 +35,7 @@ func GetRejectedMessagesValue() (float64, error) {
2835
}
2936

3037
func newRejectedMessagesCounter() prometheus.Counter {
38+
3139
return promauto.NewCounter(prometheus.CounterOpts{
3240
Namespace: grabbitPrefix,
3341
Subsystem: "messages",

gbus/worker.go

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,14 @@ func (worker *worker) Start() error {
7373

7474
func (worker *worker) Stop() error {
7575
worker.log().Info("stopping worker")
76-
close(worker.stop) // worker.stop <- true
76+
e1 := worker.channel.Cancel(worker.consumerTag, false)
77+
e2 := worker.channel.Cancel(worker.consumerTag+"_rpc", false)
78+
if e1 != nil {
79+
return e1
80+
}
81+
if e2 != nil {
82+
return e2
83+
}
7784
return nil
7885
}
7986

@@ -192,6 +199,7 @@ func (worker *worker) reject(requeue bool, delivery amqp.Delivery) error {
192199
if !requeue {
193200
metrics.ReportRejectedMessage()
194201
}
202+
195203
worker.log().WithFields(logrus.Fields{"message_id": delivery.MessageId, "requeue": requeue}).Info("message rejected")
196204
return err
197205
}
@@ -205,34 +213,16 @@ func (worker *worker) isDead(delivery amqp.Delivery) bool {
205213
}
206214

207215
func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) {
208-
tx, txCreateErr := worker.txProvider.New()
209-
if txCreateErr != nil {
210-
worker.log().WithError(txCreateErr).Error("failed creating new tx")
211-
worker.span.LogFields(slog.Error(txCreateErr))
212-
_ = worker.reject(true, delivery)
213-
return
214-
}
215-
err := metrics.RunHandlerWithMetric(func() error {
216-
return worker.deadletterHandler(tx, &delivery)
217-
}, worker.deadletterHandler.Name(), worker.log())
218-
219-
var reject bool
220-
if err != nil {
221-
worker.log().WithError(err).Error("failed handling deadletter")
222-
worker.span.LogFields(slog.Error(err))
223-
err = worker.SafeWithRetries(tx.Rollback, MaxRetryCount)
224-
reject = true
225-
} else {
226-
err = worker.SafeWithRetries(tx.Commit, MaxRetryCount)
216+
txWrapper := func(tx *sql.Tx) error {
217+
handlerWrapper := func() error {
218+
return worker.deadletterHandler(tx, &delivery)
219+
}
220+
return metrics.RunHandlerWithMetric(handlerWrapper, worker.deadletterHandler.Name(), worker.log())
227221
}
228222

223+
err := worker.withTx(txWrapper)
229224
if err != nil {
230-
worker.log().WithError(err).Error("Rollback/Commit deadletter handler message")
231-
worker.span.LogFields(slog.Error(err))
232-
reject = true
233-
}
234-
235-
if reject {
225+
//we reject the deelivery but requeue it so the message will not be lost and recovered to the dlq
236226
_ = worker.reject(true, delivery)
237227
} else {
238228
_ = worker.ack(delivery)

tests/bus_test.go

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,8 @@ func TestRPC(t *testing.T) {
228228
}
229229

230230
func TestDeadlettering(t *testing.T) {
231-
rejectedMessages, err := metrics.GetRejectedMessagesValue()
232-
if err != nil {
233-
t.Error("failed to get rejected messages value")
234-
}
231+
metrics.ResetRejectedMessagesCounter()
232+
235233
proceed := make(chan bool)
236234
poison := gbus.NewBusMessage(PoisonMessage{})
237235
service1 := createNamedBusForTest(testSvc1)
@@ -259,7 +257,7 @@ func TestDeadlettering(t *testing.T) {
259257

260258
<-proceed
261259
count, _ := metrics.GetRejectedMessagesValue()
262-
if count != rejectedMessages+1 {
260+
if count != 1 {
263261
t.Error("Should have one rejected message")
264262
}
265263

@@ -348,11 +346,7 @@ func TestReturnDeadToQueue(t *testing.T) {
348346

349347
func TestDeadLetterHandlerPanic(t *testing.T) {
350348
proceed := make(chan bool)
351-
rejectedMessages, err := metrics.GetRejectedMessagesValue()
352-
if err != nil {
353-
t.Error("failed to get rejected messages value")
354-
}
355-
349+
metrics.ResetRejectedMessagesCounter()
356350
poison := gbus.NewBusMessage(Command1{})
357351
service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true,
358352
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})
@@ -361,6 +355,14 @@ func TestDeadLetterHandlerPanic(t *testing.T) {
361355
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})
362356
visited := false
363357
deadMessageHandler := func(tx *sql.Tx, poison *amqp.Delivery) error {
358+
/*
359+
this handler will be called more than once since when grabbit rejects
360+
a message from a deadletter queue to rejects it with the requeu option set to
361+
true and that is why this will be called more than once even though the retry count
362+
is set to 0
363+
364+
*/
365+
364366
if !visited {
365367
visited = true
366368
panic("PANIC DEAD HANDLER aaahhh!!!!!!")
@@ -374,7 +376,7 @@ func TestDeadLetterHandlerPanic(t *testing.T) {
374376
}
375377

376378
deadletterSvc.HandleDeadletter(deadMessageHandler)
377-
err = service1.HandleMessage(Command1{}, faultyHandler)
379+
err := service1.HandleMessage(Command1{}, faultyHandler)
378380
if err != nil {
379381
t.Error("failed to register faultyhandler")
380382
}
@@ -388,8 +390,12 @@ func TestDeadLetterHandlerPanic(t *testing.T) {
388390
select {
389391
case <-proceed:
390392
count, _ := metrics.GetRejectedMessagesValue()
391-
if count != rejectedMessages+2 {
392-
t.Error("Should have 2 rejected messages")
393+
//we expect only 1 rejcted meessage from the counter since rejected messages that get
394+
//requeued are not reported to the metric so the counter won't be increment when the message
395+
//in the dlq gets rejected as it is rejected with the requeue option set to true
396+
if count != 1 {
397+
398+
t.Errorf("Should have 1 rejected messages but was %v", count)
393399
}
394400
case <-time.After(2 * time.Second):
395401
t.Fatal("timeout, dlq failed to reject message after handler panicked")

0 commit comments

Comments
 (0)