Skip to content

Commit ca925d3

Browse files
feat(pubsub): implement streaming keep-alive logic
1 parent eeed499 commit ca925d3

6 files changed

Lines changed: 238 additions & 16 deletions

File tree

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb

Lines changed: 76 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class Stream
4848
##
4949
# @private exactly_once_delivery_enabled.
5050
attr_reader :exactly_once_delivery_enabled
51+
attr_accessor :keepalive_interval, :pong_deadline
5152

5253
##
5354
# @private Create an empty Subscriber::Stream object.
@@ -68,24 +69,58 @@ def initialize subscriber
6869

6970
@callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads
7071

72+
@keepalive_interval = Float(ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] || 30)
73+
@pong_deadline = Float(ENV["PUBSUB_TEST_PONG_DEADLINE"] || 15)
74+
@last_ping_at = nil
75+
@last_pong_at = nil
76+
@stream_opened = false
77+
@reconnect_delay = nil
78+
7179
@stream_keepalive_task = Concurrent::TimerTask.new(
72-
execution_interval: 30
80+
execution_interval: @keepalive_interval
7381
) do
74-
# push empty request every 30 seconds to keep stream alive
75-
unless inventory.empty?
76-
subscriber.service.logger.log :info, "subscriber-streams" do
77-
"sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
82+
synchronize do
83+
# @request_queue feeds client requests (initial pull request and keep-alive pings) into gRPC.
84+
# Note: ACKs are sent via unary RPCs (TimedUnaryBuffer), not over this stream.
85+
# Check that @request_queue is initialized (not nil) before pushing unconditional keep-alive pings.
86+
if @stream_opened && !@stopped && @request_queue
87+
subscriber.service.logger.log :info, "subscriber-streams" do
88+
"sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
89+
end
90+
@last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @last_pong_at >= @last_ping_at
91+
push Google::Cloud::PubSub::V1::StreamingPullRequest.new
7892
end
79-
push Google::Cloud::PubSub::V1::StreamingPullRequest.new
8093
end
81-
end.execute
94+
end
95+
96+
@pong_monitor_task = Concurrent::TimerTask.new(
97+
execution_interval: [@keepalive_interval / 5.0, 0.01].max
98+
) do
99+
synchronize do
100+
# Do not check pong deadline if @paused (client flow control inventory full).
101+
# When @paused, background_run waits on condition variable and stops calling enum.next,
102+
# so incoming server pongs sit buffered in gRPC and @last_pong_at stays un-updated.
103+
if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused
104+
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
105+
if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at
106+
subscriber.service.logger.log :error, "subscriber-streams" do
107+
"Keep-alive pong not received within #{@pong_deadline}s; restarting stream."
108+
end
109+
@stream_opened = false
110+
@background_thread&.raise RestartStream
111+
end
112+
end
113+
end
114+
end
82115
end
83116

84117
def start
85118
synchronize do
86119
break if @background_thread
87120

88121
@inventory.start
122+
@stream_keepalive_task.execute
123+
@pong_monitor_task.execute
89124

90125
start_streaming!
91126
end
@@ -108,6 +143,9 @@ def stop
108143
@stopped = true
109144
@pause_cond.broadcast
110145

146+
@stream_keepalive_task.shutdown
147+
@pong_monitor_task.shutdown
148+
111149
# Now that the reception thread is stopped, immediately stop the
112150
# callback thread pool. All queued callbacks will see the stream
113151
# is stopped and perform a noop.
@@ -219,6 +257,13 @@ class RestartStream < StandardError; end
219257

220258
# rubocop:disable all
221259

260+
##
# @private Wait before the stream is reopened, growing the delay on every
# consecutive call.
#
# The base delay starts at 1 second, is multiplied by 1.5 on each call and is
# capped at 60 seconds; up to 0.5 seconds of random jitter is added to every
# wait. This method only advances @reconnect_delay; it never resets it.
#
# The wait happens on @pause_cond while holding the monitor. Once @stopped is
# set the wait ends (or is skipped entirely). Any other wake-up of the
# condition variable re-enters the wait for the time that is still remaining,
# so the backoff cannot be cut short while the stream is still running.
def backoff_and_wait!
  synchronize do
    # Updated under the same lock that guards the rest of the stream state.
    @reconnect_delay = @reconnect_delay ? [@reconnect_delay * 1.5, 60.0].min : 1.0
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @reconnect_delay + rand(0.0..0.5)

    until @stopped
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break unless remaining.positive?

      @pause_cond.wait(remaining)
    end
  end
end
266+
222267
def background_run
223268
synchronize do
224269
# Don't allow a stream to restart if already stopped
@@ -245,11 +290,21 @@ def background_run
245290

246291
# Call the StreamingPull API to get the response enumerator
247292
options = { :"metadata" => { :"x-goog-request-params" => @subscriber.subscription_name } }
293+
synchronize do
294+
@stream_opened = false
295+
end
248296
enum = @subscriber.service.streaming_pull @request_queue.each, options
249297
subscriber.service.logger.log :info, "subscriber-streams" do
250298
"rpc: streamingPull, subscription: #{@subscriber.subscription_name}, stream opened"
251299
end
252300

