Skip to content

Commit 1a94c9f

Browse files
authored
Add ssl context support (#19)
* Add ssl context support * Update version * Enrich example with ssl context
1 parent aef970d commit 1a94c9f

3 files changed

Lines changed: 25 additions & 5 deletions

File tree

examples/kafka_proto_event_producing.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import ssl
23

34
import pydantic
45

@@ -30,16 +31,32 @@ def proto(self) -> UserJoinedECSTProtobuf:
3031
)
3132

3233

34+
def create_kafka_producer(ssl_context: ssl.SSLContext | None = None) -> kafka_adapters.KafkaProducer:
35+
dsn = "localhost:9092"
36+
value_serializer = protobuf.protobuf_value_serializer
37+
if ssl_context is None:
38+
return kafka_adapters.kafka_producer_factory(
39+
security_protocol="PLAINTEXT",
40+
sasl_mechanism="PLAIN",
41+
dsn=dsn,
42+
value_serializer=value_serializer,
43+
)
44+
return kafka_adapters.kafka_producer_factory(
45+
security_protocol="SASL_SSL",
46+
sasl_mechanism="SCRAM-SHA-256",
47+
ssl_context=ssl_context,
48+
dsn=dsn,
49+
value_serializer=value_serializer,
50+
)
51+
52+
3353
async def main():
3454
event = UserJoinedECST(
3555
event_name="user_joined_ecst",
3656
topic="user_joined_proto",
3757
payload=UserJoinedECSTPayload(user_id="123", meeting_id="456"),
3858
)
39-
kafka_producer = kafka_adapters.kafka_producer_factory(
40-
dsn="localhost:9092",
41-
value_serializer=protobuf.protobuf_value_serializer,
42-
)
59+
kafka_producer = create_kafka_producer(ssl_context=None)
4360
broker = kafka.KafkaMessageBroker(
4461
producer=kafka_producer,
4562
)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}]
2929
name = "python-cqrs"
3030
readme = "README.md"
3131
requires-python = ">=3.10"
32-
version = "3.0.1"
32+
version = "3.1.1"
3333

3434
[project.optional-dependencies]
3535
dev = [

src/cqrs/adapters/kafka.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import functools
33
import logging
4+
import ssl
45
import typing
56

67
import aiokafka
@@ -89,6 +90,7 @@ def kafka_producer_factory(
8990
dsn: typing.Text,
9091
security_protocol: SecurityProtocol = "PLAINTEXT",
9192
sasl_mechanism: SaslMechanism = "PLAIN",
93+
ssl_context: ssl.SSLContext | None = None,
9294
retry_count: int = 3,
9395
retry_delay: int = 1,
9496
user: typing.Text | None = None,
@@ -105,6 +107,7 @@ def kafka_producer_factory(
105107
sasl_mechanism=sasl_mechanism,
106108
sasl_plain_username=user,
107109
sasl_plain_password=password,
110+
ssl_context=ssl_context,
108111
loop=loop,
109112
)
110113
return KafkaProducer(

0 commit comments

Comments
 (0)