forked from miguelgrinberg/python-socketio
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync_redis_manager.py
More file actions
125 lines (111 loc) · 5.15 KB
/
Copy pathasync_redis_manager.py
File metadata and controls
125 lines (111 loc) · 5.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import asyncio
import pickle
try: # pragma: no cover
from redis import asyncio as aioredis
from redis.exceptions import RedisError
except ImportError: # pragma: no cover
try:
import aioredis
from aioredis.exceptions import RedisError
except ImportError:
aioredis = None
RedisError = None
from .async_pubsub_manager import AsyncPubSubManager
from .redis_manager import parse_redis_sentinel_url
class AsyncRedisManager(AsyncPubSubManager):  # pragma: no cover
    """Redis based client manager for asyncio servers.

    This class implements a Redis backend for event sharing across multiple
    processes.

    To use a Redis backend, initialize the :class:`AsyncServer` instance as
    follows::

        url = 'redis://hostname:port/0'
        server = socketio.AsyncServer(
            client_manager=socketio.AsyncRedisManager(url))

    :param url: The connection URL for the Redis server. For a default Redis
                store running on the same host, use ``redis://``.  To use a
                TLS connection, use ``rediss://``. To use Redis Sentinel, use
                ``redis+sentinel://`` with a comma-separated list of hosts
                and the service name after the db in the URL path. Example:
                ``redis+sentinel://user:pw@host1:1234,host2:2345/0/myredis``.
    :param channel: The channel name on which the server sends and receives
                    notifications. Must be the same in all the servers.
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving.
    :param logger: Passed through unchanged to the parent class constructor.
    :param redis_options: additional keyword arguments to be passed to
                          ``Redis.from_url()`` or ``Sentinel()``. The
                          dictionary is never modified by this class.
    """
    name = 'aioredis'

    def __init__(self, url='redis://localhost:6379/0', channel='socketio',
                 write_only=False, logger=None, redis_options=None):
        # ``aioredis`` is set to None by the module-level import fallback
        # when neither ``redis`` nor ``aioredis`` could be imported.
        if aioredis is None:
            raise RuntimeError('Redis package is not installed '
                               '(Run "pip install redis" in your virtualenv).')
        if not hasattr(aioredis.Redis, 'from_url'):
            raise RuntimeError('Version 2 of aioredis package is required.')
        super().__init__(channel=channel, write_only=write_only, logger=logger)
        self.redis_url = url
        self.redis_options = redis_options or {}
        self._redis_connect()

    def _redis_connect(self):
        """Create the Redis client and its pub/sub object.

        Sets ``self.redis`` and ``self.pubsub``. Called once from the
        constructor and again by the publish/listen retry logic after a
        ``RedisError``.
        """
        if not self.redis_url.startswith('redis+sentinel://'):
            self.redis = aioredis.Redis.from_url(self.redis_url,
                                                 **self.redis_options)
        else:
            sentinels, service_name, connection_kwargs = \
                parse_redis_sentinel_url(self.redis_url)
            # Merge into a fresh dict so that neither ``self.redis_options``
            # nor the dictionary supplied by the caller is mutated. Values
            # parsed from the URL take precedence over ``redis_options``.
            kwargs = {**self.redis_options, **connection_kwargs}
            sentinel = aioredis.sentinel.Sentinel(sentinels, **kwargs)
            self.redis = sentinel.master_for(service_name or self.channel)
        self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)

    async def _publish(self, data):
        """Publish ``data`` on the configured channel.

        The payload is serialized with ``pickle``. On a ``RedisError`` the
        connection is re-created and the publish is attempted one more time;
        if that also fails the error is logged and ``None`` is returned.

        :param data: the object to publish.
        :return: the result of ``self.redis.publish()`` on success, or
                 ``None`` after the second failure.
        """
        # NOTE(review): the payload is pickled; presumably the receiving side
        # unpickles it, so the Redis channel must only be reachable by trusted
        # peers -- confirm against the consumer of ``_listen()``.
        retry = True
        while True:
            try:
                if not retry:
                    # Second attempt: start from a fresh connection.
                    self._redis_connect()
                return await self.redis.publish(
                    self.channel, pickle.dumps(data))
            except RedisError as exc:
                if retry:
                    self._get_logger().error(
                        'Cannot publish to redis... '
                        'retrying',
                        extra={"redis_exception": str(exc)})
                    retry = False
                else:
                    self._get_logger().error(
                        'Cannot publish to redis... '
                        'giving up',
                        extra={"redis_exception": str(exc)})
                    break

    async def _redis_listen_with_retries(self):
        """Yield pub/sub messages, reconnecting after a ``RedisError``.

        After a failure the generator sleeps, then reconnects and
        re-subscribes. The sleep starts at 1 second, doubles after each
        failure up to a maximum of 60 seconds, and is reset to 1 second once
        a reconnect and subscribe succeed. The loop never gives up.
        """
        retry_sleep = 1
        connect = False
        while True:
            try:
                if connect:
                    self._redis_connect()
                    await self.pubsub.subscribe(self.channel)
                    retry_sleep = 1
                async for message in self.pubsub.listen():
                    yield message
            except RedisError as exc:
                self._get_logger().error('Cannot receive from redis... '
                                         'retrying in '
                                         '{} secs'.format(retry_sleep),
                                         extra={"redis_exception": str(exc)})
                connect = True
                await asyncio.sleep(retry_sleep)
                # Exponential backoff, capped at one minute.
                retry_sleep = min(retry_sleep * 2, 60)

    async def _listen(self):
        """Yield the ``data`` field of each message received on the channel.

        Subscribes to ``self.channel`` and then filters the messages produced
        by :meth:`_redis_listen_with_retries`, keeping only entries of type
        ``'message'`` whose channel equals the UTF-8 encoded channel name.
        """
        # Message channel names are compared against the encoded (bytes) form.
        channel = self.channel.encode('utf-8')
        await self.pubsub.subscribe(self.channel)
        async for message in self._redis_listen_with_retries():
            if message['channel'] == channel and \
                    message['type'] == 'message' and 'data' in message:
                yield message['data']
        await self.pubsub.unsubscribe(self.channel)