Skip to content

Commit ad59cd2

Browse files
feat(pubsub): implement streaming keep-alive logic
1 parent eeed499 commit ad59cd2

6 files changed

Lines changed: 231 additions & 15 deletions

File tree

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb

Lines changed: 69 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class Stream
4848
##
4949
# @private exactly_once_delivery_enabled.
5050
attr_reader :exactly_once_delivery_enabled
51+
attr_accessor :keepalive_interval, :pong_deadline
5152

5253
##
5354
# @private Create an empty Subscriber::Stream object.
@@ -68,24 +69,55 @@ def initialize subscriber
6869

6970
@callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads
7071

72+
@keepalive_interval = Float(ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] || 30)
73+
@pong_deadline = Float(ENV["PUBSUB_TEST_PONG_DEADLINE"] || 15)
74+
@last_ping_at = nil
75+
@last_pong_at = nil
76+
@stream_opened = false
77+
@reconnect_delay = nil
78+
7179
@stream_keepalive_task = Concurrent::TimerTask.new(
72-
execution_interval: 30
80+
execution_interval: @keepalive_interval
81+
) do
82+
synchronize do
83+
if @stream_opened && !@stopped && @request_queue
84+
subscriber.service.logger.log :info, "subscriber-streams" do
85+
"sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
86+
end
87+
@last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @last_pong_at >= @last_ping_at
88+
push Google::Cloud::PubSub::V1::StreamingPullRequest.new
89+
end
90+
end
91+
end
92+
93+
@pong_monitor_task = Concurrent::TimerTask.new(
94+
execution_interval: [@keepalive_interval / 5.0, 0.01].max
7395
) do
74-
# push empty request every 30 seconds to keep stream alive
75-
unless inventory.empty?
76-
subscriber.service.logger.log :info, "subscriber-streams" do
77-
"sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
96+
synchronize do
97+
# Do not check pong deadline if @paused (client flow control inventory full).
98+
# When @paused, background_run waits on condition variable and stops calling enum.next,
99+
# so incoming server pongs sit buffered in gRPC and @last_pong_at stays un-updated.
100+
if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused
101+
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
102+
if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at
103+
subscriber.service.logger.log :error, "subscriber-streams" do
104+
"Keep-alive pong not received within #{@pong_deadline}s; restarting stream."
105+
end
106+
@stream_opened = false
107+
@background_thread&.raise RestartStream
108+
end
78109
end
79-
push Google::Cloud::PubSub::V1::StreamingPullRequest.new
80110
end
81-
end.execute
111+
end
82112
end
83113

84114
def start
85115
synchronize do
86116
break if @background_thread
87117

88118
@inventory.start
119+
@stream_keepalive_task.execute
120+
@pong_monitor_task.execute
89121

90122
start_streaming!
91123
end
@@ -108,6 +140,9 @@ def stop
108140
@stopped = true
109141
@pause_cond.broadcast
110142

143+
@stream_keepalive_task.shutdown
144+
@pong_monitor_task.shutdown
145+
111146
# Now that the reception thread is stopped, immediately stop the
112147
# callback thread pool. All queued callbacks will see the stream
113148
# is stopped and perform a noop.
@@ -219,6 +254,13 @@ class RestartStream < StandardError; end
219254

220255
# rubocop:disable all
221256

257+
def backoff_and_wait!
258+
@reconnect_delay = @reconnect_delay ? [@reconnect_delay * 1.5, 60.0].min : 1.0
259+
synchronize do
260+
@pause_cond.wait(@reconnect_delay + rand(0.0..0.5)) unless @stopped
261+
end
262+
end
263+
222264
def background_run
223265
synchronize do
224266
# Don't allow a stream to restart if already stopped
@@ -245,11 +287,21 @@ def background_run
245287

246288
# Call the StreamingPull API to get the response enumerator
247289
options = { :"metadata" => { :"x-goog-request-params" => @subscriber.subscription_name } }
290+
synchronize do
291+
@stream_opened = false
292+
end
248293
enum = @subscriber.service.streaming_pull @request_queue.each, options
249294
subscriber.service.logger.log :info, "subscriber-streams" do
250295
"rpc: streamingPull, subscription: #{@subscriber.subscription_name}, stream opened"
251296
end
252297

