Skip to content

Commit 1f23bfc

Browse files
committed
fix: Recover Hutch worker when consumer channel is closed due to delivery_ack_timeout?
1 parent a05fc29 commit 1f23bfc

4 files changed

Lines changed: 163 additions & 1 deletion

File tree

Gemfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ ruby '>= 2.7.0'
44

55
gemspec
66

7+
gem 'bunny', github: 'ruby-amqp/bunny', branch: 'main'
8+
79
group :development do
810
gem "rake"
911
gem "guard", "~> 2.14", platform: :mri

lib/hutch/adapters/bunny.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class BunnyAdapter
1111
ConnectionRefused = Bunny::TCPConnectionFailed
1212
PreconditionFailed = Bunny::PreconditionFailed
1313

14-
def_delegators :@connection, :start, :disconnect, :close, :create_channel, :open?
14+
def_delegators :@connection, :start, :disconnect, :close, :create_channel, :open?, :recover_channel_topology
1515

1616
def initialize(opts={})
1717
@connection = Bunny.new(opts)

lib/hutch/broker.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
require 'hutch/logging'
55
require 'hutch/exceptions'
66
require 'hutch/publisher'
7+
require 'thread'
78

89
module Hutch
910
class Broker
@@ -113,13 +114,33 @@ def open_channel
113114
logger.info 'enabling publisher confirms'
114115
ch.confirm_select
115116
end
117+
118+
install_channel_recovery!(ch)
116119
end
117120
end
118121

119122
# Opens a channel via #open_channel and stores it in @channel.
#
# @return the channel returned by #open_channel
def open_channel!
  @channel = open_channel
end
end
122125

126+
# Registers an error handler on +ch+ that recovers the channel when it is
# closed for a delivery acknowledgement timeout.
#
# Closes for which +close.delivery_ack_timeout?+ is false are ignored by this
# handler. For a timeout close, the channel is reopened and its topology is
# recovered through the connection on a separate thread; the outcome is logged.
#
# @param ch [#on_error] the channel to install the handler on
# @return whatever +ch.on_error+ returns
def install_channel_recovery!(ch)
  ch.on_error do |channel, close|
    next unless close.delivery_ack_timeout?

    # Reopen performs blocking protocol operations, so run it outside Bunny's
    # channel error callback thread to avoid timing out waiting for OpenOk.
    Thread.new do
      channel.reopen
      connection.recover_channel_topology(channel)
      logger.info 'channel recovery succeeded'
    rescue => ex
      # A failed recovery is logged instead of being raised out of the thread.
      logger.error "channel recovery failed: #{ex.class}: #{ex.message}"
    end
  end
