Skip to content

Commit 26bcbb1

Browse files
fix(pubsub): remediate keep-alive unpause race and timer testability
- Wrap unpause_streaming! in synchronize block and reset @last_pong_at to prevent false-positive stream restarts upon unblocking flow control. - Extract send_keepalive_ping! and check_liveness! helper methods on Stream and re-create TimerTask instances on start/stop to support clean restarts and deterministic unit testing. - Overhaul keepalive_test.rb with synchronous unit test coverage for timer helpers, unpause race condition, and TimerTask re-creation, while increasing CI polling timeouts to prevent flakiness.
1 parent 6834cee commit 26bcbb1

3 files changed

Lines changed: 142 additions & 90 deletions

File tree

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb

Lines changed: 49 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -76,50 +76,23 @@ def initialize subscriber
7676
@stream_opened = false
7777
@reconnect_delay = nil
7878

79-
@stream_keepalive_task = Concurrent::TimerTask.new(
80-
execution_interval: @keepalive_interval
81-
) do
82-
synchronize do
83-
# @request_queue feeds client requests (initial pull request and keep-alive pings) into gRPC.
84-
# Note: ACKs are sent via unary RPCs (TimedUnaryBuffer), not over this stream.
85-
# Check that @request_queue is initialized (not nil) before pushing unconditional keep-alive pings.
86-
if @stream_opened && !@stopped && @request_queue
87-
subscriber.service.logger.log :info, "subscriber-streams" do
88-
"sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
89-
end
90-
@last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @last_pong_at >= @last_ping_at
91-
push Google::Cloud::PubSub::V1::StreamingPullRequest.new
92-
end
93-
end
94-
end
95-
96-
@pong_monitor_task = Concurrent::TimerTask.new(
97-
execution_interval: [@keepalive_interval / 5.0, 0.01].max
98-
) do
99-
synchronize do
100-
# Do not check pong deadline if @paused (client flow control inventory full).
101-
# When @paused, background_run waits on condition variable and stops calling enum.next,
102-
# so incoming server pongs sit buffered in gRPC and @last_pong_at stays un-updated.
103-
if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused
104-
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
105-
if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at
106-
subscriber.service.logger.log :error, "subscriber-streams" do
107-
"Keep-alive pong not received within #{@pong_deadline}s; restarting stream."
108-
end
109-
@stream_opened = false
110-
@background_thread&.raise RestartStream
111-
end
112-
end
113-
end
114-
end
79+
@stream_keepalive_task = nil
80+
@pong_monitor_task = nil
11581
end
11682

11783
def start
11884
synchronize do
11985
break if @background_thread
12086

12187
@inventory.start
88+
@stream_keepalive_task = Concurrent::TimerTask.new(
89+
execution_interval: @keepalive_interval
90+
) { send_keepalive_ping! }
12291
@stream_keepalive_task.execute
92+
93+
@pong_monitor_task = Concurrent::TimerTask.new(
94+
execution_interval: [@keepalive_interval / 5.0, 0.01].max
95+
) { check_liveness! }
12396
@pong_monitor_task.execute
12497

12598
start_streaming!
@@ -143,8 +116,10 @@ def stop
143116
@stopped = true
144117
@pause_cond.broadcast
145118

146-
@stream_keepalive_task.shutdown
147-
@pong_monitor_task.shutdown
119+
@stream_keepalive_task&.shutdown
120+
@stream_keepalive_task = nil
121+
@pong_monitor_task&.shutdown
122+
@pong_monitor_task = nil
148123

149124
# Now that the reception thread is stopped, immediately stop the
150125
# callback thread pool. All queued callbacks will see the stream
@@ -483,15 +458,45 @@ def pause_streaming?
483458
@inventory.full?
484459
end
485460

461+
def send_keepalive_ping!
462+
synchronize do
463+
if @stream_opened && !@stopped && @request_queue
464+
subscriber.service.logger.log :info, "subscriber-streams" do
465+
"sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
466+
end
467+
@last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @last_pong_at >= @last_ping_at
468+
push Google::Cloud::PubSub::V1::StreamingPullRequest.new
469+
end
470+
end
471+
end
472+
473+
def check_liveness!
474+
synchronize do
475+
if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused
476+
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
477+
if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at
478+
subscriber.service.logger.log :error, "subscriber-streams" do
479+
"Keep-alive pong not received within #{@pong_deadline}s; restarting stream."
480+
end
481+
@stream_opened = false
482+
@background_thread&.raise RestartStream
483+
end
484+
end
485+
end
486+
end
487+
486488
def unpause_streaming!
487-
return unless unpause_streaming?
489+
synchronize do
490+
return unless unpause_streaming?
488491

