Skip to content

Commit 1916895

Browse files
feat(pubsub): Support Subscriber Shutdown Options
feat(pubsub): Update minimum Ruby to v3.2 and support v4.0 This introduces structured background settings to standardize how active subscribers wrap up operations upon calling `stop`. Updates `subscription.listen` to expose two new arguments: * `shutdown_behavior`: Supports `:wait_for_processing` (finishes existing messages before quitting) or `:nack_immediately` (releases outstanding leases at once). * `shutdown_timeout`: An optional threshold limit guiding the maximum timeframe for cleanup activities.
1 parent b991a55 commit 1916895

8 files changed

Lines changed: 157 additions & 28 deletions

File tree

google-cloud-pubsub/Gemfile

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ end
2222
gem "autotest-suffix", "~> 1.1"
2323
gem "avro", "~> 1.12"
2424
gem "bigdecimal", ">= 3.2", "< 5"
25-
gem "google-style", "~> 1.31.1"
26-
gem "minitest", "~> 5.25"
25+
gem "google-style", "~> 1.32.0"
26+
gem "irb", "~> 1.17"
27+
gem "minitest", "~> 5.20"
2728
gem "minitest-autotest", "~> 1.1"
2829
gem "minitest-focus", "~> 1.4"
30+
gem "minitest-mock", "~> 5.27"
2931
gem "minitest-reporters", "~> 1.7.0", require: false
3032
gem "minitest-rg", "~> 5.3"
3133
gem "ostruct", "~> 0.6"

google-cloud-pubsub/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ If the logger's `progname` is not set to `"pubsub"`, these debug logs will be su
131131

132132
## Supported Ruby Versions
133133

134-
This library is supported on Ruby 3.1+.
134+
This library is supported on Ruby 3.2+.
135135

136136
Google provides official support for Ruby versions that are actively supported
137137
by Ruby Core—that is, Ruby versions that are either in normal maintenance or in

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ class MessageListener
7272
attr_reader :callback_threads
7373
attr_reader :push_threads
7474

75+
attr_reader :shutdown_behavior
76+
attr_reader :shutdown_timeout
77+
7578
##
7679
# @private Implementation attributes.
7780
attr_reader :stream_pool, :thread_pool, :buffer, :service
@@ -83,7 +86,7 @@ class MessageListener
8386
##
8487
# @private Create an empty {MessageListener} object.
8588
def initialize subscription_name, callback, deadline: nil, message_ordering: nil, streams: nil, inventory: nil,
86-
threads: {}, service: nil
89+
threads: {}, shutdown_behavior: :wait_for_processing, shutdown_timeout: nil, service: nil
8790
super() # to init MonitorMixin
8891

8992
@callback = callback
@@ -95,6 +98,8 @@ def initialize subscription_name, callback, deadline: nil, message_ordering: nil
9598
@message_ordering = message_ordering
9699
@callback_threads = Integer(threads[:callback] || 8)
97100
@push_threads = Integer(threads[:push] || 4)
101+
@shutdown_behavior = shutdown_behavior || :wait_for_processing
102+
@shutdown_timeout = shutdown_timeout
98103
@exactly_once_delivery_enabled = nil
99104

100105
@service = service
@@ -140,11 +145,18 @@ def start
140145
#
141146
# @return [MessageListener] returns self so calls can be chained.
142147
#
143-
def stop
148+
def stop shutdown_behavior: nil, shutdown_timeout: nil
149+
shutdown_behavior ||= @shutdown_behavior
150+
shutdown_timeout ||= @shutdown_timeout
151+
152+
unless [:wait_for_processing, :nack_immediately].include? shutdown_behavior
153+
raise ArgumentError, "Invalid shutdown_behavior: #{shutdown_behavior}"
154+
end
155+
144156
synchronize do
145157
@started = false
146158
@stopped = true
147-
@stream_pool.map(&:stop)
159+
@stream_pool.each { |s| s.stop shutdown_behavior: shutdown_behavior, shutdown_timeout: shutdown_timeout }
148160
wait_stop_buffer_thread!
149161
self
150162
end
@@ -182,8 +194,10 @@ def wait! timeout = nil
182194
#
183195
# @return [MessageListener] returns self so calls can be chained.
184196
#
185-
def stop! timeout = nil
186-
stop
197+
def stop! timeout = nil, shutdown_behavior: nil, shutdown_timeout: nil
198+
shutdown_behavior ||= @shutdown_behavior
199+
shutdown_timeout ||= timeout || @shutdown_timeout
200+
stop shutdown_behavior: shutdown_behavior, shutdown_timeout: shutdown_timeout
187201
wait! timeout
188202
end
189203

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,21 @@ def empty?
109109
end
110110
end
111111

