1717
1818
1919# This example uses Azure IMDS for credential-less authentication
20- # to Kafka on Confluent Cloud
20+ # to Schema Registry on Confluent Cloud
2121
2222import argparse
2323import logging
2424
2525from confluent_kafka import Producer
26- from confluent_kafka .serialization import StringSerializer
26+ from confluent_kafka .schema_registry import SchemaRegistryClient
27+ from confluent_kafka .schema_registry .json_schema import JSONSerializer
28+ from confluent_kafka .serialization import MessageField , SerializationContext , StringSerializer
29+
30+
31+ class User (object ):
32+ """
33+ User record
34+
35+ Args:
36+ name (str): User's name
37+
38+ favorite_number (int): User's favorite number
39+
40+ favorite_color (str): User's favorite color
41+
42+ address(str): User's address; confidential
43+ """
44+
45+ def __init__ (self , name , address , favorite_number , favorite_color ):
46+ self .name = name
47+ self .favorite_number = favorite_number
48+ self .favorite_color = favorite_color
49+ # address should not be serialized, see user_to_dict()
50+ self ._address = address
51+
52+
53+ def user_to_dict (user , ctx ):
54+ """
55+ Returns a dict representation of a User instance for serialization.
56+
57+ Args:
58+ user (User): User instance.
59+
60+ ctx (SerializationContext): Metadata pertaining to the serialization
61+ operation.
62+
63+ Returns:
64+ dict: Dict populated with user attributes to be serialized.
65+ """
66+
67+ # User._address must not be serialized; omit from dict
68+ return dict (name = user .name , favorite_number = user .favorite_number , favorite_color = user .favorite_color )
2769
2870
2971def producer_config (args ):
@@ -38,7 +80,7 @@ def producer_config(args):
3880 'sasl.oauthbearer.config' : f'query={ args .query } ' ,
3981 }
4082 # These two parameters are only applicable when producing to
41- # Confluent Cloud where some sasl extensions are required.
83+ # confluent cloud where some sasl extensions are required.
4284 if args .logical_cluster and args .identity_pool_id :
4385 params ['sasl.oauthbearer.extensions' ] = (
4486 'logicalCluster=' + args .logical_cluster + ',identityPoolId=' + args .identity_pool_id
@@ -47,12 +89,27 @@ def producer_config(args):
4789 return params
4890
4991
92+ def schema_registry_config (args ):
93+ params = {
94+ 'url' : args .schema_registry ,
95+ 'bearer.auth.credentials.source' : 'OAUTHBEARER_AZURE_IMDS' ,
96+ 'bearer.auth.issuer.endpoint.query' : args .query ,
97+ }
98+ # These two parameters are only applicable when producing to
99+ # confluent cloud where some sasl extensions are required.
100+ if args .logical_schema_registry_cluster and args .identity_pool_id :
101+ params ['bearer.auth.logical.cluster' ] = args .logical_schema_registry_cluster
102+ params ['bearer.auth.identity.pool.id' ] = args .identity_pool_id
103+
104+ return params
105+
106+
50107def delivery_report (err , msg ):
51108 """
52109 Reports the failure or success of a message delivery.
53110
54111 Args:
55- err (KafkaError): The error that occurred, or None on success.
112+ err (KafkaError): The error that occurred on None on success.
56113
57114 msg (Message): The message that was produced or failed.
58115
@@ -80,15 +137,45 @@ def main(args):
80137 producer_conf = producer_config (args )
81138 producer = Producer (producer_conf )
82139 string_serializer = StringSerializer ('utf_8' )
140+ schema_str = """
141+ {
142+ "$schema": "http://json-schema.org/draft-07/schema#",
143+ "title": "User",
144+ "description": "A Confluent Kafka Python User",
145+ "type": "object",
146+ "properties": {
147+ "name": {
148+ "description": "User's name",
149+ "type": "string"
150+ },
151+ "favorite_number": {
152+ "description": "User's favorite number",
153+ "type": "number",
154+ "exclusiveMinimum": 0
155+ },
156+ "favorite_color": {
157+ "description": "User's favorite color",
158+ "type": "string"
159+ }
160+ },
161+ "required": [ "name", "favorite_number", "favorite_color" ]
162+ }
163+ """
164+ schema_registry_conf = schema_registry_config (args )
165+ schema_registry_client = SchemaRegistryClient (schema_registry_conf )
166+
167+ json_serializer = JSONSerializer (schema_str , schema_registry_client , user_to_dict )
83168
84169 print ('Producing records to topic {}. ^C to exit.' .format (topic ))
85170 while True :
86171 # Serve on_delivery callbacks from previous calls to produce()
87172 producer .poll (0.0 )
88173 try :
89174 name = input (">" )
175+ user = User (name = name , address = "NA" , favorite_color = "blue" , favorite_number = 7 )
176+ serialized_user = json_serializer (user , SerializationContext (topic , MessageField .VALUE ))
90177 producer .produce (
91- topic = topic , key = string_serializer (name ), value = string_serializer ( name ) , on_delivery = delivery_report
178+ topic = topic , key = string_serializer (name ), value = serialized_user , on_delivery = delivery_report
92179 )
93180 except KeyboardInterrupt :
94181 break
@@ -98,11 +185,18 @@ def main(args):
98185
99186
100187if __name__ == '__main__' :
101- parser = argparse .ArgumentParser (description = "OAuth/OIDC example using Azure IMDS metadata-based authentication " )
188+ parser = argparse .ArgumentParser (description = "OAUTH example with client credentials grant " )
102189 parser .add_argument ('-b' , dest = "bootstrap_servers" , required = True , help = "Bootstrap broker(s) (host[:port])" )
103190 parser .add_argument ('-t' , dest = "topic" , default = "example_producer_oauth" , help = "Topic name" )
191+ parser .add_argument ('-s' , dest = "schema_registry" , required = True , help = "Schema Registry (http(s)://host[:port]" )
104192 parser .add_argument ('--query' , dest = "query" , required = True , help = "Query parameters for Azure IMDS token endpoint" )
105193 parser .add_argument ('--logical-cluster' , dest = "logical_cluster" , required = False , help = "Logical Cluster." )
194+ parser .add_argument (
195+ '--logical-schema-registry-cluster' ,
196+ dest = "logical_schema_registry_cluster" ,
197+ required = False ,
198+ help = "Logical Schema Registry Cluster." ,
199+ )
106200 parser .add_argument ('--identity-pool-id' , dest = "identity_pool_id" , required = False , help = "Identity Pool ID." )
107201
108202 main (parser .parse_args ())
0 commit comments