489-
@paused = nil
490-
subscriber.service.logger.log :info, "subscriber-flow-control" do
491-
"subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control"
492+
@paused = nil
493+
@last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
494+
subscriber.service.logger.log :info, "subscriber-flow-control" do
495+
"subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control"
496+
end
497+
# signal to the background thread that we are unpaused
498+
@pause_cond.broadcast
492499
end
493-
# signal to the background thread that we are unpaused
494-
@pause_cond.broadcast
495500
end
496501

497502
def unpause_streaming?

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/bug_regression_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def stub.streaming_pull_internal request, options = nil
6767

6868
stream = listener.instance_variable_get(:@stream_pool).first
6969
keepalive_task = stream.instance_variable_get(:@stream_keepalive_task)
70-
puts "\n[b/528404815 Test] Keepalive task running state after stop: #{keepalive_task.running?}"
71-
_(keepalive_task.running?).must_equal false
70+
puts "\n[b/528404815 Test] Keepalive task running state after stop: #{keepalive_task&.running? || false}"
71+
_(keepalive_task.nil? || !keepalive_task.running?).must_equal true
7272
end
7373
end

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb

Lines changed: 91 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -58,29 +58,40 @@
5858
_(initial_req.protocol_version).must_equal 1
5959
end
6060

61-
it "sends keep-alive pings periodically even when inventory is empty" do
62-
q = StreamingPullStub::RaisableEnumeratorQueue.new
63-
stub = StreamingPullStub.new [[]]
64-
def stub.streaming_pull_internal req, opt = nil
65-
@requests << req
66-
@my_q.each
67-
end
68-
stub.instance_variable_set(:@my_q, q)
61+
it "restarts stream when keep-alive pong deadline is exceeded" do
62+
pull_res2 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
63+
stub = StreamingPullStub.new [[], [pull_res2]]
6964
subscriber.service.mocked_subscription_admin = stub
7065

66+
called = false
7167
listener = subscriber.listen streams: 1 do |msg|
68+
called = true
7269
end
7370
listener.start
7471

75-
pong_thread = Thread.new do
76-
10.times do
77-
sleep 0.02
78-
q.push Google::Cloud::PubSub::V1::StreamingPullResponse.new(received_messages: [])
79-
end
72+
listener_retries = 0
73+
until called
74+
fail "stream did not restart and deliver message" if listener_retries > 500
75+
listener_retries += 1
76+
sleep 0.01
8077
end
8178

82-
sleep 0.18
83-
pong_thread.join
79+
listener.stop
80+
listener.wait!
81+
82+
_(stub.requests.count).must_equal 2
83+
end
84+
85+
it "sends keep-alive ping synchronously when stream is open and queue exists" do
86+
stub = StreamingPullStub.new [[]]
87+
subscriber.service.mocked_subscription_admin = stub
88+
89+
listener = subscriber.listen streams: 1 do |msg|
90+
end
91+
listener.start
92+
93+
stream = listener.instance_variable_get(:@stream_pool).first
94+
stream.send(:send_keepalive_ping!)
8495

8596
listener.stop
8697
listener.wait!
@@ -89,58 +100,94 @@ def stub.streaming_pull_internal req, opt = nil
89100
_(reqs.count).must_be :>=, 2
90101
end
91102

92-
it "restarts stream when keep-alive pong deadline is exceeded" do
93-
pull_res2 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
94-
stub = StreamingPullStub.new [[], [pull_res2]]
103+
it "does not restart stream when check_liveness! runs under active pongs" do
104+
stub = StreamingPullStub.new [[]]
95105
subscriber.service.mocked_subscription_admin = stub
96106

97-
called = false
98107
listener = subscriber.listen streams: 1 do |msg|
99-
called = true
100108
end
101109
listener.start
102110

