33import ssl
44
55from aiokafka import AIOKafkaProducer
6+ from tenacity import (
7+ retry ,
8+ retry_if_exception_type ,
9+ stop_after_attempt ,
10+ wait_exponential ,
11+ )
612
713from crowdgit .errors import QueueConnectionError , QueueMessageProduceError
814from crowdgit .services .base .base_service import BaseService
@@ -53,17 +59,27 @@ async def ensure_connected(self):
5359 if self ._connected :
5460 self .logger .info ("Connection unhealthy, reconnecting..." )
5561 await self .disconnect ()
56- await self .connect ()
62+ await self ._connect_with_retry ()
5763
5864 async def _is_connection_healthy (self ) -> bool :
5965 """Check if the current connection is healthy"""
6066 try :
61- # Test connection by fetching metadata - this will fail if connection is broken
62- await self .kafka_producer .client .fetch_all_metadata ()
63- return True
67+ return self .kafka_producer ._sender is not None and not self .kafka_producer ._closed
6468 except Exception :
6569 return False
6670
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type(QueueConnectionError),
    reraise=True,
)
async def _connect_with_retry(self):
    """
    Connect to Kafka with automatic retries.

    Calls ``connect()`` up to 3 times, retrying only on
    ``QueueConnectionError`` with exponential backoff (2s minimum,
    capped at 10s between attempts). ``reraise=True`` re-raises the
    original ``QueueConnectionError`` to the caller once all attempts
    are exhausted, instead of tenacity's ``RetryError``.
    """
    await self.connect()
6783 async def connect (self ):
6884 if self ._connected :
6985 return
@@ -77,27 +93,47 @@ async def connect(self):
7793 raise QueueConnectionError () from e
7894
async def disconnect(self):
    """
    Disconnect from Kafka and close the producer.

    Safe to call repeatedly. Errors raised while stopping the producer are
    logged rather than propagated, and ``_connected`` is always reset so a
    later ``ensure_connected()`` will trigger a reconnect.
    """
    if self.kafka_producer._closed:
        self.logger.debug("Producer already closed, skipping")
        # Bug fix: the early return previously left ``_connected`` True even
        # though the producer was closed, so the service kept believing it
        # had a live connection. Always clear the flag before returning.
        self._connected = False
        return

    try:
        await self.kafka_producer.stop()
        self.logger.info("Disconnected from kafka")
    except Exception as e:
        self.logger.error(f"Error during disconnect: {e}")
    finally:
        self._connected = False
110+
async def shutdown(self):
    """
    Shut down the queue service and clean up its resources.

    When connected: flushes any buffered messages (best effort), then stops
    the producer via ``disconnect()``. Every failure is logged and swallowed
    on purpose so the rest of the application shutdown can proceed.
    """
    self.logger.info("Shutting down queue service...")

    try:
        if self._connected:
            # Best effort: push out anything still buffered before closing.
            try:
                await self.kafka_producer.flush()
                self.logger.info("Flushed pending messages")
            except Exception as flush_err:
                self.logger.warning(f"Error flushing messages during shutdown: {flush_err}")

            await self.disconnect()

        self.logger.info("Queue service shutdown complete")
    except Exception as err:
        self.logger.error(f"Failed to shutdown queue service : {repr(err)}")
        # Deliberately not re-raised - allow application to continue shutdown.
101137
async def send_batch_activities(self, activities_kafka: list[dict[str, str]]):
    """
    Send a batch of prepared activity messages to the Kafka topic.

    Args:
        activities_kafka: List of dicts with 'message_id' and 'payload' keys
            (prepared by CommitService.prepare_activity_for_db_and_queue)

    Raises:
        QueueMessageProduceError: if any message in the batch fails to send.
    """
    if not activities_kafka:
        return

    await self.ensure_connected()

    self.logger.info(f"Emitting {len(activities_kafka)} activities to kafka queue...")

    try:
        # Bug fix: producer.send() only *enqueues* a message and returns a
        # delivery future. Previously only the send coroutines were gathered,
        # so broker-level delivery failures escaped this except clause and
        # the batch was reported successful before it was acknowledged.
        # Gather the send coroutines first, then await the delivery futures.
        delivery_futures = await asyncio.gather(
            *[
                self.kafka_producer.send(
                    topic=self.kafka_topic,
                    key=activity["message_id"].encode("utf-8"),
                    value=activity["payload"].encode("utf-8"),
                )
                for activity in activities_kafka
            ]
        )
        # Wait until every message in the batch is actually delivered.
        await asyncio.gather(*delivery_futures)

        self.logger.info(f"Successfully emitted {len(activities_kafka)} activities to queue")
    except Exception as e:
        self.logger.error(f"Failed to emit batch to queue with error: {repr(e)}")
        raise QueueMessageProduceError(
            f"Failed to send {len(activities_kafka)} messages to Kafka"
        ) from e
0 commit comments