end
143+
123144
def declare_exchange(ch = channel)
124145
exchange_name = @config[:mq_exchange]
125146
exchange_type = @config[:mq_exchange_type]
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
require 'spec_helper'
2+
require 'hutch/broker'
3+
require 'hutch/worker'
4+
require 'hutch/consumer'
5+
require 'bunny'
6+
require 'json'
7+
require 'securerandom'
8+
require 'timeout'
9+
10+
# Integration spec: a consumer that blocks past the queue's consumer timeout
# causes RabbitMQ to close the channel; the worker must then recover and keep
# consuming. Requires a reachable RabbitMQ (tagged +rabbitmq: true+).
describe 'channel recovery after delivery acknowledgement timeout', rabbitmq: true, adapter: :bunny do
  # In-memory log sink shared by Hutch and the client logger so the spec can
  # assert on log lines.
  let(:log) { StringIO.new }
  let(:logger) { Logger.new(log) }

  # Randomised names keep separate runs from sharing broker state.
  let(:exchange_name) { "hutch.integration.exchange.#{SecureRandom.hex(4)}" }
  let(:queue_name) { "hutch.integration.queue.#{SecureRandom.hex(4)}" }
  let(:routing_key) { "hutch.integration.key.#{SecureRandom.hex(4)}" }

  # Ids of processed messages, guarded by processed_lock because the consumer
  # appends while the example thread reads.
  let(:processed) { [] }
  let(:processed_lock) { Mutex.new }
  # One-element array used as a mutable flag shared with the consumer class.
  let(:timed_out_once) { [false] }

  # Log line pattern matched when the ack timeout has been reported.
  let(:ack_timeout_pattern) { /delivery acknowledgement on channel \d+ timed out/i }

  let(:consumer_class) do
    # Copy the memoized values into locals: inside Class.new the example's
    # helper methods are not in scope, but closed-over locals are.
    msgs = processed
    lock = processed_lock
    rk = routing_key
    qn = queue_name
    timed_out = timed_out_once

    Class.new do
      include Hutch::Consumer

      consume rk
      queue_name qn
      arguments(
        'x-queue-type' => 'quorum',
        'x-consumer-timeout' => 60_000
      )

      define_method(:process) do |message|
        # Block only on the first 'trigger-timeout' delivery, well past the
        # 60s consumer timeout configured above.
        if message['id'] == 'trigger-timeout' && !timed_out[0]
          timed_out[0] = true
          sleep 210
        end

        lock.synchronize { msgs << message['id'] }
      end
    end
  end

  let(:broker) { Hutch::Broker.new }
  let(:worker) { Hutch::Worker.new(broker, [consumer_class], []) }

  # Separate Bunny connection used only for publishing test messages.
  let(:publisher) do
    Bunny.new(
      host: Hutch::Config[:mq_host],
      port: Hutch::Config[:mq_port],
      username: Hutch::Config[:mq_username],
      password: Hutch::Config[:mq_password],
      vhost: Hutch::Config[:mq_vhost]
    ).tap(&:start)
  end

  let(:publisher_channel) { publisher.create_channel }
  let(:exchange) { publisher_channel.topic(exchange_name, durable: true) }

  before do
    Hutch::Logging.logger = logger
    Hutch::Config.set(:mq_exchange, exchange_name)
    Hutch::Config.set(:force_publisher_confirms, false)
    Hutch::Config.set(:client_logger, logger)
  end

  after do
    # Best-effort teardown: errors while closing are deliberately ignored.
    publisher_channel.close rescue nil
    publisher.close rescue nil
    broker.disconnect rescue nil
    Hutch::Logging.logger = Logger.new(File::NULL)
  end

  # Polls the given block every 0.25s until it returns a truthy value.
  #
  # @param timeout [Numeric] seconds to wait before giving up
  # @param label [String] description used in the failure message
  # @return [true] once the block returns truthy
  # @raise [RuntimeError] with processed ids, channel state and captured log
  #   output when the timeout elapses
  def wait_for(timeout, label)
    Timeout.timeout(timeout) do
      loop do
        return true if yield
        sleep 0.25
      end
    end
  rescue Timeout::Error
    raise <<~MSG
      Timed out waiting for: #{label}

      processed_messages=#{processed_messages.inspect}
      channel_open=#{broker.channel.open? rescue 'unknown'}
      channel_closed=#{broker.channel.closed? rescue 'unknown'}

      log output:
      #{log_output}
    MSG
  end

  # @return [Array] snapshot of the processed message ids, taken under the lock
  def processed_messages
    processed_lock.synchronize { processed.dup }
  end

  # Returns a snapshot of everything logged so far.
  #
  # Reads the StringIO's backing string instead of rewind + read: moving the
  # position while another thread is logging would make that write land at
  # the start of the buffer and overwrite earlier output.
  #
  # @return [String] copy of the captured log text
  def log_output
    log.string.dup
  end

  # Publishes a persistent JSON message carrying +id+ to the test exchange.
  #
  # @param id [String] value stored under the 'id' key of the payload
  def publish_message(id)
    exchange.publish(
      JSON.dump('id' => id),
      routing_key: routing_key,
      content_type: 'application/json',
      persistent: true
    )
  end

  # This spec is intentionally slow because RabbitMQ enforces delivery
  # acknowledgement timeouts on a periodic sweep, not immediately at the deadline.
  it 're-subscribes and consumes later messages after RabbitMQ closes the channel for ack timeout' do
    broker.connect
    worker.setup_queues

    publish_message('trigger-timeout')

    wait_for(240, 'delivery acknowledgement timeout') do
      log_output.match?(ack_timeout_pattern)
    end

    publish_message('after-recovery')

    wait_for(90, 'after-recovery message consumption') do
      processed_messages.include?('after-recovery')
    end

    expect(log_output).to match(ack_timeout_pattern)
    expect(log_output).to match(/channel recovery succeeded/i)
    expect(processed_messages).to include('after-recovery')
  end
end

0 commit comments

Comments
 (0)