Skip to content

Commit 836fa27

Browse files
feat(pubsub): Support subscriber shutdown options
1 parent ed3535f commit 836fa27

4 files changed

Lines changed: 152 additions & 22 deletions

File tree

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,15 @@ def start
140140
#
141141
# @return [MessageListener] returns self so calls can be chained.
142142
#
143-
def stop
143+
def stop shutdown_setting: :wait_for_processing, timeout: nil
144+
unless [:wait_for_processing, :nack_immediately].include? shutdown_setting
145+
raise ArgumentError, "Invalid shutdown_setting: #{shutdown_setting}"
146+
end
147+
144148
synchronize do
145149
@started = false
146150
@stopped = true
147-
@stream_pool.map(&:stop)
151+
@stream_pool.each { |s| s.stop shutdown_setting: shutdown_setting, timeout: timeout }
148152
wait_stop_buffer_thread!
149153
self
150154
end
@@ -182,8 +186,8 @@ def wait! timeout = nil
182186
#
183187
# @return [MessageListener] returns self so calls can be chained.
184188
#
185-
def stop! timeout = nil
186-
stop
189+
def stop! timeout = nil, shutdown_setting: :wait_for_processing
190+
stop shutdown_setting: shutdown_setting, timeout: timeout
187191
wait! timeout
188192
end
189193

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,21 @@ def empty?
109109
end
110110
end
111111

112+
def wait_until_empty timeout = nil
113+
synchronize do
114+
if timeout
115+
target_time = Time.now + timeout
116+
while !@inventory.empty?
117+
remaining = target_time - Time.now
118+
break if remaining <= 0
119+
@wait_cond.wait remaining
120+
end
121+
else
122+
@wait_cond.wait_while { !@inventory.empty? }
123+
end
124+
end
125+
end
126+
112127
def start
113128
@background_thread ||= Thread.new { background_run }
114129

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ def initialize subscriber
5757
@subscriber = subscriber
5858

5959
@request_queue = nil
60-
@stopped = nil
60+
@streaming_stopped = nil
61+
@fully_stopped = nil
6162
@paused = nil
6263
@pause_cond = new_cond
6364
@exactly_once_delivery_enabled = false
@@ -93,9 +94,9 @@ def start
9394
self
9495
end
9596

96-
def stop
97+
def stop shutdown_setting: :wait_for_processing, timeout: nil
9798
synchronize do
98-
break if @stopped
99+
break if @streaming_stopped
99100

100101
subscriber.service.logger.log :info, "subscriber-streams" do
101102
"stopping stream for subscription #{@subscriber.subscription_name}"
@@ -105,23 +106,63 @@ def stop
105106
@request_queue&.push self
106107

107108
# Signal to the background thread that we are stopped.
108-
@stopped = true
109+
@streaming_stopped = true
109110
@pause_cond.broadcast
110111

111-
# Now that the reception thread is stopped, immediately stop the
112-
# callback thread pool. All queued callbacks will see the stream
113-
# is stopped and perform a noop.
114-
@callback_thread_pool.shutdown
115-
116-
# Once all the callbacks are stopped, we can stop the inventory.
117-
@inventory.stop
112+
if shutdown_setting == :nack_immediately
113+
nack_unprocessed_messages!
114+
@fully_stopped = true
115+
@callback_thread_pool.shutdown
116+
@inventory.stop
117+
else
118+
# :wait_for_processing
119+
@shutdown_thread = Thread.new do
120+
if timeout
121+
wait_time = [timeout - 30, 0].max
122+
@inventory.wait_until_empty wait_time
123+
if !@inventory.empty?
124+
nack_unprocessed_messages!
125+
end
126+
else
127+
@inventory.wait_until_empty nil
128+
end
129+
synchronize do
130+
@fully_stopped = true
131+
@callback_thread_pool.shutdown
132+
@inventory.stop
133+
end
134+
end
135+
end
118136
end
119137

120138
self
121139
end
122140

141+
def nack_unprocessed_messages!
142+
synchronize do
143+
ack_ids = @inventory.ack_ids
144+
puts "nack_unprocessed_messages! called, ack_ids: #{ack_ids}"
145+
unless ack_ids.empty?
146+
begin
147+
subscriber.service.modify_ack_deadline subscriber.subscription_name, ack_ids, 0
148+
puts "modify_ack_deadline succeeded"
149+
rescue StandardError => e
150+
puts "modify_ack_deadline failed: #{e.message}"
151+
subscriber.service.logger.log :error, "subscriber-streams" do
152+
"Failed to nack unprocessed messages: #{e.message}"
153+
end
154+
end
155+
@inventory.remove *ack_ids
156+
end
157+
end
158+
end
159+
123160
def stopped?
124-
synchronize { @stopped }
161+
synchronize { @streaming_stopped }
162+
end
163+
164+
def fully_stopped?
165+
synchronize { @fully_stopped }
125166
end
126167

