Skip to content

Commit 2c85319

Browse files
authored
Fix redis connect (#877)
* fix all redis pubsub connection * remove ping check in pubsub workers * add success subscribe log * update * cache roles in redis timeout 600 * update * change some log message level
1 parent 14e15ab commit 2c85319

14 files changed

Lines changed: 260 additions & 89 deletions

File tree

dtable_events/activities/handlers.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,23 @@ def __init__(self, app):
3030
Thread.__init__(self)
3131
self._finished = Event()
3232
self._db_session_class = init_db_session_class()
33-
self._redis_client = RedisClient()
33+
self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5,
34+
health_check_interval=30, retry_on_timeout=True)
3435
self.app = app
36+
self._pubsub_channel_name = 'table-events'
37+
self._pubsub_no_message_timeout = 5 * 60
3538

3639
def run(self):
3740
logger.info('Starting handle table activities...')
38-
subscriber = self._redis_client.get_subscriber('table-events')
39-
41+
subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name)
42+
last_pubsub_message_time = time.time()
4043
while not self._finished.is_set():
4144
try:
4245
message = subscriber.get_message()
4346
if message is not None:
47+
if message.get('type') != 'message':
48+
continue
49+
last_pubsub_message_time = time.time()
4450
event = json.loads(message['data'])
4551
if event['op_type'] not in self.SUPPORT_OPERATION_TYPES:
4652
continue
@@ -56,7 +62,13 @@ def run(self):
5662
finally:
5763
session.close()
5864
else:
65+
if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout:
66+
subscriber = self._redis_client.refresh_subscriber(
67+
subscriber, self._pubsub_channel_name, 'no message timeout')
68+
last_pubsub_message_time = time.time()
69+
continue
5970
time.sleep(0.5)
6071
except Exception as e:
61-
logger.error('Failed get message from redis: %s' % e)
62-
subscriber = self._redis_client.get_subscriber('table-events')
72+
logger.error('redis pubsub receive error: %s', e)
73+
subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e))
74+
last_pubsub_message_time = time.time()

dtable_events/api_calls/api_calls_counter.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@
1919
logger = logging.getLogger(__name__)
2020

2121

22-
class APICallsCounter:
22+
class APICallsCounter(object):
2323
def __init__(self):
2424
self._finished = Event()
2525
self._db_session_class = init_db_session_class()
26-
self._redis_client = RedisClient()
26+
self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5,
27+
health_check_interval=30, retry_on_timeout=True)
2728
self.keep_months = 3 # including this month
29+
self._pubsub_channel_name = 'stats_api_calls'
30+
self._pubsub_no_message_timeout = 5 * 60
2831

2932
def count_api_gateway(self, info, db_session):
3033
try:
@@ -126,12 +129,16 @@ def count_api_calls(self, info, db_session):
126129

127130
def count(self):
128131
logger.info('Starting count api calls...')
129-
subscriber = self._redis_client.get_subscriber('stats_api_calls')
132+
subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name)
133+
last_pubsub_message_time = time.time()
130134

131135
while not self._finished.is_set():
132136
try:
133137
message = subscriber.get_message()
134138
if message is not None:
139+
if message.get('type') != 'message':
140+
continue
141+
last_pubsub_message_time = time.time()
135142
msg = json.loads(message['data'])
136143
session = self._db_session_class()
137144
try:
@@ -141,10 +148,16 @@ def count(self):
141148
finally:
142149
session.close()
143150
else:
151+
if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout:
152+
subscriber = self._redis_client.refresh_subscriber(
153+
subscriber, self._pubsub_channel_name, 'no message timeout')
154+
last_pubsub_message_time = time.time()
155+
continue
144156
time.sleep(0.5)
145157
except Exception as e:
146-
logger.error('Failed get message from redis: %s' % e)
147-
subscriber = self._redis_client.get_subscriber('stats_api_calls')
158+
logger.error('redis pubsub receive error: %s', e)
159+
subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e))
160+
last_pubsub_message_time = time.time()
148161

