@@ -64,21 +64,20 @@ def __init__(
6464 key_schema : str | None = None ,
6565 key_output_serializer : Any | None = None ,
6666 ):
67- # Validate schema requirements for value
68- if value_schema_type in ["AVRO" , "PROTOBUF" ] and value_schema is None :
69- raise KafkaConsumerMissingSchemaError (
70- f"value_schema must be provided when value_schema_type is { value_schema_type } " ,
71- )
72-
73- # Validate schema requirements for key
74- if key_schema_type in ["AVRO" , "PROTOBUF" ] and key_schema is None :
75- raise KafkaConsumerMissingSchemaError (
76- f"value_schema must be provided when key_schema_type is { key_schema_type } " ,
77- )
67+ # Validate schema requirements
68+ self ._validate_schema_requirements (value_schema_type , value_schema , "value" )
69+ self ._validate_schema_requirements (key_schema_type , key_schema , "key" )
7870
7971 self .value_schema_type = value_schema_type
8072 self .value_schema = value_schema
8173 self .value_output_serializer = value_output_serializer
8274 self .key_schema_type = key_schema_type
8375 self .key_schema = key_schema
8476 self .key_output_serializer = key_output_serializer
77+
78+ def _validate_schema_requirements (self , schema_type : str | None , schema : str | None , prefix : str ) -> None :
79+ """Validate that schema is provided when required by schema_type."""
80+ if schema_type in ["AVRO" , "PROTOBUF" ] and schema is None :
81+ raise KafkaConsumerMissingSchemaError (
82+ f"{ prefix } _schema must be provided when { prefix } _schema_type is { schema_type } " ,
83+ )
0 commit comments