Skip to content

Commit 4b2fa43

Browse files
Introduce a few helpers for ruby-amqp/hutch#414
1 parent 2081e23 commit 4b2fa43

7 files changed

Lines changed: 222 additions & 35 deletions

File tree

AGENTS.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,30 @@ This library targets Ruby 3.0 and later versions.
1414

1515
* Only add very important comments, both in tests and in the implementation
1616

17+
## Change Log
18+
19+
If asked to perform change log updates, consult and modify `ChangeLog.md` and stick to its
20+
existing writing style.
21+
22+
23+
## Releases
24+
25+
### How to Roll (Produce) a New Release
26+
27+
Suppose the current development version in `ChangeLog.md` has
28+
a `## Changes between Bunny X.Y.0 and X.(Y+1).0 (in development)` section at the top.
29+
30+
To produce a new release:
31+
32+
1. Update `ChangeLog.md`: replace `(in development)` with today's date, e.g. `(Mar 30, 2026)`. Make sure all notable changes since the previous release are listed
33+
2. Update the version in `lib/bunny/version.rb` to match (remove any `.pre` suffix)
34+
3. Commit with the message `X.(Y+1).0` (just the version number, nothing else)
35+
4. Tag the commit: `git tag vX.(Y+1).0`
36+
5. Bump the dev version: add a new `## Changes between Bunny X.(Y+1).0 and X.(Y+2).0 (in development)` section to `ChangeLog.md` with `No changes yet.` underneath, and update `lib/bunny/version.rb` to the next dev version with a `.pre` suffix
37+
6. Commit with the message `Bump dev version`
38+
7. Push: `git push && git push --tags`
39+
40+
1741
## Git Instructions
1842

1943
* Never add yourself to the list of commit co-authors

ChangeLog.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,24 @@ Several optimizations to reduce overhead in the consumer delivery hot path:
158158
* Frame header buffer reuse: the transport layer now reuses a buffer when reading
159159
frame headers, reducing per-frame allocations
160160

161+
### `Channel#reopen`
162+
163+
A new method that reopens a channel after a server-initiated closure
164+
(e.g. due to a consumer delivery acknowledgement timeout or an unknown delivery tag).
165+
The channel is reopened on the same connection, reusing its original channel id,
166+
and its prefetch, confirm, and transactional settings are recovered.
167+
168+
### `Session#recover_channel_topology`
169+
170+
Recovers topology (exchanges, queues, bindings, consumers) for a single channel.
171+
Intended for use after `Channel#reopen`.
172+
173+
### `amq-protocol` Bumped to `2.6.0` (or Later)
174+
175+
Bunny now requires `amq-protocol` `2.6.0` or later for the `Channel::Close`
176+
predicate methods (`#unknown_delivery_tag?`, `#delivery_ack_timeout?`, `#message_too_large?`)
177+
that Bunny used to reinvent (with regular expression matches on `reply_text`)
178+
161179
### Limit Hostname Resolution Time
162180

163181
Bunny now configures its TCP socket to limit the hostname resolution time,

Gemfile

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,15 @@ gemspec
4141

4242
# Use local clones if possible.
4343
# If you want to use your local copy, just symlink it to vendor.
44-
def custom_gem(name, options = Hash.new)
44+
def custom_gem(name, *args)
45+
options = args.last.is_a?(Hash) ? args.pop : {}
4546
local_path = File.expand_path("../vendor/#{name}", __FILE__)
4647
if File.exist?(local_path)
4748
puts "Using #{name} from #{local_path}..."
4849
gem name, options.merge(path: local_path).delete_if { |key, _| [:git, :branch].include?(key) }
4950
else
50-
gem name, options
51+
gem name, *args, **options
5152
end
5253
end
5354

54-
custom_gem "amq-protocol", "~> 2.5.0"
55+
custom_gem "amq-protocol", "~> 2.6"

bunny.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ Gem::Specification.new do |s|
2828
s.email = ["michael.s.klishin@gmail.com"]
2929

3030
# Dependencies
31-
s.add_runtime_dependency 'amq-protocol', '~> 2.5.1'
31+
s.add_runtime_dependency 'amq-protocol', '~> 2.6'
3232
s.add_runtime_dependency 'logger', '~> 1', '>= 1.7'
3333
s.add_runtime_dependency 'sorted_set', '~> 1', '>= 1.0.2'
3434

lib/bunny/channel.rb

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,35 @@ def close
298298
maybe_kill_consumer_work_pool!
299299
end
300300