149162
def clean(self):
150163
logger.info('Starting schedule clean api calls...')
@@ -185,7 +198,7 @@ def timed_job():
185198
session.close()
186199

187200
try:
188-
self._redis_client.connection.publish('exceed_api_quota', json.dumps({'changed': True}))
201+
self._redis_client.publish('exceed_api_quota', json.dumps({'changed': True}))
189202
except Exception as e:
190203
logger.exception('publish exceed_api_quota error: %s', e)
191204

dtable_events/app/event_redis.py

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,33 +14,45 @@
1414

1515
class RedisClient(object):
1616

17-
def __init__(self, socket_connect_timeout=30, socket_timeout=None):
18-
self._host = '127.0.0.1'
19-
self._port = 6379
20-
self._password = None
21-
self._parse_config()
17+
def __init__(self, socket_connect_timeout=30, socket_timeout=None,
18+
health_check_interval=None, retry_on_timeout=None):
19+
self._host = REDIS_HOST
20+
self._port = REDIS_PORT
21+
self._password = REDIS_PASSWORD
22+
23+
self._connection_kwargs = {
24+
'host': self._host,
25+
'port': self._port,
26+
'password': self._password,
27+
'socket_timeout': socket_timeout,
28+
'socket_connect_timeout': socket_connect_timeout,
29+
'decode_responses': True,
30+
}
31+
if health_check_interval is not None:
32+
self._connection_kwargs['health_check_interval'] = health_check_interval
33+
if retry_on_timeout is not None:
34+
self._connection_kwargs['retry_on_timeout'] = retry_on_timeout
2235

2336
"""
2437
By default, each Redis instance created will in turn create its own connection pool.
2538
Every caller using the redis client will have its own pool, configured with the arguments that caller passed.
2639
"""
27-
self.connection = redis.Redis(
28-
host=self._host, port=self._port, password=self._password,
29-
socket_timeout=socket_timeout, socket_connect_timeout=socket_connect_timeout,
30-
decode_responses=True
31-
)
32-
33-
def _parse_config(self):
40+
self._redis = redis.Redis(**self._connection_kwargs)
3441

35-
self._host = REDIS_HOST
36-
self._port = REDIS_PORT
37-
self._password = REDIS_PASSWORD
42+
def reconnect(self):
43+
try:
44+
self._redis.connection_pool.disconnect()
45+
except Exception:
46+
pass
47+
self._redis = redis.Redis(**self._connection_kwargs)
48+
return self._redis
3849

3950
def get_subscriber(self, channel_name):
4051
while True:
4152
try:
42-
subscriber = self.connection.pubsub(ignore_subscribe_messages=True)
53+
subscriber = self._redis.pubsub(ignore_subscribe_messages=True)
4354
subscriber.subscribe(channel_name)
55+
logger.info('redis pubsub success, success subscribe %s', channel_name)
4456
except redis.AuthenticationError as e:
4557
logger.critical('connect to redis auth error: %s', e)
4658
raise e
@@ -50,20 +62,42 @@ def get_subscriber(self, channel_name):
5062
else:
5163
return subscriber
5264

65+
def close_subscriber(self, subscriber):
66+
if not subscriber:
67+
return
68+
try:
69+
subscriber.close()
70+
except Exception as e:
71+
logger.debug('close redis subscriber failed: %s', e)
72+
73+
def refresh_subscriber(self, subscriber, pubsub_channel_name, reason='unknown'):
74+
logger.info('reconnect redis pubsub channel=%s reason=%s', pubsub_channel_name, reason)
75+
self.close_subscriber(subscriber)
76+
try:
77+
self.reconnect()
78+
except Exception as e:
79+
logger.error('redis reconnect failed channel=%s error=%s', pubsub_channel_name, e)
80+
return self.get_subscriber(pubsub_channel_name)
81+
5382
def get(self, key):
54-
return self.connection.get(key)
83+
return self._redis.get(key)
5584

