1+ require 'spec_helper'
2+ require 'hutch/broker'
3+ require 'hutch/worker'
4+ require 'hutch/consumer'
5+ require 'bunny'
6+ require 'json'
7+ require 'securerandom'
8+ require 'timeout'
9+
10+ describe 'channel recovery after delivery acknowledgement timeout' , rabbitmq : true , adapter : :bunny do
11+ let ( :log ) { StringIO . new }
12+ let ( :logger ) { Logger . new ( log ) }
13+ let ( :exchange_name ) { "hutch.integration.exchange.#{ SecureRandom . hex ( 4 ) } " }
14+ let ( :queue_name ) { "hutch.integration.queue.#{ SecureRandom . hex ( 4 ) } " }
15+ let ( :routing_key ) { "hutch.integration.key.#{ SecureRandom . hex ( 4 ) } " }
16+
17+ let ( :processed ) { [ ] }
18+ let ( :processed_lock ) { Mutex . new }
19+ let ( :timed_out_once ) { [ false ] }
20+
21+ let ( :consumer_class ) do
22+ msgs = processed
23+ lock = processed_lock
24+ rk = routing_key
25+ qn = queue_name
26+ timed_out = timed_out_once
27+
28+ Class . new do
29+ include Hutch ::Consumer
30+
31+ consume rk
32+ queue_name qn
33+ arguments (
34+ 'x-queue-type' => 'quorum' ,
35+ 'x-consumer-timeout' => 60_000
36+ )
37+
38+ define_method ( :process ) do |message |
39+ if message [ 'id' ] == 'trigger-timeout' && !timed_out [ 0 ]
40+ timed_out [ 0 ] = true
41+ sleep 210
42+ end
43+
44+ lock . synchronize { msgs << message [ 'id' ] }
45+ end
46+ end
47+ end
48+
49+ let ( :broker ) { Hutch ::Broker . new }
50+ let ( :worker ) { Hutch ::Worker . new ( broker , [ consumer_class ] , [ ] ) }
51+
52+ let ( :publisher ) do
53+ Bunny . new (
54+ host : Hutch ::Config [ :mq_host ] ,
55+ port : Hutch ::Config [ :mq_port ] ,
56+ username : Hutch ::Config [ :mq_username ] ,
57+ password : Hutch ::Config [ :mq_password ] ,
58+ vhost : Hutch ::Config [ :mq_vhost ]
59+ ) . tap ( &:start )
60+ end
61+
62+ let ( :publisher_channel ) { publisher . create_channel }
63+ let ( :exchange ) { publisher_channel . topic ( exchange_name , durable : true ) }
64+
65+ before do
66+ Hutch ::Logging . logger = logger
67+ Hutch ::Config . set ( :mq_exchange , exchange_name )
68+ Hutch ::Config . set ( :force_publisher_confirms , false )
69+ Hutch ::Config . set ( :client_logger , logger )
70+ end
71+
72+ after do
73+ publisher_channel . close rescue nil
74+ publisher . close rescue nil
75+ broker . disconnect rescue nil
76+ Hutch ::Logging . logger = Logger . new ( File ::NULL )
77+ end
78+
79+ def wait_for ( timeout , label )
80+ Timeout . timeout ( timeout ) do
81+ loop do
82+ return true if yield
83+ sleep 0.25
84+ end
85+ end
86+ rescue Timeout ::Error
87+ raise <<~MSG
88+ Timed out waiting for: #{ label }
89+
90+ processed_messages=#{ processed_messages . inspect }
91+ channel_open=#{ broker . channel . open? rescue 'unknown' }
92+ channel_closed=#{ broker . channel . closed? rescue 'unknown' }
93+
94+ log output:
95+ #{ log_output }
96+ MSG
97+ end
98+
99+ def processed_messages
100+ processed_lock . synchronize { processed . dup }
101+ end
102+
103+ def log_output
104+ log . rewind
105+ log . read
106+ end
107+
108+ def publish_message ( id )
109+ exchange . publish (
110+ JSON . dump ( 'id' => id ) ,
111+ routing_key : routing_key ,
112+ content_type : 'application/json' ,
113+ persistent : true
114+ )
115+ end
116+
117+ # This spec is intentionally slow because RabbitMQ enforces delivery
118+ # acknowledgement timeouts on a periodic sweep, not immediately at the deadline.
119+ it 're-subscribes and consumes later messages after RabbitMQ closes the channel for ack timeout' do
120+ broker . connect
121+ worker . setup_queues
122+
123+ publish_message ( 'trigger-timeout' )
124+
125+ wait_for ( 240 , 'delivery acknowledgement timeout' ) do
126+ log_output . match? ( /delivery acknowledgement on channel \d + timed out/i )
127+ end
128+
129+ publish_message ( 'after-recovery' )
130+
131+ wait_for ( 90 , 'after-recovery message consumption' ) do
132+ processed_messages . include? ( 'after-recovery' )
133+ end
134+
135+ expect ( log_output ) . to match ( /delivery acknowledgement on channel \d + timed out/i )
136+ expect ( log_output ) . to match ( /channel recovery succeeded/i )
137+ expect ( processed_messages ) . to include ( 'after-recovery' )
138+ end
139+ end
0 commit comments