Skip to content

Commit a42c101

Browse files
committed
merge more of winloop into uvloop, add comment about streams returning 0 if AF_Unix
1 parent 250a656 commit a42c101

File tree

11 files changed

+1337
-1153
lines changed

11 files changed

+1337
-1153
lines changed

tests/test_pipes.py

Lines changed: 85 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -2,41 +2,42 @@
22
import io
33
import os
44
import socket
5+
import sys
6+
import unittest
57

68
from uvloop import _testbase as tb
79

8-
910
# All tests are copied from asyncio (mostly as-is)
1011

1112

1213
class MyReadPipeProto(asyncio.Protocol):
1314
done = None
1415

1516
def __init__(self, loop=None):
16-
self.state = ['INITIAL']
17+
self.state = ["INITIAL"]
1718
self.nbytes = 0
1819
self.transport = None
1920
if loop is not None:
2021
self.done = asyncio.Future(loop=loop)
2122

2223
def connection_made(self, transport):
2324
self.transport = transport
24-
assert self.state == ['INITIAL'], self.state
25-
self.state.append('CONNECTED')
25+
assert self.state == ["INITIAL"], self.state
26+
self.state.append("CONNECTED")
2627

2728
def data_received(self, data):
28-
assert self.state == ['INITIAL', 'CONNECTED'], self.state
29+
assert self.state == ["INITIAL", "CONNECTED"], self.state
2930
self.nbytes += len(data)
3031

3132
def eof_received(self):
32-
assert self.state == ['INITIAL', 'CONNECTED'], self.state
33-
self.state.append('EOF')
33+
assert self.state == ["INITIAL", "CONNECTED"], self.state
34+
self.state.append("EOF")
3435

3536
def connection_lost(self, exc):
36-
if 'EOF' not in self.state:
37-
self.state.append('EOF') # It is okay if EOF is missed.
38-
assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state
39-
self.state.append('CLOSED')
37+
if "EOF" not in self.state:
38+
self.state.append("EOF") # It is okay if EOF is missed.
39+
assert self.state == ["INITIAL", "CONNECTED", "EOF"], self.state
40+
self.state.append("CLOSED")
4041
if self.done:
4142
self.done.set_result(None)
4243

@@ -46,19 +47,19 @@ class MyWritePipeProto(asyncio.BaseProtocol):
4647
paused = False
4748

4849
def __init__(self, loop=None):
49-
self.state = 'INITIAL'
50+
self.state = "INITIAL"
5051
self.transport = None
5152
if loop is not None:
5253
self.done = asyncio.Future(loop=loop)
5354

5455
def connection_made(self, transport):
5556
self.transport = transport
56-
assert self.state == 'INITIAL', self.state
57-
self.state = 'CONNECTED'
57+
assert self.state == "INITIAL", self.state
58+
self.state = "CONNECTED"
5859

5960
def connection_lost(self, exc):
60-
assert self.state == 'CONNECTED', self.state
61-
self.state = 'CLOSED'
61+
assert self.state == "CONNECTED", self.state
62+
self.state = "CLOSED"
6263
if self.done:
6364
self.done.set_result(None)
6465

@@ -69,62 +70,70 @@ def resume_writing(self):
6970
self.paused = False
7071

7172

73+
# Winloop comment: on Windows the asyncio event loop does not support pipes.
74+
# For instance, running in ..\Lib\test\test_asyncio the unit test
75+
# test_events.ProactorEventLoopTests.test_unclosed_pipe_transport
76+
# gives: ... skipped "Don't support pipes for Windows"
77+
# See also: https://github.com/python/cpython/issues/71019 from that:
78+
# "On Windows, sockets and named pipes are supported, on Linux fifo, sockets,
79+
# pipes and character devices are supported, no idea about macOS."
7280
class _BasePipeTest:
7381
def test_read_pipe(self):
82+
if sys.platform == "win32" and self.is_asyncio_loop():
83+
raise unittest.SkipTest("do not support pipes for Windows")
84+
7485
proto = MyReadPipeProto(loop=self.loop)
7586

7687
rpipe, wpipe = os.pipe()
77-
pipeobj = io.open(rpipe, 'rb', 1024)
88+
pipeobj = io.open(rpipe, "rb", 1024)
7889

7990
async def connect():
80-
t, p = await self.loop.connect_read_pipe(
81-
lambda: proto, pipeobj)
91+
t, p = await self.loop.connect_read_pipe(lambda: proto, pipeobj)
8292
self.assertIs(p, proto)
8393
self.assertIs(t, proto.transport)
84-
self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
94+
self.assertEqual(["INITIAL", "CONNECTED"], proto.state)
8595
self.assertEqual(0, proto.nbytes)
8696

8797
self.loop.run_until_complete(connect())
8898

