This repository was archived by the owner on Apr 13, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreams.py
More file actions
executable file
·130 lines (99 loc) · 2.59 KB
/
streams.py
File metadata and controls
executable file
·130 lines (99 loc) · 2.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/python
# -*- coding: utf-8 -*-
import logging
from collections import deque
from tornado import gen
class ClosableStream(object):
    """Holder for an underlying stream object that can be closed or swapped.

    The wrapped object only needs a ``close()`` method; it may also be
    ``None`` when no stream is currently attached.
    """

    def __init__(self, closable_stream):
        """Wrap ``closable_stream`` (any object with ``close()``, or ``None``)."""
        super(ClosableStream, self).__init__()
        self._stream = closable_stream

    def close(self):
        """Close the wrapped stream, if any, and drop all instance state.

        Every instance attribute (including those added by subclasses) is
        removed afterwards. Calling ``close()`` again is a no-op.
        """
        # A previous close() has already emptied __dict__, so ``_stream`` may
        # be missing; treat that exactly like "nothing to close".
        underlying = getattr(self, '_stream', None)
        if underlying is not None:
            underlying.close()
        # Wipe every attribute -- presumably to release references held by
        # this object and its subclasses.
        self.__dict__.clear()

    def stream(self):
        """Return the currently wrapped stream (may be ``None``)."""
        return self._stream

    def switch(self, stream):
        """Replace the wrapped stream with ``stream`` without closing the old one."""
        self._stream = stream
class BufferedStream(ClosableStream):
    """Read-ahead buffer layered on top of a Tornado stream.

    Data fetched from the wrapped stream is kept in an internal buffer so
    that callers can look at upcoming bytes (``peek``) before consuming them
    (``read``).
    """

    def __init__(self, tornado_stream):
        """Wrap ``tornado_stream``; it must offer ``read_bytes`` and ``write``."""
        super(BufferedStream, self).__init__(tornado_stream)
        # Bytes already fetched from the stream but not yet consumed by read().
        # A bytes literal keeps the buffer type-compatible with what
        # read_bytes yields (on Python 2 this is the same as '').
        self._buffer = b''

    @gen.coroutine
    def poll(self, hint):
        """Fill the internal buffer until it holds at least ``hint`` bytes.

        Args:
            hint: minimum number of bytes that must be buffered on return.
        """
        buffered = self._buffer
        stream = self.stream()
        while len(buffered) < hint:
            # Ask for the missing byte count only (partial=False).
            chunk = yield stream.read_bytes(hint - len(buffered), partial=False)
            buffered += chunk
        # Publish only after all reads succeeded, so a failed read leaves
        # the previous buffer untouched.
        self._buffer = buffered

    @gen.coroutine
    def peek(self, hint):
        """Return the first ``hint`` buffered bytes without consuming them."""
        yield self.poll(hint)
        raise gen.Return(self._buffer[:hint])

    @gen.coroutine
    def read(self, hint):
        """Return the first ``hint`` bytes and remove them from the buffer."""
        yield self.poll(hint)
        content = self._buffer[:hint]
        # Keep whatever lies beyond the consumed prefix.
        self._buffer = self._buffer[hint:]
        raise gen.Return(content)

    @gen.coroutine
    def write(self, content):
        """Write ``content`` straight to the wrapped stream (unbuffered)."""
        yield self.stream().write(content)

    def readable(self):
        """Return the number of bytes currently held in the buffer."""
        return len(self._buffer)
class SwitchableStream(ClosableStream):
    """Read from a pool of candidate streams, failing over on errors.

    ``candidates`` is an iterable of zero-argument callables. Each call
    returns a stream object, or a Future resolving to one, that offers
    ``read(n)`` and ``close()`` and optionally ``seek(offset)``. Candidates
    are taken from the end of the sequence first (``deque.pop``).
    """

    def __init__(self, candidates):
        """Start with no active stream and the given candidate factories."""
        super(SwitchableStream, self).__init__(None)
        self._candidates = deque(candidates)
        # Number of bytes handed to the caller so far; a replacement stream
        # is advanced to this offset before it is read from.
        self._readed = 0

    @gen.coroutine
    def _unsafe_read(self, hint):
        """Read from the active stream, opening the next candidate if needed.

        Returns (via ``gen.Return``) whatever the stream's ``read(hint)``
        yields, or ``None`` when there is no active stream and no candidate
        left. Errors propagate to the caller; ``read`` handles the failover.
        """
        current = self.stream()
        if current is None:
            # Only the pop() may signal exhaustion; an IndexError raised by a
            # factory itself is a genuine failure of that candidate.
            try:
                factory = self._candidates.pop()
            except IndexError:
                # no more candidates
                raise gen.Return(None)
            current = factory()
            # A factory may hand back a Future instead of a ready stream.
            if isinstance(current, gen.Future):
                current = yield current
            # Register the new stream before recovering the offset, so that
            # read() can close it if the recovery below fails.
            self.switch(current)
            # Recover the offset: use seek if available, otherwise read and
            # discard the bytes that were already delivered.
            if callable(getattr(current, 'seek', None)):
                yield current.seek(self._readed)
            else:
                yield current.read(self._readed)
        # a readable stream is active now
        readed = yield current.read(hint)
        if readed is not None:
            self._readed += len(readed)
        raise gen.Return(readed)

    @gen.coroutine
    def read(self, hint):
        """Read up to ``hint`` bytes, switching to the next candidate on failure.

        When the active stream raises, it is logged, closed and dropped, and
        the read is retried on the next candidate. Returns ``None`` once no
        candidate is left.
        """
        while True:
            try:
                chunk = yield self._unsafe_read(hint)
            except Exception:
                # Close the failed stream if one is open, then try the next.
                failed = self.stream()
                logging.exception('fail of stream:%s,try next one...', failed)
                if failed is not None:
                    failed.close()
                self.switch(None)
            else:
                raise gen.Return(chunk)