5685
def set(self, key, value, timeout=None):
5786
if not timeout:
58-
return self.connection.set(key, value)
87+
return self._redis.set(key, value)
5988
else:
60-
return self.connection.setex(key, timeout, value)
89+
return self._redis.setex(key, timeout, value)
6190

6291
def delete(self, key):
63-
return self.connection.delete(key)
92+
return self._redis.delete(key)
6493

6594
def publish(self, channel_name, message):
66-
return self.connection.publish(channel_name, message)
95+
try:
96+
return self._redis.publish(channel_name, message)
97+
except Exception as e:
98+
logger.warning('redis publish failed on %s: %s', channel_name, e)
99+
self.reconnect()
100+
return self._redis.publish(channel_name, message)
67101

68102

69103
class RedisCache(object):

dtable_events/app/stats_sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def __init__(self):
1616

1717
def send(self, channel: str, info: dict):
1818
try:
19-
self._redis_client.connection.publish(channel, json.dumps(info))
19+
self._redis_client.publish(channel, json.dumps(info))
2020
except Exception as e:
2121
logger.warning('send info to channel: %s error: %s', channel, e)
2222

dtable_events/automations/automations_pipeline.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,24 +65,24 @@ def get_percent(self, owner, org_id, workers):
6565
return self.counters.get(limit_key, 0) / (self.window_secs * workers)
6666

6767

68-
class AutomationsPipeline:
68+
class AutomationsPipeline(object):
6969

7070
def __init__(self):
7171
self.workers = 5
7272
self.automations_queue: Queue[AutomationRule] = Queue()
7373
self.results_queue: Queue[AutomationResult] = Queue()
74+
self._pubsub_no_message_timeout = 5 * 60
7475

7576
self._db_session_class = init_db_session_class()
7677

77-
self._redis_client = RedisClient(socket_timeout=10)
78+
self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=10,
79+
health_check_interval=30, retry_on_timeout=True)
7880
self.per_update_channel = 'automation-rule-triggered'
7981

8082
self.rate_limiter = RateLimiter()
8183

8284
self.automations_stats_manager = AutomationsStatsManager()
8385

84-
self.log_none_message_timeout = 10 * 60
85-
8686
# metrics
8787
self.realtime_trigger_count = 0
8888
self.scheduled_trigger_count = 0
@@ -143,15 +143,17 @@ def get_automation_rule(self, db_session, event_data):
143143
def receive(self):
144144
auto_rule_logger.info(f"Start to receive automation event from redis, window seconds {self.rate_limiter.window_secs} limit percent {self.rate_limiter.percent}")
145145
subscriber = self._redis_client.get_subscriber(self.per_update_channel)
146-
last_message_time = datetime.now()
146+
last_pubsub_message_time = time.time()
147147
while True:
148148
try:
149149
message = subscriber.get_message()
150150
self.realtime_automation_heartbeat = time.time()
151151
if message is not None:
152+
if message.get('type') != 'message':
153+
continue
154+
last_pubsub_message_time = time.time()
152155
event = json.loads(message['data'])
153156
auto_rule_logger.info(f"subscribe event {event}")
154-
last_message_time = datetime.now()
155157

156158
db_session = self._db_session_class()
157159
try:
@@ -192,14 +194,17 @@ def receive(self):
192194
finally:
193195
db_session.close()
194196
else:
195-
if (datetime.now() - last_message_time).seconds >= self.log_none_message_timeout:
196-
auto_rule_logger.info(f'No message for {self.log_none_message_timeout}s...')
197-
last_message_time = datetime.now()
197+
if time.time() - last_pubsub_message_time >= self._pubsub_no_message_timeout:
198+
auto_rule_logger.info('no automation message for %ss', self._pubsub_no_message_timeout)
199+
subscriber = self._redis_client.refresh_subscriber(
200+
subscriber, self.per_update_channel, 'no message timeout')
201+
last_pubsub_message_time = time.time()
202+
continue
198203
time.sleep(0.5)
199204
except Exception as e:
200-
auto_rule_logger.exception('Failed get automation rules message from redis: %s' % e)
201-
subscriber = self._redis_client.get_subscriber('automation-rule-triggered')
202-
last_message_time = datetime.now()
205+
auto_rule_logger.error('redis pubsub receive error: %s', e)
206+
subscriber = self._redis_client.refresh_subscriber(subscriber, self.per_update_channel, str(e))
207+
last_pubsub_message_time = time.time()
203208