89-
os.write(wpipe, b'1')
99+
os.write(wpipe, b"1")
90100
tb.run_until(self.loop, lambda: proto.nbytes >= 1)
91101
self.assertEqual(1, proto.nbytes)
92102

93-
os.write(wpipe, b'2345')
103+
os.write(wpipe, b"2345")
94104
tb.run_until(self.loop, lambda: proto.nbytes >= 5)
95-
self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
105+
self.assertEqual(["INITIAL", "CONNECTED"], proto.state)
96106
self.assertEqual(5, proto.nbytes)
97107

98108
os.close(wpipe)
99109
self.loop.run_until_complete(proto.done)
100-
self.assertEqual(
101-
['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
110+
self.assertEqual(["INITIAL", "CONNECTED", "EOF", "CLOSED"], proto.state)
102111
# extra info is available
103-
self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
112+
self.assertIsNotNone(proto.transport.get_extra_info("pipe"))
104113

114+
@unittest.skipIf(sys.platform == "win32", "no os.openpty on Windows")
105115
def test_read_pty_output(self):
106116
proto = MyReadPipeProto(loop=self.loop)
107117

108118
master, slave = os.openpty()
109-
master_read_obj = io.open(master, 'rb', 0)
119+
master_read_obj = io.open(master, "rb", 0)
110120

111121
async def connect():
112-
t, p = await self.loop.connect_read_pipe(
113-
lambda: proto, master_read_obj)
122+
t, p = await self.loop.connect_read_pipe(lambda: proto, master_read_obj)
114123
self.assertIs(p, proto)
115124
self.assertIs(t, proto.transport)
116-
self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
125+
self.assertEqual(["INITIAL", "CONNECTED"], proto.state)
117126
self.assertEqual(0, proto.nbytes)
118127

119128
self.loop.run_until_complete(connect())
120129

121-
os.write(slave, b'1')
130+
os.write(slave, b"1")
122131
tb.run_until(self.loop, lambda: proto.nbytes)
123132
self.assertEqual(1, proto.nbytes)
124133

125-
os.write(slave, b'2345')
134+
os.write(slave, b"2345")
126135
tb.run_until(self.loop, lambda: proto.nbytes >= 5)
127-
self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
136+
self.assertEqual(["INITIAL", "CONNECTED"], proto.state)
128137
self.assertEqual(5, proto.nbytes)
129138

130139
# On Linux, transport raises EIO when slave is closed --
@@ -134,24 +143,29 @@ async def connect():
134143
proto.transport.close()
135144
self.loop.run_until_complete(proto.done)
136145

137-
self.assertEqual(
138-
['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
146+
self.assertEqual(["INITIAL", "CONNECTED", "EOF", "CLOSED"], proto.state)
139147
# extra info is available
140-
self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
148+
self.assertIsNotNone(proto.transport.get_extra_info("pipe"))
141149

142150
def test_write_pipe(self):
151+
if sys.platform == "win32" and self.is_asyncio_loop():
152+
raise unittest.SkipTest("do not support pipes for Windows")
153+
154+
if sys.platform == "win32" and sys.version_info[:3] < (3, 12, 0):
155+
raise unittest.SkipTest("no os.set_blocking() on Windows")
156+
143157
rpipe, wpipe = os.pipe()
144158
os.set_blocking(rpipe, False)
145-
pipeobj = io.open(wpipe, 'wb', 1024)
159+
pipeobj = io.open(wpipe, "wb", 1024)
146160

147161
proto = MyWritePipeProto(loop=self.loop)
148162
connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
149163
transport, p = self.loop.run_until_complete(connect)
150164
self.assertIs(p, proto)
151165
self.assertIs(transport, proto.transport)
152-
self.assertEqual('CONNECTED', proto.state)
166+
self.assertEqual("CONNECTED", proto.state)
153167

154-
transport.write(b'1')
168+
transport.write(b"1")
155169

156170
data = bytearray()
157171

@@ -164,59 +178,61 @@ def reader(data):
164178
return len(data)
165179

166180
tb.run_until(self.loop, lambda: reader(data) >= 1)
167-
self.assertEqual(b'1', data)
181+
self.assertEqual(b"1", data)
168182

169-
transport.write(b'2345')
183+
transport.write(b"2345")
170184
tb.run_until(self.loop, lambda: reader(data) >= 5)
171-
self.assertEqual(b'12345', data)
172-
self.assertEqual('CONNECTED', proto.state)
185+
self.assertEqual(b"12345", data)
186+
self.assertEqual("CONNECTED", proto.state)
173187

174188
os.close(rpipe)
175189

176190
# extra info is available
177-
self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
191+
self.assertIsNotNone(proto.transport.get_extra_info("pipe"))
178192

179193
# close connection
180194
proto.transport.close()
181195
self.loop.run_until_complete(proto.done)
182-
self.assertEqual('CLOSED', proto.state)
196+
self.assertEqual("CLOSED", proto.state)
183197

198+
@unittest.skipIf(sys.platform == "win32", "no Unix sockets on Windows")
184199
def test_write_pipe_disconnect_on_close(self):
185200
rsock, wsock = socket.socketpair()
186201
rsock.setblocking(False)
187202

188-
pipeobj = io.open(wsock.detach(), 'wb', 1024)
203+
pipeobj = io.open(wsock.detach(), "wb", 1024)
189204

190205
proto = MyWritePipeProto(loop=self.loop)
191206
connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
192207
transport, p = self.loop.run_until_complete(connect)
193208
self.assertIs(p, proto)
194209
self.assertIs(transport, proto.transport)
195-
self.assertEqual('CONNECTED', proto.state)
210+
self.assertEqual("CONNECTED", proto.state)
196211

197-
transport.write(b'1')
212+
transport.write(b"1")
198213
data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024))
199-
self.assertEqual(b'1', data)
214+
self.assertEqual(b"1", data)
200215

201216
rsock.close()
202217

203218
self.loop.run_until_complete(proto.done)
204-
self.assertEqual('CLOSED', proto.state)
219+
self.assertEqual("CLOSED", proto.state)
205220

221+
@unittest.skipIf(sys.platform == "win32", "no os.openpty on Windows")
206222
def test_write_pty(self):
207223
master, slave = os.openpty()
208224
os.set_blocking(master, False)
209225

210-
slave_write_obj = io.open(slave, 'wb', 0)
226+
slave_write_obj = io.open(slave, "wb", 0)
211227

212228
proto = MyWritePipeProto(loop=self.loop)
213229
connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj)
214230
transport, p = self.loop.run_until_complete(connect)
215231
self.assertIs(p, proto)
216232
self.assertIs(transport, proto.transport)
217-
self.assertEqual('CONNECTED', proto.state)
233+
self.assertEqual("CONNECTED", proto.state)
218234

219-
transport.write(b'1')
235+
transport.write(b"1")
220236

221237
data = bytearray()
222238

@@ -228,48 +244,49 @@ def reader(data):
228244
data += chunk
229245
return len(data)
230246

231-
tb.run_until(self.loop, lambda: reader(data) >= 1,
232-
timeout=10)
233-
self.assertEqual(b'1', data)
247+
tb.run_until(self.loop, lambda: reader(data) >= 1, timeout=10)
248+
self.assertEqual(b"1", data)
234249

235-
transport.write(b'2345')
236-
tb.run_until(self.loop, lambda: reader(data) >= 5,
237-
timeout=10)
238-
self.assertEqual(b'12345', data)
239-
self.assertEqual('CONNECTED', proto.state)
250+
transport.write(b"2345")
251+
tb.run_until(self.loop, lambda: reader(data) >= 5, timeout=10)
252+
self.assertEqual(b"12345", data)
253+
self.assertEqual("CONNECTED", proto.state)
240254

241255
os.close(master)
242256

243257
# extra info is available
244-
self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
258+
self.assertIsNotNone(proto.transport.get_extra_info("pipe"))
245259

246260
# close connection
247261
proto.transport.close()
248262
self.loop.run_until_complete(proto.done)
249-
self.assertEqual('CLOSED', proto.state)
263+
self.assertEqual("CLOSED", proto.state)
250264

251265
def test_write_buffer_full(self):
266+
if sys.platform == "win32":
267+
raise unittest.SkipTest("do not support pipes for Windows")
268+
252269
rpipe, wpipe = os.pipe()
253-
pipeobj = io.open(wpipe, 'wb', 1024)
270+
pipeobj = io.open(wpipe, "wb", 1024)
254271

255272
proto = MyWritePipeProto(loop=self.loop)
256273
connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
257274
transport, p = self.loop.run_until_complete(connect)
258275
self.assertIs(p, proto)
259276
self.assertIs(transport, proto.transport)
260-
self.assertEqual('CONNECTED', proto.state)
277+
self.assertEqual("CONNECTED", proto.state)
261278

262279
for i in range(32):
263-
transport.write(b'x' * 32768)
280+
transport.write(b"x" * 32768)
264281
if proto.paused:
265-
transport.write(b'x' * 32768)
282+
transport.write(b"x" * 32768)
266283
break
267284
else:
268285
self.fail("Didn't reach a full buffer")
269286

270287
os.close(rpipe)
271288
self.loop.run_until_complete(asyncio.wait_for(proto.done, 1))
272-
self.assertEqual('CLOSED', proto.state)
289+
self.assertEqual("CLOSED", proto.state)
273290

274291

275292
class Test_UV_Pipes(_BasePipeTest, tb.UVTestCase):

0 commit comments

Comments
 (0)