301+
# Reopens a channel that was closed by the server (e.g. due to a consumer
302+
# delivery acknowledgement timeout). The channel is reopened on the same
303+
# connection, reusing its original channel id, and its prefetch, confirm,
304+
# and transactional settings are recovered.
305+
#
306+
# This does NOT recover topology (queues, exchanges, bindings, consumers).
307+
# Use {Bunny::Session#recover_channel_topology} for that.
308+
#
309+
# @return [Bunny::Channel] self
310+
# @see Bunny::Session#recover_channel_topology
311+
# @api public
312+
def reopen
313+
raise "Cannot reopen a channel that is not closed" unless closed?
314+
315+
existing = @connection.synchronised_find_channel(@id)
316+
if existing && existing != self
317+
raise "Channel id #{@id} has been reassigned to another channel"
318+
end
319+
320+
@work_pool = ConsumerWorkPool.new(@work_pool.size, @work_pool.abort_on_exception)
321+
@work_pool.start
322+
323+
open
324+
325+
recover_from_network_failure
326+
327+
self
328+
end
329+
301330
# @return [Boolean] true if this channel is open, false otherwise
302331
# @api public
303332
def open?
@@ -2175,14 +2204,7 @@ def handle_method(method)
21752204

21762205
# @private
21772206
def channel_level_exception_after_operation_that_has_no_response?(method)
2178-
re1 = /unknown delivery tag/
2179-
re2 = /delivery acknowledgement on channel \d+ timed out/
2180-
re3 = /larger than configured max size/
2181-
2182-
res = [re1, re2, re3]
2183-
desc = method.reply_text
2184-
2185-
method.reply_code == 406 && (res.any? { |re| desc =~ re })
2207+
method.unknown_delivery_tag? || method.delivery_ack_timeout? || method.message_too_large?
21862208
end
21872209

21882210
# @private

lib/bunny/session.rb

Lines changed: 51 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,17 @@ def after_recovery_attempts_exhausted(&block)
627627
@recovery_attempts_exhausted = block
628628
end
629629

630+
# Recovers topology (exchanges, queues, bindings, consumers) for a single channel.
631+
# Intended for use after {Bunny::Channel#reopen} to restore the channel to a
632+
# working state with its original consumers.
633+
#
634+
# @param [Bunny::Channel] ch Channel whose topology should be recovered
635+
# @api public
636+
def recover_channel_topology(ch)
637+
filter = Proc.new { |entity| entity.channel == ch }
638+
recover_topology_with(filter)
639+
end
640+
630641
#
631642
# Implementation
632643
#
@@ -1045,46 +1056,63 @@ def recover_channels
10451056
# @private
10461057
def recover_topology
10471058
@logger.debug "Will recover topology now"
1048-
# The recovery sequence is the following:
1049-
# 1. Recover exchanges
1050-
@logger.debug "Will recover recorded exchanges"
1051-
@topology_registry.filtered_exchanges.reject { |x| x.predeclared? }.each do |rx|
1059+
recover_topology_with
1060+
end
1061+
1062+
# @private
1063+
def recover_topology_with(filter = nil)
1064+
exchanges = @topology_registry.filtered_exchanges.reject(&:predeclared?)
1065+
queues = @topology_registry.filtered_queues
1066+
queue_bindings = @topology_registry.filtered_queue_bindings
1067+
exchange_bindings = @topology_registry.filtered_exchange_bindings
1068+
consumers = @topology_registry.filtered_consumers
1069+
1070+
if filter
1071+
exchanges = exchanges.select(&filter)
1072+
queues = queues.select(&filter)
1073+
queue_bindings = queue_bindings.select(&filter)
1074+
exchange_bindings = exchange_bindings.select(&filter)
1075+
consumers = consumers.select(&filter)
1076+
end
1077+
1078+
@logger.debug { "Will recover #{exchanges.size} exchange(s)" }
1079+
exchanges.each do |x|
10521080
begin
1053-
recover_exchange(rx)
1081+
recover_exchange(x)
10541082
rescue Exception => e
1055-
@logger.error "Caught an exception while re-declaring exchange #{rx.name}: #{e.inspect}"
1083+
@logger.error "Caught an exception while recovering exchange #{x.name}: #{e.inspect}"
10561084
end
10571085
end
1058-
# 2. Recover queues
1059-
@logger.debug "Will recover recorded queues"
1060-
@topology_registry.filtered_queues.each do |rq|
1086+
1087+
@logger.debug { "Will recover #{queues.size} queue(s)" }
1088+
queues.each do |q|
10611089
begin
1062-
recover_queue(rq)
1090+
recover_queue(q)
10631091
rescue Exception => e
1064-
@logger.error "Caught an exception while re-declaring queue #{rq.name}: #{e.inspect}"
1092+
@logger.error "Caught an exception while recovering queue #{q.name}: #{e.inspect}"
10651093
end
10661094
end
1067-
# 3. Recover bindings
1068-
@logger.debug "Will recover recorded bindings"
1069-
@topology_registry.filtered_queue_bindings.each do |rb|
1095+
1096+
@logger.debug { "Will recover #{queue_bindings.size + exchange_bindings.size} binding(s)" }
1097+
queue_bindings.each do |b|
10701098
begin
1071-
recover_queue_binding(rb)
1099+
recover_queue_binding(b)
10721100
rescue Exception => e
1073-
@logger.error "Caught an exception while re-declaring a binding of queue #{rb.destination}: #{e.inspect}"
1101+
@logger.error "Caught an exception while recovering a binding of queue #{b.destination}: #{e.inspect}"
10741102
end
10751103
end
1076-
@topology_registry.filtered_exchange_bindings.each do |rb|
1104+
1105+
exchange_bindings.each do |b|
10771106
begin
1078-
recover_exchange_binding(rb)
1107+
recover_exchange_binding(b)
10791108
rescue Exception => e
1080-
@logger.error "Caught an exception while re-declaring a binding of exchange #{rb.source}: #{e.inspect}"
1109+
@logger.error "Caught an exception while recovering a binding of exchange #{b.source}: #{e.inspect}"
10811110
end
10821111
end
10831112