301+
synchronize do
302+
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
303+
@last_ping_at = now
304+
@last_pong_at = now
305+
@stream_opened = true
306+
end
307+
253308
loop do
254309
synchronize do
255310
if @paused && !@stopped
@@ -264,8 +319,17 @@ def background_run
264319
begin
265320
# Cannot synchronize the enumerator, causes deadlock
266321
response = enum.next
267-
new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled
322+
synchronize do
323+
@last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
324+
# Reset backoff delay only after successfully reading a frame from enum.next.
325+
# If the connection drops immediately upon reading, @reconnect_delay is preserved.
326+
@reconnect_delay = nil
327+
end
268328
received_messages = response.received_messages
329+
# Skip processing properties and inventory on Pong frames (empty received_messages).
330+
# Subscription properties on keep-alive Pongs are not valid.
331+
next if received_messages.empty?
332+
new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled
269333

270334
# Use synchronize so changes happen atomically
271335
synchronize do
@@ -310,18 +374,21 @@ def background_run
310374
"#{status_code}; will be retried."
311375
end
312376
# Restart the stream with an incremental backoff for a retriable error.
377+
backoff_and_wait!
313378
retry
314379
rescue RestartStream
315380
subscriber.service.logger.log :info, "subscriber-streams" do
316381
"Subscriber stream for subscription #{@subscriber.subscription_name} has ended; will be retried."
317382
end
383+
backoff_and_wait!
318384
retry
319385
rescue StandardError => e
320386
subscriber.service.logger.log :error, "subscriber-streams" do
321387
"error on stream for subscription #{@subscriber.subscription_name}: #{e.inspect}"
322388
end
323389
@subscriber.error! e
324390

391+
backoff_and_wait!
325392
retry
326393
end
327394

@@ -443,6 +510,7 @@ def initial_input_request
443510
req.client_id = @subscriber.service.client_id
444511
req.max_outstanding_messages = @inventory.limit
445512
req.max_outstanding_bytes = @inventory.bytesize
513+
req.protocol_version = 1
446514
end
447515
end
448516

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/acknowledge_test.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@
7171
subscription: sub_path,
7272
stream_ack_deadline_seconds: 60,
7373
max_outstanding_messages: 1000,
74-
max_outstanding_bytes: 100 * 1000 * 1000
74+
max_outstanding_bytes: 100 * 1000 * 1000,
75+
protocol_version: 1
7576
)]
7677
]
7778

@@ -132,7 +133,8 @@
132133
subscription: sub_path,
133134
stream_ack_deadline_seconds: 60,
134135
max_outstanding_messages: 1000,
135-
max_outstanding_bytes: 100 * 1000 * 1000
136+
max_outstanding_bytes: 100 * 1000 * 1000,
137+
protocol_version: 1
136138
)]
137139
]
138140

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@
7676
subscription: sub_path,
7777
stream_ack_deadline_seconds: 60,
7878
max_outstanding_messages: 1000,
79-
max_outstanding_bytes: 100 * 1000 * 1000
79+
max_outstanding_bytes: 100 * 1000 * 1000,
80+
protocol_version: 1
8081
)]
8182
]
8283