112+
def wait_until_empty timeout = nil
113+
synchronize do
114+
if timeout
115+
target_time = Time.now + timeout
116+
while !@inventory.empty?
117+
remaining = target_time - Time.now
118+
break if remaining <= 0
119+
@wait_cond.wait remaining
120+
end
121+
else
122+
@wait_cond.wait_while { !@inventory.empty? }
123+
end
124+
end
125+
end
126+
112127
def start
113128
@background_thread ||= Thread.new { background_run }
114129

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb

Lines changed: 57 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ def initialize subscriber
5757
@subscriber = subscriber
5858

5959
@request_queue = nil
60-
@stopped = nil
60+
@streaming_stopped = nil
61+
@fully_stopped = nil
6162
@paused = nil
6263
@pause_cond = new_cond
6364
@exactly_once_delivery_enabled = false
@@ -93,9 +94,9 @@ def start
9394
self
9495
end
9596

96-
def stop
97+
def stop shutdown_behavior: :wait_for_processing, shutdown_timeout: nil
9798
synchronize do
98-
break if @stopped
99+
break if @streaming_stopped
99100

100101
subscriber.service.logger.log :info, "subscriber-streams" do
101102
"stopping stream for subscription #{@subscriber.subscription_name}"
@@ -105,23 +106,60 @@ def stop
105106
@request_queue&.push self
106107

107108
# Signal to the background thread that we are stopped.
108-
@stopped = true
109+
@streaming_stopped = true
109110
@pause_cond.broadcast
110111

111-
# Now that the reception thread is stopped, immediately stop the
112-
# callback thread pool. All queued callbacks will see the stream
113-
# is stopped and perform a noop.
114-
@callback_thread_pool.shutdown
115-
116-
# Once all the callbacks are stopped, we can stop the inventory.
117-
@inventory.stop
112+
if shutdown_behavior == :nack_immediately
113+
nack_unprocessed_messages!
114+
@fully_stopped = true
115+
@callback_thread_pool.shutdown
116+
@inventory.stop
117+
else
118+
# :wait_for_processing
119+
@shutdown_thread = Thread.new do
120+
if shutdown_timeout
121+
wait_time = [shutdown_timeout - 30, 0].max
122+
@inventory.wait_until_empty wait_time
123+
if !@inventory.empty?
124+
nack_unprocessed_messages!
125+
end
126+
else
127+
@inventory.wait_until_empty nil
128+
end
129+
synchronize do
130+
@fully_stopped = true
131+
@callback_thread_pool.shutdown
132+
@inventory.stop
133+
end
134+
end
135+
end
118136
end
119137

120138
self
121139
end
122140

141+
def nack_unprocessed_messages!
142+
synchronize do
143+
ack_ids = @inventory.ack_ids
144+
unless ack_ids.empty?
145+
begin
146+
subscriber.service.modify_ack_deadline subscriber.subscription_name, ack_ids, 0
147+
rescue StandardError => e
148+
subscriber.service.logger.log :error, "subscriber-streams" do
149+
"Failed to nack unprocessed messages: #{e.message}"
150+
end
151+
end
152+
@inventory.remove(*ack_ids)
153+
end
154+
end
155+
end
156+
123157
def stopped?
124-
synchronize { @stopped }
158+
synchronize { @streaming_stopped }
159+
end
160+
161+
def fully_stopped?
162+
synchronize { @fully_stopped }
125163
end
126164

127165
def paused?
@@ -133,6 +171,7 @@ def running?
133171
end
134172

135173
def wait! timeout = nil
174+
@shutdown_thread&.join timeout
136175
# Wait for all queued callbacks to be processed.
137176
@callback_thread_pool.wait_for_termination timeout
138177

@@ -222,15 +261,15 @@ class RestartStream < StandardError; end
222261
def background_run
223262
synchronize do
224263
# Don't allow a stream to restart if already stopped
225-
if @stopped
264+
if @streaming_stopped
226265
subscriber.service.logger.log :debug, "subscriber-streams" do
227266
"not filling stream for subscription #{@subscriber.subscription_name} because stream is already" \
228267
" stopped"
229268
end
230269
return
231270
end
232271

233-
@stopped = false
272+
@streaming_stopped = false
234273
@paused = false
235274

236275
# signal to the previous queue to shut down
@@ -252,14 +291,14 @@ def background_run
252291

253292
loop do
254293
synchronize do
255-
if @paused && !@stopped
294+
if @paused && !@streaming_stopped
256295
@pause_cond.wait
257296
next
258297
end
259298
end
260299

261300
# Break loop, close thread if stopped
262-
break if synchronize { @stopped }
301+
break if synchronize { @streaming_stopped }
263302

