Skip to content

Commit a1530d7

Browse files
authored
chore(pubsub): Add sample for subscribe avro records with revisions (#33900)
Add sample for pubsub_subscribe_avro_records_with_revisions. The design is based on SubscribeWithAvroSchemaRevisionsExample.java: googleapis/java-pubsub#1469
1 parent df84fa9 commit a1530d7

3 files changed

Lines changed: 158 additions & 0 deletions

File tree

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"type":"record",
3+
"name":"State",
4+
"namespace":"utilities",
5+
"doc":"A list of states in the United States of America.",
6+
"fields":[
7+
{
8+
"name":"name",
9+
"type":"string",
10+
"doc":"The common name of the state."
11+
},
12+
{
13+
"name":"post_abbr",
14+
"type":"string",
15+
"doc":"The postal code abbreviation of the state."
16+
},
17+
{
18+
"name":"population",
19+
"type":"long",
20+
"default":0,
21+
"doc":"The population of the state."
22+
}
23+
]
24+
}

google-cloud-pubsub/samples/acceptance/schemas_test.rb

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
require_relative "../pubsub_subscribe_avro_records"
2929
require_relative "../pubsub_publish_proto_messages"
3030
require_relative "../pubsub_subscribe_proto_messages"
31+
require_relative "../pubsub_subscribe_avro_records_with_revisions"
3132

3233

3334
describe "schemas" do
@@ -36,6 +37,7 @@
3637
let(:topic_id) { random_topic_id }
3738
let(:subscription_id) { random_subscription_id }
3839
let(:avsc_file) { File.expand_path "data/us-states.avsc", __dir__ }
40+
let(:avsc_revision_file) { File.expand_path "data/us-states-plus.avsc", __dir__ }
3941
let(:topic_admin) { pubsub.topic_admin }
4042
let(:subscription_admin) { pubsub.subscription_admin }
4143
let(:schemas) { pubsub.schemas }
@@ -216,6 +218,53 @@
216218

217219
assert_includes out, schema1.revision_id
218220
end
221+
222+
it "supports pubsub_subscribe_avro_records_with_revisions" do
223+
# Commit Rev B first (Rev A is already created in before block).
224+
schema_b = nil
225+
out, _err = capture_io do
226+
schema_b = commit_avro_schema schema_id: schema_id, avsc_file: avsc_revision_file
227+
end
228+
229+
rev_a_id = @schema.revision_id
230+
rev_b_id = schema_b.revision_id
231+
232+
# Create topic with schema range allowing both revisions.
233+
schema_settings = Google::Cloud::PubSub::V1::SchemaSettings.new schema: pubsub.schema_path(schema_id),
234+
encoding: :BINARY,
235+
first_revision_id: rev_a_id,
236+
last_revision_id: rev_b_id
237+
@topic = topic_admin.create_topic name: pubsub.topic_path(random_topic_id),
238+
schema_settings: schema_settings
239+
240+
@subscription = subscription_admin.create_subscription name: pubsub.subscription_path(random_subscription_id),
241+
topic: @topic.name,
242+
ack_deadline_seconds: 60
243+
244+
# Publish message 1 (Old format - valid for both).
245+
writer = Avro::IO::DatumWriter.new avro_schema
246+
buffer = StringIO.new
247+
writer.write record, Avro::IO::BinaryEncoder.new(buffer)
248+
publisher = pubsub.publisher @topic.name
249+
publisher.publish buffer
250+
251+
# Publish message 2 (New format - valid only for Rev B).
252+
avsc_definition_plus = File.read avsc_revision_file
253+
avro_schema_plus = Avro::Schema.parse avsc_definition_plus
254+
record_plus = { "name" => "California", "post_abbr" => "CA", "population" => 39000000 }
255+
256+
writer_plus = Avro::IO::DatumWriter.new avro_schema_plus
257+
buffer_plus = StringIO.new
258+
writer_plus.write record_plus, Avro::IO::BinaryEncoder.new(buffer_plus)
259+
publisher.publish buffer_plus
260+
261+
# Verify we can subscribe and decode both.
262+
expect_with_retry "pubsub_subscribe_avro_records_with_revisions" do
263+
assert_output /Received a binary-encoded message:.*Alaska.*Received a binary-encoded message:.*California/m do
264+
subscribe_avro_records_with_revisions subscription_id: @subscription.name
265+
end
266+
end
267+
end
219268
end
220269

221270
describe "PROTOCOL_BUFFER" do
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
require "google/cloud/pubsub"
16+
17+
def subscribe_avro_records_with_revisions subscription_id:
18+
# [START pubsub_subscribe_avro_records_with_revisions]
19+
# subscription_id = "your-subscription-id"
20+
21+
pubsub = Google::Cloud::PubSub.new
22+
subscriber = pubsub.subscriber subscription_id
23+
24+
# Cache for the parsed Avro schemas mapped by revision ID.
25+
schema_cache = {}
26+
cache_mutex = Mutex.new
27+
28+
listener = subscriber.listen do |received_message|
29+
schema_name = received_message.attributes["googclient_schemaname"]
30+
revision_id = received_message.attributes["googclient_schemarevisionid"]
31+
encoding = received_message.attributes["googclient_schemaencoding"]
32+
33+
# Prevent concurrent threads from racing to fetch and parse the same schema.
34+
avro_schema = cache_mutex.synchronize { schema_cache[revision_id] }
35+
36+
if avro_schema.nil?
37+
begin
38+
require "avro"
39+
# The resource name format is projects/{project}/schemas/{schema}@{revision}.
40+
schema_resource = pubsub.schemas.get_schema name: "#{schema_name}@#{revision_id}"
41+
42+
avro_schema = Avro::Schema.parse schema_resource.definition
43+
44+
cache_mutex.synchronize { schema_cache[revision_id] = avro_schema }
45+
rescue StandardError => e
46+
puts "Could not get schema for revision #{revision_id}: #{e.message}"
47+
received_message.reject!
48+
next
49+
end
50+
end
51+
52+
begin
53+
case encoding
54+
when "BINARY"
55+
require "avro"
56+
buffer = StringIO.new received_message.data
57+
decoder = Avro::IO::BinaryDecoder.new buffer
58+
reader = Avro::IO::DatumReader.new avro_schema
59+
message_data = reader.read decoder
60+
puts "Received a binary-encoded message:\n#{message_data}"
61+
when "JSON"
62+
require "json"
63+
message_data = JSON.parse received_message.data
64+
puts "Received a JSON-encoded message:\n#{message_data}"
65+
else
66+
puts "Unknown message encoding: #{encoding}. Rejecting message."
67+
received_message.reject!
68+
next
69+
end
70+
71+
received_message.acknowledge!
72+
rescue StandardError => e
73+
puts "Failed to process message: #{e.message}"
74+
received_message.reject!
75+
end
76+
end
77+
78+
listener.start
79+
80+
# Let the main thread sleep for 60 seconds so the thread for listening
81+
# messages does not quit.
82+
sleep 60
83+
listener.stop.wait!
84+
# [END pubsub_subscribe_avro_records_with_revisions]
85+
end

0 commit comments

Comments
 (0)