@@ -141,7 +142,8 @@
141142
subscription: sub_path,
142143
stream_ack_deadline_seconds: 60,
143144
max_outstanding_messages: 1000,
144-
max_outstanding_bytes: 100 * 1000 * 1000
145+
max_outstanding_bytes: 100 * 1000 * 1000,
146+
protocol_version: 1
145147
)]
146148
]
147149

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
require "helper"
16+
17+
# Specs for the streaming-pull keep-alive behaviour of the message listener:
# the protocol version sent on the initial request, periodic pings, and the
# stream restart that follows a missed pong deadline.
describe Google::Cloud::PubSub::MessageListener, :keepalive, :mock_pubsub do
  let(:topic_name) { "topic-name-goes-here" }
  let(:sub_name) { "subscription-name-goes-here" }
  let(:sub_hash) { subscription_hash topic_name, sub_name }
  let(:sub_grpc) { Google::Cloud::PubSub::V1::Subscription.new(sub_hash) }
  let(:subscriber) { Google::Cloud::PubSub::Subscriber.from_grpc sub_grpc, pubsub.service }
  let(:rec_msg1_grpc) do
    Google::Cloud::PubSub::V1::ReceivedMessage.new rec_message_hash("rec_message1-msg-goes-here", 1111)
  end
  # Short timings so the keep-alive and pong-monitor timers fire within the specs.
  let(:keepalive_env) do
    {
      "PUBSUB_TEST_KEEPALIVE_INTERVAL" => "0.05",
      "PUBSUB_TEST_PONG_DEADLINE"      => "0.05"
    }
  end

  before do
    # Remember whatever was set before the spec so it can be put back afterwards.
    @saved_env = keepalive_env.keys.to_h { |key| [key, ENV[key]] }
    keepalive_env.each { |key, value| ENV[key] = value }
  end

  after do
    # Assigning nil removes the variable, so previously-unset keys end up unset again.
    @saved_env.each { |key, value| ENV[key] = value }
  end

  # Polls every 10ms until the block returns a truthy value, failing with
  # `message` once more than `attempts` polls have been made.
  def wait_until attempts, message
    tries = 0
    until yield
      fail message if tries > attempts
      tries += 1
      sleep 0.01
    end
  end

  # Builds a StreamingPullStub whose streaming_pull_internal records the request
  # and answers with the enumerator of `queue`, so responses can be pushed while
  # the stream is open.
  def queue_backed_stub queue
    stub = StreamingPullStub.new [[]]
    stub.instance_variable_set :@my_q, queue
    def stub.streaming_pull_internal req, opt = nil
      @requests << req
      @my_q.each
    end
    stub
  end

  # Starts a thread that pushes `count` empty responses onto `queue`, one every 20ms.
  def push_pongs queue, count
    Thread.new do
      count.times do
        sleep 0.02
        queue.push Google::Cloud::PubSub::V1::StreamingPullResponse.new(received_messages: [])
      end
    end
  end

  it "sends protocol_version = 1 in initial streaming pull request" do
    pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
    stub = StreamingPullStub.new [[pull_res1]]
    subscriber.service.mocked_subscription_admin = stub

    called = false
    listener = subscriber.listen(streams: 1) { |_msg| called = true }
    listener.start

    wait_until(100, "callback was not called") { called }

    listener.stop
    listener.wait!

    initial_req = stub.requests.first.to_a.first
    _(initial_req.protocol_version).must_equal 1
  end

  it "sends keep-alive pings periodically even when inventory is empty" do
    q = StreamingPullStub::RaisableEnumeratorQueue.new
    stub = queue_backed_stub q
    subscriber.service.mocked_subscription_admin = stub

    listener = subscriber.listen(streams: 1) { |_msg| }
    listener.start

    pong_thread = push_pongs q, 10

    sleep 0.18
    pong_thread.join

    listener.stop
    listener.wait!

    # The initial request plus at least one keep-alive ping.
    reqs = stub.requests.first.to_a
    _(reqs.count).must_be :>=, 2
  end

  it "restarts stream when keep-alive pong deadline is exceeded" do
    pull_res2 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
    # First stream never answers; the message is only delivered on the second stream.
    stub = StreamingPullStub.new [[], [pull_res2]]
    subscriber.service.mocked_subscription_admin = stub

    called = false
    listener = subscriber.listen(streams: 1) { |_msg| called = true }
    listener.start

    wait_until(200, "stream did not restart and deliver message") { called }

    listener.stop
    listener.wait!

    _(stub.requests.count).must_equal 2
  end

  it "does not restart stream when actively receiving keep-alive pongs" do
    q = StreamingPullStub::RaisableEnumeratorQueue.new
    stub = queue_backed_stub q
    subscriber.service.mocked_subscription_admin = stub

    listener = subscriber.listen(streams: 1) { |_msg| }
    listener.start

    pong_sender = push_pongs q, 8

    sleep 0.15
    pong_sender.join

    listener.stop
    listener.wait!

    # A single recorded request means the stream was opened exactly once.
    _(stub.requests.count).must_equal 1
  end
end

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/modify_ack_deadline_test.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@
7070
subscription: sub_path,
7171
stream_ack_deadline_seconds: 60,
7272
max_outstanding_messages: 1000,
73-
max_outstanding_bytes: 100 * 1000 * 1000
73+
max_outstanding_bytes: 100 * 1000 * 1000,
74+
protocol_version: 1
7475
)]
7576
]
7677

@@ -126,7 +127,8 @@
126127
subscription: sub_path,
127128
stream_ack_deadline_seconds: 60,
128129
max_outstanding_messages: 1000,
129-
max_outstanding_bytes: 100 * 1000 * 1000
130+
max_outstanding_bytes: 100 * 1000 * 1000,
131+
protocol_version: 1
130132
)]
131133
]
132134

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/nack_test.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@
7070
subscription: sub_path,
7171
stream_ack_deadline_seconds: 60,
7272
max_outstanding_messages: 1000,
73-
max_outstanding_bytes: 100 * 1000 * 1000
73+
max_outstanding_bytes: 100 * 1000 * 1000,
74+
protocol_version: 1
7475
)]
7576
]
7677

@@ -126,7 +127,8 @@
126127
subscription: sub_path,
127128
stream_ack_deadline_seconds: 60,
128129
max_outstanding_messages: 1000,
129-
max_outstanding_bytes: 100 * 1000 * 1000
130+
max_outstanding_bytes: 100 * 1000 * 1000,
131+
protocol_version: 1
130132
)]
131133
]
132134

0 commit comments

Comments
 (0)