Skip to content

Commit c39c410

Browse files
feat(pubsub): Support subscriber shutdown options
1 parent ed3535f commit c39c410

4 files changed

Lines changed: 143 additions & 22 deletions

File tree

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,15 @@ def start
140140
#
141141
# @return [MessageListener] returns self so calls can be chained.
142142
#
143-
def stop
143+
def stop shutdown_setting: :wait_for_processing, timeout: nil
144+
unless [:wait_for_processing, :nack_immediately].include? shutdown_setting
145+
raise ArgumentError, "Invalid shutdown_setting: #{shutdown_setting}"
146+
end
147+
144148
synchronize do
145149
@started = false
146150
@stopped = true
147-
@stream_pool.map(&:stop)
151+
@stream_pool.each { |s| s.stop shutdown_setting: shutdown_setting, timeout: timeout }
148152
wait_stop_buffer_thread!
149153
self
150154
end
@@ -182,8 +186,8 @@ def wait! timeout = nil
182186
#
183187
# @return [MessageListener] returns self so calls can be chained.
184188
#
185-
def stop! timeout = nil
186-
stop
189+
def stop! timeout = nil, shutdown_setting: :wait_for_processing
190+
stop shutdown_setting: shutdown_setting, timeout: timeout
187191
wait! timeout
188192
end
189193

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,21 @@ def empty?
109109
end
110110
end
111111

112+
def wait_until_empty timeout = nil
113+
synchronize do
114+
if timeout
115+
target_time = Time.now + timeout
116+
while !@inventory.empty?
117+
remaining = target_time - Time.now
118+
break if remaining <= 0
119+
@wait_cond.wait remaining
120+
end
121+
else
122+
@wait_cond.wait_while { !@inventory.empty? }
123+
end
124+
end
125+
end
126+
112127
def start
113128
@background_thread ||= Thread.new { background_run }
114129

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb

Lines changed: 57 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ def initialize subscriber
5757
@subscriber = subscriber
5858

5959
@request_queue = nil
60-
@stopped = nil
60+
@streaming_stopped = nil
61+
@fully_stopped = nil
6162
@paused = nil
6263
@pause_cond = new_cond
6364
@exactly_once_delivery_enabled = false
@@ -93,9 +94,9 @@ def start
9394
self
9495
end
9596

96-
def stop
97+
def stop shutdown_setting: :wait_for_processing, timeout: nil
9798
synchronize do
98-
break if @stopped
99+
break if @streaming_stopped
99100

100101
subscriber.service.logger.log :info, "subscriber-streams" do
101102
"stopping stream for subscription #{@subscriber.subscription_name}"
@@ -105,23 +106,60 @@ def stop
105106
@request_queue&.push self
106107

107108
# Signal to the background thread that we are stopped.
108-
@stopped = true
109+
@streaming_stopped = true
109110
@pause_cond.broadcast
110111

111-
# Now that the reception thread is stopped, immediately stop the
112-
# callback thread pool. All queued callbacks will see the stream
113-
# is stopped and perform a noop.
114-
@callback_thread_pool.shutdown
115-
116-
# Once all the callbacks are stopped, we can stop the inventory.
117-
@inventory.stop
112+
if shutdown_setting == :nack_immediately
113+
nack_unprocessed_messages!
114+
@fully_stopped = true
115+
@callback_thread_pool.shutdown
116+
@inventory.stop
117+
else
118+
# :wait_for_processing
119+
@shutdown_thread = Thread.new do
120+
if timeout
121+
wait_time = [timeout - 30, 0].max
122+
@inventory.wait_until_empty wait_time
123+
if !@inventory.empty?
124+
nack_unprocessed_messages!
125+
end
126+
else
127+
@inventory.wait_until_empty nil
128+
end
129+
synchronize do
130+
@fully_stopped = true
131+
@callback_thread_pool.shutdown
132+
@inventory.stop
133+
end
134+
end
135+
end
118136
end
119137

120138
self
121139
end
122140

141+
def nack_unprocessed_messages!
142+
synchronize do
143+
ack_ids = @inventory.ack_ids
144+
unless ack_ids.empty?
145+
begin
146+
subscriber.service.modify_ack_deadline subscriber.subscription_name, ack_ids, 0
147+
rescue StandardError => e
148+
subscriber.service.logger.log :error, "subscriber-streams" do
149+
"Failed to nack unprocessed messages: #{e.message}"
150+
end
151+
end
152+
@inventory.remove *ack_ids
153+
end
154+
end
155+
end
156+
123157
def stopped?
124-
synchronize { @stopped }
158+
synchronize { @streaming_stopped }
159+
end
160+
161+
def fully_stopped?
162+
synchronize { @fully_stopped }
125163
end
126164

