Skip to content
45 changes: 44 additions & 1 deletion azure/functions/decorators/function_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
from azure.functions.decorators.http import HttpTrigger, HttpOutput, \
HttpMethod
from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \
BrokerAuthenticationMode, BrokerProtocol, OAuthBearerMethod
BrokerAuthenticationMode, BrokerProtocol, OAuthBearerMethod, \
KafkaMessageKeyType
from azure.functions.decorators.queue import QueueTrigger, QueueOutput
from azure.functions.decorators.servicebus import ServiceBusQueueTrigger, \
ServiceBusQueueOutput, ServiceBusTopicTrigger, \
Expand Down Expand Up @@ -1244,12 +1245,18 @@ def kafka_trigger(self,
event_hub_connection_string: Optional[str] = None,
consumer_group: Optional[str] = None,
avro_schema: Optional[str] = None,
key_avro_schema: Optional[str] = None,
key_data_type: Optional[Union[KafkaMessageKeyType, str]] = None,
Comment thread
hallvictoria marked this conversation as resolved.
Outdated
username: Optional[str] = None,
password: Optional[str] = None,
ssl_key_location: Optional[str] = None,
ssl_ca_location: Optional[str] = None,
ssl_certificate_location: Optional[str] = None,
ssl_key_password: Optional[str] = None,
ssl_certificate_pem: Optional[str] = None,
ssl_key_pem: Optional[str] = None,
ssl_ca_pem: Optional[str] = None,
ssl_certificate_and_key_pem: Optional[str] = None,
schema_registry_url: Optional[str] = None,
schema_registry_username: Optional[str] = None,
schema_registry_password: Optional[str] = None,
Expand Down Expand Up @@ -1287,6 +1294,10 @@ def kafka_trigger(self,
Azure Event Hubs).
:param consumer_group: Kafka consumer group used by the trigger.
:param avro_schema: Used only if a generic Avro record should be generated.
:param key_avro_schema: Avro schema for the message key. Used only if a
generic Avro record should be generated for the key.
:param key_data_type: Data type of the message key. Valid values: Int, Long,
String, Binary. Default is String. Ignored if key_avro_schema is set.
Comment thread
hallvictoria marked this conversation as resolved.
:param username: SASL username for use with the PLAIN or SASL-SCRAM mechanisms.
Equivalent to 'sasl.username' in librdkafka. Default is empty string.
:param password: SASL password for use with the PLAIN or SASL-SCRAM mechanisms.
Expand Down Expand Up @@ -1338,12 +1349,19 @@ def decorator():
event_hub_connection_string=event_hub_connection_string, # noqa: E501
consumer_group=consumer_group,
avro_schema=avro_schema,
key_avro_schema=key_avro_schema,
key_data_type=parse_singular_param_to_enum(
key_data_type, KafkaMessageKeyType),
username=username,
password=password,
ssl_key_location=ssl_key_location,
ssl_ca_location=ssl_ca_location,
ssl_certificate_location=ssl_certificate_location,
ssl_key_password=ssl_key_password,
ssl_certificate_pem=ssl_certificate_pem,
ssl_key_pem=ssl_key_pem,
ssl_ca_pem=ssl_ca_pem,
ssl_certificate_and_key_pem=ssl_certificate_and_key_pem,
schema_registry_url=schema_registry_url,
schema_registry_username=schema_registry_username,
schema_registry_password=schema_registry_password,
Expand Down Expand Up @@ -2588,12 +2606,18 @@ def kafka_output(self,
topic: str,
broker_list: str,
avro_schema: Optional[str] = None,
key_avro_schema: Optional[str] = None,
key_data_type: Optional[Union[KafkaMessageKeyType, str]] = None,
Comment thread
hallvictoria marked this conversation as resolved.
Outdated
username: Optional[str] = None,
password: Optional[str] = None,
ssl_key_location: Optional[str] = None,
ssl_ca_location: Optional[str] = None,
ssl_certificate_location: Optional[str] = None,
ssl_key_password: Optional[str] = None,
ssl_certificate_pem: Optional[str] = None,
ssl_key_pem: Optional[str] = None,
ssl_ca_pem: Optional[str] = None,
ssl_certificate_and_key_pem: Optional[str] = None,
schema_registry_url: Optional[str] = None,
schema_registry_username: Optional[str] = None,
schema_registry_password: Optional[str] = None,
Expand Down Expand Up @@ -2630,6 +2654,10 @@ def kafka_output(self,
:param topic: The Kafka topic to which messages are published.
:param broker_list: The list of Kafka brokers to which the producer connects.
:param avro_schema: Optional. Avro schema to generate a generic record.
:param key_avro_schema: Avro schema for the message key. Used only if a
generic Avro record should be generated for the key.
:param key_data_type: Data type of the message key. Valid values: Int, Long,
String, Binary. Default is String. Ignored if key_avro_schema is set.
:param username: SASL username for use with the PLAIN and SASL-SCRAM
mechanisms. Equivalent to `'sasl.username'` in librdkafka.
:param password: SASL password for use with the PLAIN and SASL-SCRAM
Expand All @@ -2642,6 +2670,14 @@ def kafka_output(self,
Equivalent to `'ssl.certificate.location'` in librdkafka.
:param ssl_key_password: Password for the client's SSL key.
Equivalent to `'ssl.key.password'` in librdkafka.
:param ssl_certificate_pem: Client certificate in PEM format.
Equivalent to 'ssl.certificate.pem' in librdkafka.
:param ssl_key_pem: Client private key in PEM format.
Equivalent to 'ssl.key.pem' in librdkafka.
:param ssl_ca_pem: CA certificate for verifying the broker's certificate in PEM format.
Equivalent to 'ssl.ca.pem' in librdkafka.
:param ssl_certificate_and_key_pem: Client certificate and key in PEM format.
Additional configuration for KeyVault support (certificate with private key).
:param schema_registry_url: URL of the Avro Schema Registry.
:param schema_registry_username: Username for accessing the Schema Registry.
:param schema_registry_password: Password for accessing the Schema Registry.
Expand Down Expand Up @@ -2695,12 +2731,19 @@ def decorator():
topic=topic,
broker_list=broker_list,
avro_schema=avro_schema,
key_avro_schema=key_avro_schema,
key_data_type=parse_singular_param_to_enum(
key_data_type, KafkaMessageKeyType),
username=username,
password=password,
ssl_key_location=ssl_key_location,
ssl_ca_location=ssl_ca_location,
ssl_certificate_location=ssl_certificate_location,
ssl_key_password=ssl_key_password,
ssl_certificate_pem=ssl_certificate_pem,
ssl_key_pem=ssl_key_pem,
ssl_ca_pem=ssl_ca_pem,
ssl_certificate_and_key_pem=ssl_certificate_and_key_pem,
schema_registry_url=schema_registry_url,
schema_registry_username=schema_registry_username,
schema_registry_password=schema_registry_password,
Expand Down
73 changes: 52 additions & 21 deletions azure/functions/decorators/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ class OAuthBearerMethod(StringifyEnum):
OIDC = 1


class KafkaMessageKeyType(StringifyEnum):
INT = 0
LONG = 1
STRING = 2
BINARY = 3


class KafkaOutput(OutputBinding):
@staticmethod
def get_binding_name() -> str:
Expand All @@ -39,15 +46,21 @@ def __init__(self,
topic: str,
broker_list: str,
avro_schema: Optional[str],
username: Optional[str],
password: Optional[str],
ssl_key_location: Optional[str],
ssl_ca_location: Optional[str],
ssl_certificate_location: Optional[str],
ssl_key_password: Optional[str],
schema_registry_url: Optional[str],
schema_registry_username: Optional[str],
schema_registry_password: Optional[str],
key_avro_schema: Optional[str] = None,
key_data_type: Optional[KafkaMessageKeyType] = KafkaMessageKeyType.STRING,
username: Optional[str] = None,
password: Optional[str] = None,
ssl_key_location: Optional[str] = None,
ssl_ca_location: Optional[str] = None,
ssl_certificate_location: Optional[str] = None,
ssl_key_password: Optional[str] = None,
ssl_certificate_pem: Optional[str] = None,
ssl_key_pem: Optional[str] = None,
ssl_ca_pem: Optional[str] = None,
ssl_certificate_and_key_pem: Optional[str] = None,
schema_registry_url: Optional[str] = None,
schema_registry_username: Optional[str] = None,
schema_registry_password: Optional[str] = None,
o_auth_bearer_method: Optional[OAuthBearerMethod] = None,
o_auth_bearer_client_id: Optional[str] = None,
o_auth_bearer_client_secret: Optional[str] = None,
Expand All @@ -68,12 +81,18 @@ def __init__(self,
self.topic = topic
self.broker_list = broker_list
self.avro_schema = avro_schema
self.key_avro_schema = key_avro_schema
self.key_data_type = key_data_type
self.username = username
self.password = password
self.ssl_key_location = ssl_key_location
self.ssl_ca_location = ssl_ca_location
self.ssl_certificate_location = ssl_certificate_location
self.ssl_key_password = ssl_key_password
self.ssl_certificate_pem = ssl_certificate_pem
self.ssl_key_pem = ssl_key_pem
self.ssl_ca_pem = ssl_ca_pem
self.ssl_certificate_and_key_pem = ssl_certificate_and_key_pem
self.schema_registry_url = schema_registry_url
self.schema_registry_username = schema_registry_username
self.schema_registry_password = schema_registry_password
Expand Down Expand Up @@ -104,18 +123,24 @@ def __init__(self,
name: str,
topic: str,
broker_list: str,
event_hub_connection_string: Optional[str],
consumer_group: Optional[str],
avro_schema: Optional[str],
username: Optional[str],
password: Optional[str],
ssl_key_location: Optional[str],
ssl_ca_location: Optional[str],
ssl_certificate_location: Optional[str],
ssl_key_password: Optional[str],
schema_registry_url: Optional[str],
schema_registry_username: Optional[str],
schema_registry_password: Optional[str],
event_hub_connection_string: Optional[str] = None,
consumer_group: Optional[str] = None,
avro_schema: Optional[str] = None,
key_avro_schema: Optional[str] = None,
key_data_type: Optional[KafkaMessageKeyType] = KafkaMessageKeyType.STRING,
username: Optional[str] = None,
password: Optional[str] = None,
ssl_key_location: Optional[str] = None,
ssl_ca_location: Optional[str] = None,
ssl_certificate_location: Optional[str] = None,
ssl_key_password: Optional[str] = None,
ssl_certificate_pem: Optional[str] = None,
ssl_key_pem: Optional[str] = None,
ssl_ca_pem: Optional[str] = None,
ssl_certificate_and_key_pem: Optional[str] = None,
schema_registry_url: Optional[str] = None,
schema_registry_username: Optional[str] = None,
schema_registry_password: Optional[str] = None,
o_auth_bearer_method: Optional[OAuthBearerMethod] = None,
o_auth_bearer_client_id: Optional[str] = None,
o_auth_bearer_client_secret: Optional[str] = None,
Expand All @@ -133,12 +158,18 @@ def __init__(self,
self.event_hub_connection_string = event_hub_connection_string
self.consumer_group = consumer_group
self.avro_schema = avro_schema
self.key_avro_schema = key_avro_schema
self.key_data_type = key_data_type
self.username = username
self.password = password
self.ssl_key_location = ssl_key_location
self.ssl_ca_location = ssl_ca_location
self.ssl_certificate_location = ssl_certificate_location
self.ssl_key_password = ssl_key_password
self.ssl_certificate_pem = ssl_certificate_pem
self.ssl_key_pem = ssl_key_pem
self.ssl_ca_pem = ssl_ca_pem
self.ssl_certificate_and_key_pem = ssl_certificate_and_key_pem
self.schema_registry_url = schema_registry_url
self.schema_registry_username = schema_registry_username
self.schema_registry_password = schema_registry_password
Expand Down
70 changes: 69 additions & 1 deletion tests/decorators/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from azure.functions.decorators.core import BindingDirection, Cardinality, \
DataType
from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \
BrokerAuthenticationMode, BrokerProtocol
BrokerAuthenticationMode, BrokerProtocol, KafkaMessageKeyType


class TestKafka(unittest.TestCase):
Expand Down Expand Up @@ -102,3 +102,71 @@ def test_kafka_output_valid_creation(self):
'topic': 'topic',
'type': KAFKA,
'username': 'username'})

def test_kafka_trigger_with_key_data_type_and_pem(self):
trigger = KafkaTrigger(name="arg_name",
topic="topic",
broker_list="broker_list",
key_avro_schema="key_avro_schema",
key_data_type=KafkaMessageKeyType.LONG,
ssl_certificate_pem="cert_pem",
ssl_key_pem="key_pem",
ssl_ca_pem="ca_pem",
ssl_certificate_and_key_pem="cert_and_key_pem",
data_type=DataType.UNDEFINED)

self.assertEqual(trigger.get_binding_name(), "kafkaTrigger")
dict_repr = trigger.get_dict_repr()
self.assertEqual(dict_repr["keyAvroSchema"], "key_avro_schema")
self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.LONG)
self.assertEqual(dict_repr["sslCertificatePem"], "cert_pem")
self.assertEqual(dict_repr["sslKeyPem"], "key_pem")
self.assertEqual(dict_repr["sslCaPem"], "ca_pem")
self.assertEqual(dict_repr["sslCertificateAndKeyPem"], "cert_and_key_pem")

def test_kafka_output_with_key_data_type_and_pem(self):
output = KafkaOutput(name="arg_name",
topic="topic",
broker_list="broker_list",
key_avro_schema="key_avro_schema",
key_data_type=KafkaMessageKeyType.BINARY,
ssl_certificate_pem="cert_pem",
ssl_key_pem="key_pem",
ssl_ca_pem="ca_pem",
ssl_certificate_and_key_pem="cert_and_key_pem",
data_type=DataType.UNDEFINED)

self.assertEqual(output.get_binding_name(), "kafka")
dict_repr = output.get_dict_repr()
self.assertEqual(dict_repr["keyAvroSchema"], "key_avro_schema")
self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.BINARY)
self.assertEqual(dict_repr["sslCertificatePem"], "cert_pem")
self.assertEqual(dict_repr["sslKeyPem"], "key_pem")
self.assertEqual(dict_repr["sslCaPem"], "ca_pem")
self.assertEqual(dict_repr["sslCertificateAndKeyPem"], "cert_and_key_pem")

def test_kafka_message_key_type_enum(self):
"""Test that KafkaMessageKeyType enum has the correct values"""
self.assertEqual(KafkaMessageKeyType.INT, 0)
self.assertEqual(KafkaMessageKeyType.LONG, 1)
self.assertEqual(KafkaMessageKeyType.STRING, 2)
self.assertEqual(KafkaMessageKeyType.BINARY, 3)

def test_kafka_trigger_key_data_type_default(self):
"""Test that key_data_type defaults to STRING"""
trigger = KafkaTrigger(name="arg_name",
topic="topic",
broker_list="broker_list")

dict_repr = trigger.get_dict_repr()
self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.STRING)

def test_kafka_output_key_data_type_default(self):
"""Test that key_data_type defaults to STRING"""
output = KafkaOutput(name="arg_name",
topic="topic",
broker_list="broker_list",
avro_schema="schema")

dict_repr = output.get_dict_repr()
self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.STRING)
Loading