127168
def paused?
@@ -133,6 +174,7 @@ def running?
133174
end
134175

135176
def wait! timeout = nil
177+
@shutdown_thread&.join timeout
136178
# Wait for all queued callbacks to be processed.
137179
@callback_thread_pool.wait_for_termination timeout
138180

@@ -222,15 +264,15 @@ class RestartStream < StandardError; end
222264
def background_run
223265
synchronize do
224266
# Don't allow a stream to restart if already stopped
225-
if @stopped
267+
if @streaming_stopped
226268
subscriber.service.logger.log :debug, "subscriber-streams" do
227269
"not filling stream for subscription #{@subscriber.subscription_name} because stream is already" \
228270
" stopped"
229271
end
230272
return
231273
end
232274

233-
@stopped = false
275+
@streaming_stopped = false
234276
@paused = false
235277

236278
# signal to the previous queue to shut down
@@ -252,14 +294,14 @@ def background_run
252294

253295
loop do
254296
synchronize do
255-
if @paused && !@stopped
297+
if @paused && !@streaming_stopped
256298
@pause_cond.wait
257299
next
258300
end
259301
end
260302

261303
# Break loop, close thread if stopped
262-
break if synchronize { @stopped }
304+
break if synchronize { @streaming_stopped }
263305

264306
begin
265307
# Cannot synchronize the enumerator, causes deadlock
@@ -297,7 +339,7 @@ def background_run
297339

298340
# Has the loop broken but we aren't stopped?
299341
# Could be GRPC has thrown an internal error, so restart.
300-
raise RestartStream unless synchronize { @stopped }
342+
raise RestartStream unless synchronize { @streaming_stopped }
301343

302344
# We must be stopped, tell the stream to quit.
303345
stop
@@ -372,7 +414,7 @@ def perform_callback_sync rec_msg
372414
subscriber.service.logger.log :info, "callback-delivery" do
373415
"message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) delivery to user callbacks"
374416
end
375-
@subscriber.callback.call rec_msg unless stopped?
417+
@subscriber.callback.call rec_msg unless fully_stopped?
376418
rescue StandardError => e
377419
subscriber.service.logger.log :info, "callback-exceptions" do
378420
"message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) caused a user callback exception: " \

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/stream_test.rb

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,4 +217,73 @@ def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds:
217217
listener.stop
218218
listener.wait!
219219
end
220+
221+
it "should nack unprocessed messages when stopped with nack_immediately" do
222+
pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
223+
response_groups = [[pull_res1]]
224+
225+
stub = StreamingPullStub.new response_groups
226+
nacked = false
227+
228+
subscriber.service.mocked_subscription_admin = stub
229+
def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds:
230+
puts "modify_ack_deadline called with #{ack_deadline_seconds}"
231+
if ack_deadline_seconds == 0
232+
nacked = true
233+
end
234+
nil
235+
end
236+
237+
listener = subscriber.listen streams: 1 do |msg|
238+
puts "callback started"
239+
sleep 0.5
240+
puts "callback finished"
241+
end
242+
243+
listener.start
244+
245+
# Wait for message to be pulled and added to inventory
246+
sleep 0.1
247+
puts "inventory count: #{listener.stream_pool.first.inventory.count}"
248+
249+
listener.stop shutdown_setting: :nack_immediately
250+
puts "stop called"
251+
listener.wait!
252+
puts "wait finished"
253+
254+
assert nacked
255+
end
256+
257+
it "should wait for processing when stopped with wait_for_processing" do
258+
pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
259+
response_groups = [[pull_res1]]
260+
261+
stub = StreamingPullStub.new response_groups
262+
called = false
263+
nacked = false
264+
265+
subscriber.service.mocked_subscription_admin = stub
266+
def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds:
267+
if ack_deadline_seconds == 0
268+
nacked = true
269+
end
270+
nil
271+
end
272+
273+
listener = subscriber.listen streams: 1 do |msg|
274+
sleep 0.5
275+
called = true
276+
end
277+
278+
listener.start
279+
280+
# Wait for message to be pulled and added to inventory
281+
sleep 0.1
282+
283+
listener.stop shutdown_setting: :wait_for_processing
284+
listener.wait!
285+
286+
assert called
287+
refute nacked
288+
end
220289
end

0 commit comments

Comments
 (0)