Skip to content

Commit cf4a645

Browse files
Merge branch 'Garaio-REM-fix-channel-closed-error'
2 parents d00dbbd + e8b9ac8 commit cf4a645

6 files changed

Lines changed: 159 additions & 3 deletions

File tree

AGENTS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ To produce a new release:
114114
4. Tag the commit: `git tag vX.Y.0`
115115
5. Bump the dev version: add a new `## X.(Y+1).0 (in development)` section to `CHANGELOG.md` with `No changes yet.` underneath, and update `lib/hutch/version.rb` to the next dev version with a `.pre` suffix
116116
6. Commit with the message `Bump dev version`
117-
7. Push: `git push && git push --tags`
117+
7. Push: `git push && git push origin vX.(Y+1).0`
118118

119119
## Git Instructions
120120

hutch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Gem::Specification.new do |gem|
66
gem.add_runtime_dependency 'march_hare', '>= 4.7.0'
77
else
88
gem.platform = Gem::Platform::RUBY
9-
gem.add_runtime_dependency 'bunny', '>= 3.0', '< 4.0'
9+
gem.add_runtime_dependency 'bunny', '>= 3.1', '< 4.0'
1010
end
1111
gem.add_runtime_dependency 'carrot-top', '~> 0.0.7'
1212
gem.add_runtime_dependency 'multi_json', '~> 1.15'

lib/hutch/adapters/bunny.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class BunnyAdapter
1111
ConnectionRefused = Bunny::TCPConnectionFailed
1212
PreconditionFailed = Bunny::PreconditionFailed
1313

14-
def_delegators :@connection, :start, :disconnect, :close, :create_channel, :open?
14+
def_delegators :@connection, :start, :disconnect, :close, :create_channel, :open?, :recover_channel_topology
1515

1616
def initialize(opts={})
1717
@connection = Bunny.new(opts)

lib/hutch/broker.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,29 @@ def open_channel
113113
logger.info 'enabling publisher confirms'
114114
ch.confirm_select
115115
end
116+
117+
install_channel_recovery!(ch)
116118
end
117119
end
118120

119121
def open_channel!
120122
@channel = open_channel
121123
end
122124

125+
def install_channel_recovery!(ch)
126+
ch.on_error do |channel, close|
127+
next unless close.delivery_ack_timeout?
128+
129+
begin
130+
channel.reopen
131+
connection.recover_channel_topology(channel)
132+
logger.warn 'recovered consumer channel after a delivery acknowledgement timeout'
133+
rescue => ex
134+
logger.error "channel recovery failed: #{ex.class}: #{ex.message}"
135+
end
136+
end
137+
end
138+
123139
def declare_exchange(ch = channel)
124140
exchange_name = @config[:mq_exchange]
125141
exchange_type = @config[:mq_exchange_type]

spec/hutch/worker_spec.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
context 'with a configured consumer tag prefix that is too long' do
6262
let(:maximum_size) { 255 - SecureRandom.uuid.size - 1 }
6363
before { Hutch::Config.set(:consumer_tag_prefix, 'a'.*(maximum_size + 1)) }
64+
after { Hutch::Config.set(:consumer_tag_prefix, 'hutch') }
6465

