-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathwsdaemon.py
More file actions
118 lines (101 loc) · 3.91 KB
/
wsdaemon.py
File metadata and controls
118 lines (101 loc) · 3.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import sys
import os
import json
from urllib.parse import urlparse
from twisted.internet import reactor, protocol, defer, task
from twisted.python import log
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory
from autobahn.websocket.types import ConnectionDeny
import pika
from pika import exceptions
from pika.adapters import twisted_connection
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lastwill.settings')
import django
django.setup()
from django.http.cookie import parse_cookie
from django.contrib.sessions.models import Session
from django.contrib.auth.models import User
from django.conf import settings
class WSP(WebSocketServerProtocol):
user = None
def check_origin(self, origin):
if origin not in ('http://dev.mywish.io', 'http://contracts.mywish.io'):
raise ConnectionDeny(404)
def check_auth(self, cookie):
session_key = cookie.get(settings.SESSION_COOKIE_NAME, '')
try:
session = Session.objects.get(session_key=session_key)
user_id = session.get_decoded().get('_auth_user_id')
self.user = User.objects.get(id=user_id, is_active=True)
except:
raise ConnectionDeny(403)
def onConnect(self, request):
origin = request.headers.get('origin', '')
# self.check_origin(origin)
cookie = parse_cookie(request.headers.get('cookie', ''))
self.check_auth(cookie)
return super().onConnect(request)
def onOpen(self):
if self.user.id in self.factory.connections_dict.keys():
self.factory.connections_dict[self.user.id].append(self)
else:
self.factory.connections_dict[self.user.id] = [self]
self.factory.pings_lost[self.peer] = 0
self.run = True
self.doPing()
def onMessage(self, payload, isBinary):
return
def onClose(self, wasClean, code, reason):
self.run = False
if self.user and self in self.factory.connections_dict[self.user.id]:
self.factory.connections_dict[self.user.id].remove(self)
self.factory.pings_lost.pop(self.peer, None)
def doPing(self):
if self.run:
if self.factory.pings_lost[self.peer] >= 3:
print('closing due to timeout')
self.sendClose()
else:
self.sendPing()
self.factory.pings_lost[self.peer] += 1
reactor.callLater(20, self.doPing)
def onPong(self, payload):
self.factory.pings_lost[self.peer] = 0
@defer.inlineCallbacks
def run(connection, proto_dict):
channel = yield connection.channel()
queue = yield channel.queue_declare(queue='websockets', durable=True, auto_delete=False, exclusive=False)
queue_object, consumer_tag = yield channel.basic_consume('websockets', auto_ack=True)
l = task.LoopingCall(read, queue_object, proto_dict)
l.start(0.01)
@defer.inlineCallbacks
def read(queue_object, proto_dict):
ch, method, properties, body = yield queue_object.get()
user = int(properties.type)
print('sending to', user)
for c in proto_dict.get(user, []):
yield c.sendMessage(body, False)
print('sended to', user)
if __name__ == '__main__':
log.startLogging(sys.stdout)
factory = WebSocketServerFactory("ws://127.0.0.1:8077")
factory.protocol = WSP
factory.setProtocolOptions(maxConnections=10000)
factory.connections_dict = {}
factory.pings_lost = {}
reactor.listenTCP(8077, factory)
cc = protocol.ClientCreator(
reactor,
twisted_connection.TwistedProtocolConnection,
pika.ConnectionParameters(
'127.0.0.1',
5672,
'mywill',
pika.PlainCredentials('java', 'java'),
heartbeat=0,
)
)
d = cc.connectTCP('127.0.0.1', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run, factory.connections_dict)
reactor.run()