127165
def paused?
@@ -133,6 +171,7 @@ def running?
133171
end
134172

135173
def wait! timeout = nil
174+
@shutdown_thread&.join timeout
136175
# Wait for all queued callbacks to be processed.
137176
@callback_thread_pool.wait_for_termination timeout
138177

@@ -222,15 +261,15 @@ class RestartStream < StandardError; end
222261
def background_run
223262
synchronize do
224263
# Don't allow a stream to restart if already stopped
225-
if @stopped
264+
if @streaming_stopped
226265
subscriber.service.logger.log :debug, "subscriber-streams" do
227266
"not filling stream for subscription #{@subscriber.subscription_name} because stream is already" \
228267
" stopped"
229268
end
230269
return
231270
end
232271

233-
@stopped = false
272+
@streaming_stopped = false
234273
@paused = false
235274

236275
# signal to the previous queue to shut down
@@ -252,14 +291,14 @@ def background_run
252291

253292
loop do
254293
synchronize do
255-
if @paused && !@stopped
294+
if @paused && !@streaming_stopped
256295
@pause_cond.wait
257296
next
258297
end
259298
end
260299

261300
# Break loop, close thread if stopped
262-
break if synchronize { @stopped }
301+
break if synchronize { @streaming_stopped }
263302

264303
begin
265304
# Cannot synchronize the enumerator, causes deadlock
@@ -297,7 +336,7 @@ def background_run
297336

298337
# Has the loop broken but we aren't stopped?
299338
# Could be GRPC has thrown an internal error, so restart.
300-
raise RestartStream unless synchronize { @stopped }
339+
raise RestartStream unless synchronize { @streaming_stopped }
301340

302341
# We must be stopped, tell the stream to quit.
303342
stop
@@ -372,7 +411,7 @@ def perform_callback_sync rec_msg
372411
subscriber.service.logger.log :info, "callback-delivery" do
373412
"message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) delivery to user callbacks"
374413
end
375-
@subscriber.callback.call rec_msg unless stopped?
414+
@subscriber.callback.call rec_msg unless fully_stopped?
376415
rescue StandardError => e
377416
subscriber.service.logger.log :info, "callback-exceptions" do
378417
"message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) caused a user callback exception: " \

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/stream_test.rb

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,4 +217,67 @@ def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds:
217217
listener.stop
218218
listener.wait!
219219
end
220+
221+
it "should nack unprocessed messages when stopped with nack_immediately" do
222+
pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
223+
response_groups = [[pull_res1]]
224+
225+
stub = StreamingPullStub.new response_groups
226+
nacked = false
227+
228+
subscriber.service.mocked_subscription_admin = stub
229+
def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds:
230+
if ack_deadline_seconds == 0
231+
nacked = true
232+
end
233+
nil
234+
end
235+
236+
listener = subscriber.listen streams: 1 do |msg|
237+
sleep 0.5
238+
end
239+
240+
listener.start
241+
242+
# Wait for message to be pulled and added to inventory
243+
sleep 0.1
244+
245+
listener.stop shutdown_setting: :nack_immediately
246+
listener.wait!
247+
248+
assert nacked
249+
end
250+
251+
it "should wait for processing when stopped with wait_for_processing" do
252+
pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
253+
response_groups = [[pull_res1]]
254+
255+
stub = StreamingPullStub.new response_groups
256+
called = false
257+
nacked = false
258+
259+
subscriber.service.mocked_subscription_admin = stub
260+
def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds:
261+
if ack_deadline_seconds == 0
262+
nacked = true
263+
end
264+
nil
265+
end
266+
267+
listener = subscriber.listen streams: 1 do |msg|
268+
sleep 0.5
269+
called = true
270+
end
271+
272+
listener.start
273+
274+
# Wait for message to be pulled and added to inventory
275+
sleep 0.1
276+
277+
listener.stop shutdown_setting: :wait_for_processing
278+
listener.wait!
279+
280+
assert called
281+
refute nacked
282+
end
220283
end

0 commit comments

Comments
 (0)