298+
synchronize do
299+
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
300+
@last_ping_at = now
301+
@last_pong_at = now
302+
@stream_opened = true
303+
end
304+
253305
loop do
254306
synchronize do
255307
if @paused && !@stopped
@@ -264,6 +316,12 @@ def background_run
264316
begin
265317
# Cannot synchronize the enumerator, causes deadlock
266318
response = enum.next
319+
synchronize do
320+
@last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
321+
# Reset backoff delay only after successfully reading a frame from enum.next.
322+
# If the connection drops immediately upon reading, @reconnect_delay is preserved.
323+
@reconnect_delay = nil
324+
end
267325
new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled
268326
received_messages = response.received_messages
269327

@@ -310,18 +368,21 @@ def background_run
310368
"#{status_code}; will be retried."
311369
end
312370
# Restart the stream with an incremental backoff for a retriable error.
371+
backoff_and_wait!
313372
retry
314373
rescue RestartStream
315374
subscriber.service.logger.log :info, "subscriber-streams" do
316375
"Subscriber stream for subscription #{@subscriber.subscription_name} has ended; will be retried."
317376
end
377+
backoff_and_wait!
318378
retry
319379
rescue StandardError => e
320380
subscriber.service.logger.log :error, "subscriber-streams" do
321381
"error on stream for subscription #{@subscriber.subscription_name}: #{e.inspect}"
322382
end
323383
@subscriber.error! e
324384

385+
backoff_and_wait!
325386
retry
326387
end
327388

@@ -443,6 +504,7 @@ def initial_input_request
443504
req.client_id = @subscriber.service.client_id
444505
req.max_outstanding_messages = @inventory.limit
445506
req.max_outstanding_bytes = @inventory.bytesize
507+
req.protocol_version = 1
446508
end
447509
end
448510

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/acknowledge_test.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@
7171
subscription: sub_path,
7272
stream_ack_deadline_seconds: 60,
7373
max_outstanding_messages: 1000,
74-
max_outstanding_bytes: 100 * 1000 * 1000
74+
max_outstanding_bytes: 100 * 1000 * 1000,
75+
protocol_version: 1
7576
)]
7677
]
7778

@@ -132,7 +133,8 @@
132133
subscription: sub_path,
133134
stream_ack_deadline_seconds: 60,
134135
max_outstanding_messages: 1000,
135-
max_outstanding_bytes: 100 * 1000 * 1000
136+
max_outstanding_bytes: 100 * 1000 * 1000,
137+
protocol_version: 1
136138
)]
137139
]
138140

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@
7676
subscription: sub_path,
7777
stream_ack_deadline_seconds: 60,
7878
max_outstanding_messages: 1000,
79-
max_outstanding_bytes: 100 * 1000 * 1000
79+
max_outstanding_bytes: 100 * 1000 * 1000,
80+
protocol_version: 1
8081
)]
8182
]
8283