103-
listener_retries = 0
104-
until called
105-
fail "stream did not restart and deliver message" if listener_retries > 200
106-
listener_retries += 1
107-
sleep 0.01
108-
end
111+
stream = listener.instance_variable_get(:@stream_pool).first
112+
stream.instance_variable_set :@last_ping_at, Process.clock_gettime(Process::CLOCK_MONOTONIC)
113+
stream.instance_variable_set :@last_pong_at, Process.clock_gettime(Process::CLOCK_MONOTONIC)
114+
115+
stream.send(:check_liveness!)
116+
_(stream.instance_variable_get(:@stream_opened)).must_equal true
109117

110118
listener.stop
111119
listener.wait!
112-
113-
_(stub.requests.count).must_equal 2
114120
end
115121

116-
it "does not restart stream when actively receiving keep-alive pongs" do
117-
q = StreamingPullStub::RaisableEnumeratorQueue.new
122+
it "does not trigger false restarts on unpausing even if pause exceeded deadline" do
118123
stub = StreamingPullStub.new [[]]
119-
def stub.streaming_pull_internal req, opt = nil
120-
@requests << req
121-
@my_q.each
122-
end
123-
stub.instance_variable_set(:@my_q, q)
124124
subscriber.service.mocked_subscription_admin = stub
125125

126126
listener = subscriber.listen streams: 1 do |msg|
127127
end
128128
listener.start
129129

130-
pong_sender = Thread.new do
131-
8.times do
132-
sleep 0.02
133-
empty_pong = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: []
134-
q.push empty_pong
135-
end
130+
stream = listener.instance_variable_get(:@stream_pool).first
131+
stream.instance_variable_set :@stream_opened, true
132+
stream.instance_variable_set :@last_ping_at, Process.clock_gettime(Process::CLOCK_MONOTONIC) - 30.0
133+
stream.instance_variable_set :@last_pong_at, Process.clock_gettime(Process::CLOCK_MONOTONIC) - 35.0
134+
stream.instance_variable_set :@paused, true
135+
stream.instance_variable_set :@stopped, false
136+
137+
# 1. When paused, liveness checker should skip verification.
138+
stream.send(:check_liveness!)
139+
_(stream.instance_variable_get(:@stream_opened)).must_equal true
140+
141+
# 2. Unpausing must reset @last_pong_at to prevent immediate restart.
142+
stream.send(:unpause_streaming!)
143+
144+
# 3. Direct liveness check immediately after unpausing (simulates the monitor thread racing the background thread).
145+
# It should NOT close the stream because our reset made @last_pong_at newer than @last_ping_at.
146+
stream.send(:check_liveness!)
147+
_(stream.instance_variable_get(:@stream_opened)).must_equal true
148+
149+
listener.stop
150+
listener.wait!
151+
end
152+
153+
it "re-creates and executes timer tasks if stopped and restarted" do
154+
stub = StreamingPullStub.new [[]]
155+
subscriber.service.mocked_subscription_admin = stub
156+
157+
listener = subscriber.listen streams: 1 do |msg|
136158
end
159+
listener.start
160+
161+
stream = listener.instance_variable_get(:@stream_pool).first
162+
163+
# 1. Timers should be active on a started stream
164+
refute_nil stream.instance_variable_get(:@stream_keepalive_task)
165+
refute_nil stream.instance_variable_get(:@pong_monitor_task)
137166

138-
sleep 0.15
139-
pong_sender.join
167+
first_keepalive = stream.instance_variable_get(:@stream_keepalive_task)
168+
first_monitor = stream.instance_variable_get(:@pong_monitor_task)
140169

170+
# 2. Stopping the stream must shutdown and nilify the timers
141171
listener.stop
142172
listener.wait!
173+
assert_nil stream.instance_variable_get(:@stream_keepalive_task)
174+
assert_nil stream.instance_variable_get(:@pong_monitor_task)
175+
assert first_keepalive.shutdown?
176+
assert first_monitor.shutdown?
177+
178+
# 3. Simulating a restart should create completely new timer instances
179+
stream.instance_variable_set :@background_thread, nil
180+
stream.instance_variable_set :@stopped, false
181+
stream.start
182+
183+
new_keepalive = stream.instance_variable_get(:@stream_keepalive_task)
184+
new_monitor = stream.instance_variable_get(:@pong_monitor_task)
185+
186+
refute_nil new_keepalive
187+
refute_nil new_monitor
188+
refute_equal first_keepalive, new_keepalive
189+
refute_equal first_monitor, new_monitor
143190

144-
_(stub.requests.count).must_equal 1
191+
stream.stop
145192
end
146193
end

0 commit comments

Comments
 (0)