11from crowdgit .services .base .base_service import BaseService
22from aiokafka import AIOKafkaProducer
33import asyncio
4+ import json
5+ import ssl
46from typing import List , Dict
57from crowdgit .settings import (
68 CROWD_KAFKA_TOPIC ,
79 CROWD_KAFKA_BROKERS ,
8- CROWD_KAFKA_PASSWORD ,
9- CROWD_KAFKA_USER ,
10+ CROWD_KAFKA_EXTRA ,
1011)
1112from crowdgit .errors import QueueConnectionError , QueueMessageProduceError
1213
@@ -17,16 +18,32 @@ class QueueService(BaseService):
1718 def __init__ (self ):
1819 super ().__init__ ()
1920 self .kafka_topic = CROWD_KAFKA_TOPIC
21+ self .kafka_producer = AIOKafkaProducer (** self ._build_kafka_config ())
22+ self ._connected = False
23+
24+ def _build_kafka_config (self ):
2025 config = {
2126 "bootstrap_servers" : CROWD_KAFKA_BROKERS ,
2227 "client_id" : self ._CLIENT_ID ,
23- "sasl_mechanism" : "PLAIN" ,
24- "sasl_plain_username" : CROWD_KAFKA_USER ,
25- "sasl_plain_password" : CROWD_KAFKA_PASSWORD ,
2628 "acks" : "all" ,
2729 }
28- self .kafka_producer = AIOKafkaProducer (** config )
29- self ._connected = False
30+
31+ # Parse extra configuration from kafkajs config
32+ extra_config = json .loads (CROWD_KAFKA_EXTRA )
33+
34+ if extra_config .get ("ssl" ):
35+ config ["security_protocol" ] = "SASL_SSL"
36+ ssl_context = ssl .create_default_context ()
37+ config ["ssl_context" ] = ssl_context
38+ else :
39+ config ["security_protocol" ] = "SASL_PLAINTEXT"
40+
41+ sasl_config = extra_config ["sasl" ]
42+ config ["sasl_mechanism" ] = sasl_config ["mechanism" ].upper ()
43+ config ["sasl_plain_username" ] = sasl_config ["username" ]
44+ config ["sasl_plain_password" ] = sasl_config ["password" ]
45+
46+ return config
3047
3148 async def ensure_connected (self ):
3249 """Ensure connection is established and healthy"""
0 commit comments