@@ -141,7 +142,8 @@
141142
subscription: sub_path,
142143
stream_ack_deadline_seconds: 60,
143144
max_outstanding_messages: 1000,
144-
max_outstanding_bytes: 100 * 1000 * 1000
145+
max_outstanding_bytes: 100 * 1000 * 1000,
146+
protocol_version: 1
145147
)]
146148
]
147149

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
require "helper"
16+
17+
describe Google::Cloud::PubSub::MessageListener, :keepalive, :mock_pubsub do
18+
let(:topic_name) { "topic-name-goes-here" }
19+
let(:sub_name) { "subscription-name-goes-here" }
20+
let(:sub_hash) { subscription_hash topic_name, sub_name }
21+
let(:sub_grpc) { Google::Cloud::PubSub::V1::Subscription.new(sub_hash) }
22+
let(:subscriber) { Google::Cloud::PubSub::Subscriber.from_grpc sub_grpc, pubsub.service }
23+
let(:rec_msg1_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \
24+
rec_message_hash("rec_message1-msg-goes-here", 1111) }
25+
26+
before do
27+
ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] = "0.05"
28+
ENV["PUBSUB_TEST_PONG_DEADLINE"] = "0.05"
29+
end
30+
31+
after do
32+
ENV.delete "PUBSUB_TEST_KEEPALIVE_INTERVAL"
33+
ENV.delete "PUBSUB_TEST_PONG_DEADLINE"
34+
end
35+
36+
it "sends protocol_version = 1 in initial streaming pull request" do
37+
pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
38+
stub = StreamingPullStub.new [[pull_res1]]
39+
subscriber.service.mocked_subscription_admin = stub
40+
41+
called = false
42+
listener = subscriber.listen streams: 1 do |msg|
43+
called = true
44+
end
45+
listener.start
46+
47+
listener_retries = 0
48+
until called
49+
fail "callback was not called" if listener_retries > 100
50+
listener_retries += 1
51+
sleep 0.01
52+
end
53+
54+
listener.stop
55+
listener.wait!
56+
57+
initial_req = stub.requests.first.to_a.first
58+
_(initial_req.protocol_version).must_equal 1
59+
end
60+
61+
it "sends keep-alive pings periodically even when inventory is empty" do
62+
q = StreamingPullStub::RaisableEnumeratorQueue.new
63+
stub = StreamingPullStub.new [[]]
64+
def stub.streaming_pull_internal req, opt = nil
65+
@requests << req
66+
@my_q.each
67+
end
68+
stub.instance_variable_set(:@my_q, q)
69+
subscriber.service.mocked_subscription_admin = stub
70+
71+
listener = subscriber.listen streams: 1 do |msg|
72+
end
73+
listener.start
74+
75+
pong_thread = Thread.new do
76+
10.times do
77+
sleep 0.02
78+
q.push Google::Cloud::PubSub::V1::StreamingPullResponse.new(received_messages: [])
79+
end
80+
end
81+
82+
sleep 0.18
83+
pong_thread.join
84+
85+
listener.stop
86+
listener.wait!
87+
88+
reqs = stub.requests.first.to_a
89+
_(reqs.count).must_be :>=, 2
90+
end
91+
92+
it "restarts stream when keep-alive pong deadline is exceeded" do
93+
pull_res2 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
94+
stub = StreamingPullStub.new [[], [pull_res2]]
95+
subscriber.service.mocked_subscription_admin = stub
96+
97+
called = false
98+
listener = subscriber.listen streams: 1 do |msg|
99+
called = true
100+
end
101+
listener.start
102+
103+
listener_retries = 0
104+
until called
105+
fail "stream did not restart and deliver message" if listener_retries > 200
106+
listener_retries += 1
107+
sleep 0.01
108+
end
109+
110+
listener.stop
111+
listener.wait!
112+
113+
_(stub.requests.count).must_equal 2
114+
end
115+
116+
it "does not restart stream when actively receiving keep-alive pongs" do
117+
q = StreamingPullStub::RaisableEnumeratorQueue.new
118+
stub = StreamingPullStub.new [[]]
119+
def stub.streaming_pull_internal req, opt = nil
120+
@requests << req
121+
@my_q.each
122+
end
123+
stub.instance_variable_set(:@my_q, q)
124+
subscriber.service.mocked_subscription_admin = stub
125+
126+
listener = subscriber.listen streams: 1 do |msg|
127+
end
128+
listener.start
129+
130+
pong_sender = Thread.new do
131+
8.times do
132+
sleep 0.02
133+
empty_pong = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: []
134+
q.push empty_pong
135+
end
136+
end
137+
138+
sleep 0.15
139+
pong_sender.join
140+
141+
listener.stop
142+
listener.wait!
143+
144+
_(stub.requests.count).must_equal 1
145+
end
146+
end

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/modify_ack_deadline_test.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@
7070
subscription: sub_path,
7171
stream_ack_deadline_seconds: 60,
7272
max_outstanding_messages: 1000,
73-
max_outstanding_bytes: 100 * 1000 * 1000
73+
max_outstanding_bytes: 100 * 1000 * 1000,
74+
protocol_version: 1
7475
)]
7576
]
7677

@@ -126,7 +127,8 @@
126127
subscription: sub_path,
127128
stream_ack_deadline_seconds: 60,
128129
max_outstanding_messages: 1000,
129-
max_outstanding_bytes: 100 * 1000 * 1000
130+
max_outstanding_bytes: 100 * 1000 * 1000,
131+
protocol_version: 1
130132
)]
131133
]
132134

google-cloud-pubsub/test/google/cloud/pubsub/message_listener/nack_test.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@
7070
subscription: sub_path,
7171
stream_ack_deadline_seconds: 60,
7272
max_outstanding_messages: 1000,
73-
max_outstanding_bytes: 100 * 1000 * 1000
73+
max_outstanding_bytes: 100 * 1000 * 1000,
74+
protocol_version: 1
7475
)]
7576
]
7677

@@ -126,7 +127,8 @@
126127
subscription: sub_path,
127128
stream_ack_deadline_seconds: 60,
128129
max_outstanding_messages: 1000,
129-
max_outstanding_bytes: 100 * 1000 * 1000
130+
max_outstanding_bytes: 100 * 1000 * 1000,
131+
protocol_version: 1
130132
)]
131133
]
132134

0 commit comments

Comments
 (0)