Skip to content

Commit fd0666e

Browse files
committed
Refactor Broadcaster initialization and improve message processing
- Updated the Broadcaster class to use a more descriptive attribute name for initialization. - Replaced the event instantiation with a direct import from gevent for clarity. - Ensured consistent use of gevent.sleep for message processing delays.
1 parent 695872a commit fd0666e

1 file changed

Lines changed: 6 additions & 5 deletions

File tree

utils/socket.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
from utils.kafka import consumer, producer
44
from flask import current_app as app
55
from collections import defaultdict
6+
from gevent.event import Event
7+
from gevent.lock import RLock
68
from flask_sock import Sock
79
import gevent
8-
from gevent.lock import RLock
910
import time
1011
import json
1112

@@ -25,12 +26,12 @@ def __new__(cls, *args, **kwargs):
2526
return cls._instance
2627

2728
def __init__(self):
28-
if not hasattr(self, 'initialized'):
29+
if not hasattr(self, 'init'):
2930
self.subscriptions = defaultdict(set)
30-
self._stop_event = gevent.event.Event()
3131
self._broadcaster_greenlet = None
32+
self._stop_event = Event()
3233
self._lock = RLock()
33-
self.initialized = True
34+
self.init = True
3435

3536
def init_app(self, app):
3637
self.app = app
@@ -76,7 +77,7 @@ def _run(self):
7677
# Reduce batch size and timeout for more frequent processing
7778
msgs = consumer.consume(1000, timeout=0.1)
7879
if not msgs:
79-
gevent.sleep(0.1) # Use gevent.sleep instead of time.sleep
80+
gevent.sleep(0.1)
8081
continue
8182

8283
# Process messages in smaller batches

0 commit comments

Comments
 (0)