6566
it 'raises an error' do
6667
expect { worker.setup_queue(consumer) }.to raise_error(/Tag must be 255 bytes long at most/)
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
require 'spec_helper'
2+
require 'hutch/broker'
3+
require 'hutch/worker'
4+
require 'hutch/consumer'
5+
require 'bunny'
6+
require 'json'
7+
require 'securerandom'
8+
require 'timeout'
9+
10+
describe 'channel recovery after delivery acknowledgement timeout', rabbitmq: true, adapter: :bunny do
11+
let(:log) { StringIO.new }
12+
let(:logger) { Logger.new(log) }
13+
let(:exchange_name) { "hutch.integration.exchange.#{SecureRandom.hex(4)}" }
14+
let(:queue_name) { "hutch.integration.queue.#{SecureRandom.hex(4)}" }
15+
let(:routing_key) { "hutch.integration.key.#{SecureRandom.hex(4)}" }
16+
17+
let(:processed) { [] }
18+
let(:processed_lock) { Mutex.new }
19+
let(:timed_out_once) { [false] }
20+
21+
let(:consumer_class) do
22+
msgs = processed
23+
lock = processed_lock
24+
rk = routing_key
25+
qn = queue_name
26+
timed_out = timed_out_once
27+
28+
Class.new do
29+
include Hutch::Consumer
30+
31+
consume rk
32+
queue_name qn
33+
arguments(
34+
'x-queue-type' => 'quorum',
35+
'x-consumer-timeout' => 60_000
36+
)
37+
38+
define_method(:process) do |message|
39+
if message['id'] == 'trigger-timeout' && !timed_out[0]
40+
timed_out[0] = true
41+
sleep 210
42+
end
43+
44+
lock.synchronize { msgs << message['id'] }
45+
end
46+
end
47+
end
48+
49+
let(:broker) { Hutch::Broker.new }
50+
let(:worker) { Hutch::Worker.new(broker, [consumer_class], []) }
51+
52+
let(:publisher) do
53+
Bunny.new(
54+
host: Hutch::Config[:mq_host],
55+
port: Hutch::Config[:mq_port],
56+
username: Hutch::Config[:mq_username],
57+
password: Hutch::Config[:mq_password],
58+
vhost: Hutch::Config[:mq_vhost]
59+
).tap(&:start)
60+
end
61+
62+
let(:publisher_channel) { publisher.create_channel }
63+
let(:exchange) { publisher_channel.topic(exchange_name, durable: true) }
64+
65+
before do
66+
Hutch::Logging.logger = logger
67+
Hutch::Config.set(:mq_exchange, exchange_name)
68+
Hutch::Config.set(:force_publisher_confirms, false)
69+
Hutch::Config.set(:client_logger, logger)
70+
end
71+
72+
after do
73+
publisher_channel.close rescue nil
74+
publisher.close rescue nil
75+
broker.disconnect rescue nil
76+
Hutch::Logging.logger = Logger.new(File::NULL)
77+
end
78+
79+
def wait_for(timeout, label)
80+
Timeout.timeout(timeout) do
81+
loop do
82+
return true if yield
83+
sleep 0.25
84+
end
85+
end
86+
rescue Timeout::Error
87+
raise <<~MSG
88+
Timed out waiting for: #{label}
89+
90+
processed_messages=#{processed_messages.inspect}
91+
channel_open=#{broker.channel.open? rescue 'unknown'}
92+
channel_closed=#{broker.channel.closed? rescue 'unknown'}
93+
94+
log output:
95+
#{log_output}
96+
MSG
97+
end
98+
99+
def processed_messages
100+
processed_lock.synchronize { processed.dup }
101+
end
102+
103+
def log_output
104+
log.rewind
105+
log.read
106+
end
107+
108+
def publish_message(id)
109+
exchange.publish(
110+
JSON.dump('id' => id),
111+
routing_key: routing_key,
112+
content_type: 'application/json',
113+
persistent: true
114+
)
115+
end
116+
117+
# This spec is intentionally slow because RabbitMQ enforces delivery
118+
# acknowledgement timeouts on a periodic sweep, not immediately at the deadline.
119+
it 're-subscribes and consumes later messages after RabbitMQ closes the channel for ack timeout' do
120+
broker.connect
121+
worker.setup_queues
122+
123+
publish_message('trigger-timeout')
124+
125+
wait_for(240, 'delivery acknowledgement timeout') do
126+
log_output.match?(/delivery acknowledgement on channel \d+ timed out/i)
127+
end
128+
129+
publish_message('after-recovery')
130+
131+
wait_for(90, 'after-recovery message consumption') do
132+
processed_messages.include?('after-recovery')
133+
end
134+
135+
expect(log_output).to match(/delivery acknowledgement on channel \d+ timed out/i)
136+
expect(log_output).to match(/recovered consumer channel after a delivery acknowledgement timeout/i)
137+
expect(processed_messages).to include('after-recovery')
138+
end
139+
end

0 commit comments

Comments
 (0)