204209
def worker(self):
205210
while True:

dtable_events/automations/automations_stats_manager.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from dtable_events.utils import get_dtable_admins
1010

1111
from dtable_events.app.config import SEATABLE_MYSQL_DB_CCNET_DB_NAME, DTABLE_WEB_SERVICE_URL, ORG_MEMBER_QUOTA_DEFAULT
12+
from dtable_events.app.event_redis import redis_cache
1213
from dtable_events.automations.actions import AutomationResult
1314
from dtable_events.utils.dtable_web_api import DTableWebAPI
1415

@@ -17,15 +18,19 @@ class AutomationsStatsManager:
1718

1819
def __init__(self):
1920
self.dtable_web_api = DTableWebAPI(DTABLE_WEB_SERVICE_URL)
20-
self.roles = None
21+
self.roles_cache_key = 'DTABLE_WEB_ROLES'
22+
self.roles_cache_timeout = 600
2123

2224
self.ccnet_db_name = SEATABLE_MYSQL_DB_CCNET_DB_NAME
2325

2426
def get_roles(self):
25-
if self.roles:
26-
return self.roles
27-
self.roles = self.dtable_web_api.internal_roles()
28-
return self.roles
27+
roles_json = redis_cache.get(self.roles_cache_key)
28+
if not roles_json:
29+
roles = self.dtable_web_api.internal_roles()
30+
roles_json = json.dumps(roles)
31+
redis_cache.set(self.roles_cache_key, roles_json, timeout=self.roles_cache_timeout)
32+
return roles
33+
return json.loads(roles_json)
2934

3035
def get_user_quota(self, db_session, username):
3136
sql = "SELECT username, monthly_automation_limit_per_user FROM user_quota WHERE username=:username"

dtable_events/notification_rules/handler.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,22 @@ def __init__(self):
1515
Thread.__init__(self)
1616
self._finished = Event()
1717
self._db_session_class = init_db_session_class()
18-
self._redis_client = RedisClient()
18+
self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5,
19+
health_check_interval=30, retry_on_timeout=True)
20+
self._pubsub_channel_name = 'notification-rule-triggered'
21+
self._pubsub_no_message_timeout = 5 * 60
1922

2023
def run(self):
2124
logger.info('Starting handle notification rules...')
22-
subscriber = self._redis_client.get_subscriber('notification-rule-triggered')
23-
25+
subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name)
26+
last_pubsub_message_time = time.time()
2427
while not self._finished.is_set():
2528
try:
2629
message = subscriber.get_message()
2730
if message is not None:
31+
if message.get('type') != 'message':
32+
continue
33+
last_pubsub_message_time = time.time()
2834
event = json.loads(message['data'])
2935
session = self._db_session_class()
3036
try:
@@ -34,7 +40,13 @@ def run(self):
3440
finally:
3541
session.close()
3642
else:
43+
if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout:
44+
subscriber = self._redis_client.refresh_subscriber(
45+
subscriber, self._pubsub_channel_name, 'no message timeout')
46+
last_pubsub_message_time = time.time()
47+
continue
3748
time.sleep(0.5)
3849
except Exception as e:
39-
logger.error('Failed get notification rules message from redis: %s' % e)
40-
subscriber = self._redis_client.get_subscriber('notification-rule-triggered')
50+
logger.error('redis pubsub receive error: %s', e)
51+
subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e))
52+
last_pubsub_message_time = time.time()

0 commit comments

Comments
 (0)