1084-
# 4. Recover consumers
1085-
@logger.debug "Will recover recorded consumers"
1086-
@topology_registry.filtered_consumers.each do |rc|
1087-
recover_consumer(rc)
1113+
@logger.debug { "Will recover #{consumers.size} consumer(s)" }
1114+
consumers.each do |c|
1115+
recover_consumer(c)
10881116
end
10891117
end
10901118

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
require "spec_helper"
2+
3+
describe Bunny::Channel, "#reopen" do
4+
let(:connection) do
5+
c = Bunny.new(username: "bunny_gem", password: "bunny_password", vhost: "bunny_testbed")
6+
c.start
7+
c
8+
end
9+
10+
after :each do
11+
connection.close if connection.open?
12+
end
13+
14+
it "reopens a channel after a server-initiated closure" do
15+
ch = connection.create_channel
16+
q = ch.queue("bunny.test.channel-reopen.#{rand}", exclusive: true)
17+
18+
ch.on_error do |closed_ch, amq_close|
19+
@channel_close = amq_close
20+
end
21+
22+
# ack with an invalid tag to force a 406 channel closure
23+
ch.ack(82, false)
24+
sleep 0.25
25+
26+
expect(@channel_close.reply_code).to eq AMQ::Protocol::PreconditionFailed::VALUE
27+
expect(@channel_close).to be_unknown_delivery_tag
28+
expect(@channel_close).not_to be_delivery_ack_timeout
29+
expect(ch).to be_closed
30+
31+
ch.reopen
32+
expect(ch).to be_open
33+
34+
# the channel should be functional again
35+
q2 = ch.queue("bunny.test.channel-reopen.after.#{rand}", exclusive: true)
36+
ch.default_exchange.publish("hello", routing_key: q2.name)
37+
sleep 0.25
38+
expect(q2.message_count).to eq 1
39+
end
40+
41+
it "recovers prefetch setting after reopen" do
42+
ch = connection.create_channel
43+
ch.prefetch(5)
44+
45+
ch.on_error do |_, _| end
46+
47+
ch.ack(82, false)
48+
sleep 0.25
49+
expect(ch).to be_closed
50+
51+
ch.reopen
52+
expect(ch).to be_open
53+
expect(ch.prefetch_count).to eq 5
54+
end
55+
56+
it "raises when called on a channel that is still open" do
57+
ch = connection.create_channel
58+
expect { ch.reopen }.to raise_error(RuntimeError, /not closed/)
59+
end
60+
61+
context "with recover_channel_topology" do
62+
it "re-registers consumers that receive new messages" do
63+
ch = connection.create_channel
64+
ch.prefetch(1)
65+
q = ch.queue("bunny.test.channel-reopen.topology.#{rand}", exclusive: true)
66+
x = ch.default_exchange
67+
68+
delivered = []
69+
q.subscribe(manual_ack: true) do |di, _props, payload|
70+
delivered << payload
71+
ch.ack(di.delivery_tag)
72+
end
73+
74+
x.publish("before", routing_key: q.name)
75+
sleep 0.5
76+
expect(delivered).to eq ["before"]
77+
78+
ch.on_error do |_, _| end
79+
80+
# force channel closure
81+
ch.ack(999, false)
82+
sleep 0.25
83+
expect(ch).to be_closed
84+
85+
ch.reopen
86+
connection.recover_channel_topology(ch)
87+
sleep 0.25
88+
89+
x.publish("after", routing_key: q.name)
90+
sleep 0.5
91+
expect(delivered).to include("after")
92+
end
93+
end
94+
end

0 commit comments

Comments
 (0)