Skip to content

Commit e53a9bd

Browse files
feat(pubsub): implement streaming keep-alive logic
1 parent eeed499 commit e53a9bd

7 files changed

Lines changed: 311 additions & 16 deletions

File tree

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb

Lines changed: 76 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class Stream
4848
##
4949
# @private exactly_once_delivery_enabled.
5050
attr_reader :exactly_once_delivery_enabled
51+
attr_accessor :keepalive_interval, :pong_deadline
5152

5253
##
5354
# @private Create an empty Subscriber::Stream object.
@@ -68,24 +69,58 @@ def initialize subscriber
6869

6970
@callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads
7071

72+
@keepalive_interval = Float(ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] || 30)
73+
@pong_deadline = Float(ENV["PUBSUB_TEST_PONG_DEADLINE"] || 15)
74+
@last_ping_at = nil
75+
@last_pong_at = nil
76+
@stream_opened = false
77+
@reconnect_delay = nil
78+
7179
@stream_keepalive_task = Concurrent::TimerTask.new(
72-
execution_interval: 30
80+
execution_interval: @keepalive_interval
7381
) do
74-
# push empty request every 30 seconds to keep stream alive
75-
unless inventory.empty?
76-
subscriber.service.logger.log :info, "subscriber-streams" do
77-
"sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
82+
synchronize do
83+
# @request_queue feeds client requests (initial pull request and keep-alive pings) into gRPC.
84+
# Note: ACKs are sent via unary RPCs (TimedUnaryBuffer), not over this stream.
85+
# Check that @request_queue is initialized (not nil) before pushing unconditional keep-alive pings.
86+
if @stream_opened && !@stopped && @request_queue
87+
subscriber.service.logger.log :info, "subscriber-streams" do
88+
"sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
89+
end
90+
@last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @last_pong_at >= @last_ping_at
91+
push Google::Cloud::PubSub::V1::StreamingPullRequest.new
7892
end
79-
push Google::Cloud::PubSub::V1::StreamingPullRequest.new
8093
end
81-
end.execute
94+
end
95+
96+
@pong_monitor_task = Concurrent::TimerTask.new(
97+
execution_interval: [@keepalive_interval / 5.0, 0.01].max
98+
) do
99+
synchronize do
100+
# Do not check pong deadline if @paused (client flow control inventory full).
101+
# When @paused, background_run waits on condition variable and stops calling enum.next,
102+
# so incoming server pongs sit buffered in gRPC and @last_pong_at stays un-updated.
103+
if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused
104+
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
105+
if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at
106+
subscriber.service.logger.log :error, "subscriber-streams" do
107+
"Keep-alive pong not received within #{@pong_deadline}s; restarting stream."
108+
end
109+
@stream_opened = false
110+
@background_thread&.raise RestartStream
111+
end
112+
end
113+
end
114+
end
82115
end
83116

84117
def start
85118
synchronize do
86119
break if @background_thread
87120

88121
@inventory.start
122+
@stream_keepalive_task.execute
123+
@pong_monitor_task.execute
89124

90125
start_streaming!
91126
end
@@ -108,6 +143,9 @@ def stop
108143
@stopped = true
109144
@pause_cond.broadcast
110145

146+
@stream_keepalive_task.shutdown
147+
@pong_monitor_task.shutdown
148+
111149
# Now that the reception thread is stopped, immediately stop the
112150
# callback thread pool. All queued callbacks will see the stream
113151
# is stopped and perform a noop.
@@ -219,6 +257,13 @@ class RestartStream < StandardError; end
219257

220258
# rubocop:disable all
221259

260+
def backoff_and_wait!
261+
@reconnect_delay = @reconnect_delay ? [@reconnect_delay * 1.5, 60.0].min : 1.0
262+
synchronize do
263+
@pause_cond.wait(@reconnect_delay + rand(0.0..0.5)) unless @stopped
264+
end
265+
end
266+
222267
def background_run
223268
synchronize do
224269
# Don't allow a stream to restart if already stopped
@@ -245,11 +290,21 @@ def background_run
245290

246291
# Call the StreamingPull API to get the response enumerator
247292
options = { :"metadata" => { :"x-goog-request-params" => @subscriber.subscription_name } }
293+
synchronize do
294+
@stream_opened = false
295+
end
248296
enum = @subscriber.service.streaming_pull @request_queue.each, options
249297
subscriber.service.logger.log :info, "subscriber-streams" do
250298
"rpc: streamingPull, subscription: #{@subscriber.subscription_name}, stream opened"
251299
end
252300

301+
synchronize do
302+
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
303+
@last_ping_at = now
304+
@last_pong_at = now
305+
@stream_opened = true
306+
end
307+
253308
loop do
254309
synchronize do
255310
if @paused && !@stopped
@@ -264,8 +319,17 @@ def background_run
264319
begin
265320
# Cannot synchronize the enumerator, causes deadlock
266321
response = enum.next
267-
new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled
322+
synchronize do
323+
@last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
324+
# Reset backoff delay only after successfully reading a frame from enum.next.
325+
# If the connection drops immediately upon reading, @reconnect_delay is preserved.
326+
@reconnect_delay = nil
327+
end
268328
received_messages = response.received_messages
329+
# Skip processing properties and inventory on Pong frames (empty received_messages).
330+
# Subscription properties on keep-alive Pongs are not valid.
331+
next if received_messages.empty?
332+
new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled
269333