264303
begin
265304
# Cannot synchronize the enumerator, causes deadlock
@@ -297,7 +336,7 @@ def background_run
297336

298337
# Has the loop broken but we aren't stopped?
299338
# Could be GRPC has thrown an internal error, so restart.
300-
raise RestartStream unless synchronize { @stopped }
339+
raise RestartStream unless synchronize { @streaming_stopped }
301340

302341
# We must be stopped, tell the stream to quit.
303342
stop
@@ -372,7 +411,7 @@ def perform_callback_sync rec_msg
372411
subscriber.service.logger.log :info, "callback-delivery" do
373412
"message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) delivery to user callbacks"
374413
end
375-
@subscriber.callback.call rec_msg unless stopped?
414+
@subscriber.callback.call rec_msg unless fully_stopped?
376415
rescue StandardError => e
377416
subscriber.service.logger.log :info, "callback-exceptions" do
378417
"message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) caused a user callback exception: " \

google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,10 @@ def wait_for_messages max: 100
325325
# messages ({ReceivedMessage#nack!},
326326
# {ReceivedMessage#modify_ack_deadline!}). Default is 4.
327327
#
328+
# @param [Symbol] shutdown_behavior Defines how active messages are treated during a stop. Use `:wait_for_processing`
329+
# to wait for tasks to finish, or `:nack_immediately` to skip waiting and drop them. Default is `:wait_for_processing`.
330+
# @param [Integer] shutdown_timeout Specifies precisely how long the wind-down state holds. Defaults to Nil. Optional.
331+
#
328332
# @yield [received_message] a block for processing new messages
329333
# @yieldparam [ReceivedMessage] received_message the newly received
330334
# message
@@ -408,13 +412,16 @@ def wait_for_messages max: 100
408412
# # Shut down the subscriber when ready to stop receiving messages.
409413
# listener.stop!
410414
#
411-
def listen deadline: nil, message_ordering: nil, streams: nil, inventory: nil, threads: {}, &block
415+
def listen deadline: nil, message_ordering: nil, streams: nil, inventory: nil, threads: {},
416+
shutdown_behavior: :wait_for_processing, shutdown_timeout: nil, &block
412417
ensure_service!
413418
deadline ||= self.deadline
414419
message_ordering = message_ordering? if message_ordering.nil?
415420

416421
MessageListener.new name, block, deadline: deadline, streams: streams, inventory: inventory,
417-
message_ordering: message_ordering, threads: threads, service: service
422+
message_ordering: message_ordering, threads: threads,
423+
shutdown_behavior: shutdown_behavior, shutdown_timeout: shutdown_timeout,
424+
service: service
418425
end
419426

420427
##

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/stream_test.rb

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,4 +217,55 @@ def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds:
217217
listener.stop
218218
listener.wait!
219219
end
220+
221+
it "should nack unprocessed messages when stopped with nack_immediately" do
222+
pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
223+
response_groups = [[pull_res1]]
224+
225+
stub = StreamingPullStub.new response_groups
226+
227+
subscriber.service.mocked_subscription_admin = stub
228+
229+
listener = subscriber.listen streams: 1 do |msg|
230+
sleep 0.5
231+
end
232+
233+
listener.start
234+
235+
# Wait for message to be pulled and added to inventory
236+
sleep 0.1
237+
238+
listener.stop shutdown_behavior: :nack_immediately
239+
listener.wait!
240+
241+
# Verifies that exactly one 0-second ModifyAckDeadline (NACK) was dispatched.
242+
assert_equal 1, stub.modify_ack_deadline_requests.count { |req| req[2] == 0 }
243+
end
244+
245+
it "should wait for processing when stopped with wait_for_processing" do
246+
pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
247+
response_groups = [[pull_res1]]
248+
249+
stub = StreamingPullStub.new response_groups
250+
called = false
251+
252+
subscriber.service.mocked_subscription_admin = stub
253+
254+
listener = subscriber.listen streams: 1 do |msg|
255+
sleep 0.5
256+
called = true
257+
end
258+
259+
listener.start
260+
261+
# Wait for message to be pulled and added to inventory
262+
sleep 0.1
263+
264+
listener.stop shutdown_behavior: :wait_for_processing
265+
listener.wait!
266+
267+
assert called
268+
# Confirms that NO 0-second ModifyAckDeadline (NACK) interventions were sent.
269+
assert_equal 0, stub.modify_ack_deadline_requests.count { |req| req[2] == 0 }
270+
end
220271
end

google-cloud-pubsub/test/helper.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
gem "minitest"
1818
require "minitest/autorun"
1919
require "minitest/focus"
20+
require "minitest/mock"
2021
require "minitest/rg"
2122
require "ostruct"
2223
require "json"

0 commit comments

Comments
 (0)