270334
# Use synchronize so changes happen atomically
271335
synchronize do
@@ -310,18 +374,21 @@ def background_run
310374
"#{status_code}; will be retried."
311375
end
312376
# Restart the stream with an incremental back for a retriable error.
377+
backoff_and_wait!
313378
retry
314379
rescue RestartStream
315380
subscriber.service.logger.log :info, "subscriber-streams" do
316381
"Subscriber stream for subscription #{@subscriber.subscription_name} has ended; will be retried."
317382
end
383+
backoff_and_wait!
318384
retry
319385
rescue StandardError => e
320386
subscriber.service.logger.log :error, "subscriber-streams" do
321387
"error on stream for subscription #{@subscriber.subscription_name}: #{e.inspect}"
322388
end
323389
@subscriber.error! e
324390

391+
backoff_and_wait!
325392
retry
326393
end
327394

@@ -443,6 +510,7 @@ def initial_input_request
443510
req.client_id = @subscriber.service.client_id
444511
req.max_outstanding_messages = @inventory.limit
445512
req.max_outstanding_bytes = @inventory.bytesize
513+
req.protocol_version = 1
446514
end
447515
end
448516

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/acknowledge_test.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@
7171
subscription: sub_path,
7272
stream_ack_deadline_seconds: 60,
7373
max_outstanding_messages: 1000,
74-
max_outstanding_bytes: 100 * 1000 * 1000
74+
max_outstanding_bytes: 100 * 1000 * 1000,
75+
protocol_version: 1
7576
)]
7677
]
7778

@@ -132,7 +133,8 @@
132133
subscription: sub_path,
133134
stream_ack_deadline_seconds: 60,
134135
max_outstanding_messages: 1000,
135-
max_outstanding_bytes: 100 * 1000 * 1000
136+
max_outstanding_bytes: 100 * 1000 * 1000,
137+
protocol_version: 1
136138
)]
137139
]
138140

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
require "helper"
16+
17+
describe Google::Cloud::PubSub::MessageListener, :bug_regression, :mock_pubsub do
18+
let(:topic_name) { "topic-name-goes-here" }
19+
let(:sub_name) { "subscription-name-goes-here" }
20+
let(:sub_hash) { subscription_hash topic_name, sub_name }
21+
let(:sub_grpc) { Google::Cloud::PubSub::V1::Subscription.new(sub_hash) }
22+
let(:subscriber) { Google::Cloud::PubSub::Subscriber.from_grpc sub_grpc, pubsub.service }
23+
24+
it "b/528401453: waits for exponential backoff before retrying on GRPC::Unavailable" do
25+
attempts = []
26+
pull_res = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: []
27+
response_groups = [[GRPC::Unavailable.new("simulated disconnect")], [pull_res]]
28+
stub = StreamingPullStub.new response_groups
29+
def stub.streaming_pull_internal request, options = nil
30+
@attempts ||= []
31+
@attempts << Process.clock_gettime(Process::CLOCK_MONOTONIC)
32+
super
33+
end
34+
stub.instance_variable_set(:@attempts, attempts)
35+
subscriber.service.mocked_subscription_admin = stub
36+
37+
listener = subscriber.listen streams: 1 do |msg|
38+
end
39+
listener.start
40+
41+
retries = 0
42+
until attempts.count >= 2
43+
fail "stream did not retry" if retries > 200
44+
retries += 1
45+
sleep 0.05
46+
end
47+
48+
listener.stop
49+
listener.wait!
50+
51+
elapsed = attempts[1] - attempts[0]
52+
puts "\n[b/528401453 Test] Elapsed delay between attempts: #{elapsed.round(3)}s"
53+
_(elapsed).must_be :>=, 1.0
54+
end
55+
56+
it "b/528404815: shuts down keepalive TimerTask when stream is stopped" do
57+
pull_res = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: []
58+
stub = StreamingPullStub.new [[pull_res]]
59+
subscriber.service.mocked_subscription_admin = stub
60+
61+
listener = subscriber.listen streams: 1 do |msg|
62+
end
63+
listener.start
64+
sleep 0.1
65+
listener.stop
66+
listener.wait!
67+
68+
stream = listener.instance_variable_get(:@stream_pool).first
69+
keepalive_task = stream.instance_variable_get(:@stream_keepalive_task)
70+
puts "\n[b/528404815 Test] Keepalive task running state after stop: #{keepalive_task.running?}"
71+
_(keepalive_task.running?).must_equal false
72+
end
73+
end

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@
7676
subscription: sub_path,
7777
stream_ack_deadline_seconds: 60,
7878
max_outstanding_messages: 1000,
79-
max_outstanding_bytes: 100 * 1000 * 1000
79+
max_outstanding_bytes: 100 * 1000 * 1000,
80+
protocol_version: 1
8081
)]
8182
]
8283

@@ -141,7 +142,8 @@
141142
subscription: sub_path,
142143
stream_ack_deadline_seconds: 60,
143144
max_outstanding_messages: 1000,
144-
max_outstanding_bytes: 100 * 1000 * 1000
145+
max_outstanding_bytes: 100 * 1000 * 1000,
146+
protocol_version: 1
145147
)]
146148
]
147149

0 commit comments

Comments
 (0)