Skip to content

Commit c36a48e

Browse files
committed
a fix for fd leak
1 parent a788059 commit c36a48e

7 files changed

Lines changed: 93 additions & 35 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "rsloop"
3-
version = "0.1.12"
3+
version = "0.1.13"
44
edition = "2021"
55
description = "An event loop for asyncio written in Rust"
66
license = "Apache-2.0"

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "maturin"
44

55
[project]
66
name = "rsloop"
7-
version = "0.1.12"
7+
version = "0.1.13"
88
description = "An event loop for asyncio written in Rust"
99
readme = "README.md"
1010
license = { file = "LICENSE" }

src/python_api.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,14 @@ use crate::process_transport::{
3131
use crate::python_names;
3232
use crate::stream_transport::{
3333
create_server as create_py_server, remove_unix_socket_if_present, spawn_read_pipe_transport,
34-
spawn_write_pipe_transport, start_tls_transport, tcp_listener_from_socket_fd,
35-
tcp_server_listener, transport_from_socket, transport_from_socket_server_tls,
36-
transport_from_socket_tls, PyServer, PyStreamTransport, ServerCreateParams,
37-
TransportSpawnContext,
34+
spawn_write_pipe_transport, start_tls_transport, tcp_server_listener, transport_from_socket,
35+
transport_from_socket_server_tls, transport_from_socket_tls, PyServer, PyStreamTransport,
36+
ServerCreateParams, TransportSpawnContext,
3837
};
3938
#[cfg(unix)]
40-
use crate::stream_transport::{unix_listener_from_socket_fd, unix_server_listener};
39+
use crate::stream_transport::{
40+
tcp_listener_from_owned_socket_fd, unix_listener_from_owned_socket_fd, unix_server_listener,
41+
};
4142
use crate::tls::{client_tls_settings, server_tls_settings};
4243

4344
static ASYNCIO_TASK_CLS: OnceLock<Py<PyAny>> = OnceLock::new();
@@ -658,15 +659,15 @@ fn listener_sources_from_sockets(
658659
#[cfg(unix)]
659660
{
660661
listeners.push(if family == libc::AF_UNIX {
661-
unix_server_listener(unix_listener_from_socket_fd(fd)?)
662+
unix_server_listener(unix_listener_from_owned_socket_fd(fd)?)
662663
} else {
663-
tcp_server_listener(tcp_listener_from_socket_fd(fd)?)
664+
tcp_server_listener(tcp_listener_from_owned_socket_fd(fd)?)
664665
});
665666
}
666667
#[cfg(not(unix))]
667668
{
668669
let _ = family;
669-
listeners.push(tcp_server_listener(tcp_listener_from_socket_fd(fd)?));
670+
listeners.push(tcp_server_listener(tcp_listener_from_owned_socket_fd(fd)?));
670671
}
671672
}
672673
Ok(listeners)

src/stream_transport.rs

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1546,12 +1546,12 @@ impl StreamTransportCore {
15461546
let family = socket.bind(py).getattr("family")?.extract::<i32>()?;
15471547
#[cfg(unix)]
15481548
let stream = if family == libc::AF_UNIX {
1549-
StreamKind::Unix(unix_stream_from_socket_fd(fd)?)
1549+
StreamKind::Unix(unix_stream_from_owned_socket_fd(fd)?)
15501550
} else {
1551-
StreamKind::Tcp(tcp_stream_from_socket_fd(fd)?)
1551+
StreamKind::Tcp(tcp_stream_from_owned_socket_fd(fd)?)
15521552
};
15531553
#[cfg(not(unix))]
1554-
let stream = StreamKind::Tcp(tcp_stream_from_socket_fd(fd)?);
1554+
let stream = StreamKind::Tcp(tcp_stream_from_owned_socket_fd(fd)?);
15551555

15561556
Ok((
15571557
TransportSpawnContext {
@@ -2188,11 +2188,21 @@ pub fn transport_from_socket(
21882188
#[cfg(unix)]
21892189
if family == libc::AF_UNIX {
21902190
let fd = detached_socket_handle(py, &socket_obj)?;
2191-
return spawn_unix_transport(py, spawn_context, unix_stream_from_socket_fd(fd)?, None);
2191+
return spawn_unix_transport(
2192+
py,
2193+
spawn_context,
2194+
unix_stream_from_owned_socket_fd(fd)?,
2195+
None,
2196+
);
21922197
}
21932198

21942199
let fd = detached_socket_handle(py, &socket_obj)?;
2195-
spawn_tcp_transport(py, spawn_context, tcp_stream_from_socket_fd(fd)?, None)
2200+
spawn_tcp_transport(
2201+
py,
2202+
spawn_context,
2203+
tcp_stream_from_owned_socket_fd(fd)?,
2204+
None,
2205+
)
21962206
}
21972207

21982208
pub fn transport_from_socket_tls(
@@ -2220,7 +2230,7 @@ pub fn transport_from_socket_tls(
22202230
return spawn_tls_client_transport(
22212231
py,
22222232
spawn_context,
2223-
StreamKind::Unix(unix_stream_from_socket_fd(fd)?),
2233+
StreamKind::Unix(unix_stream_from_owned_socket_fd(fd)?),
22242234
tls,
22252235
None,
22262236
true,
@@ -2231,7 +2241,7 @@ pub fn transport_from_socket_tls(
22312241
spawn_tls_client_transport(
22322242
py,
22332243
spawn_context,
2234-
StreamKind::Tcp(tcp_stream_from_socket_fd(fd)?),
2244+
StreamKind::Tcp(tcp_stream_from_owned_socket_fd(fd)?),
22352245
tls,
22362246
None,
22372247
true,
@@ -2263,7 +2273,7 @@ pub fn transport_from_socket_server_tls(
22632273
return spawn_tls_server_transport(
22642274
py,
22652275
spawn_context,
2266-
StreamKind::Unix(unix_stream_from_socket_fd(fd)?),
2276+
StreamKind::Unix(unix_stream_from_owned_socket_fd(fd)?),
22672277
tls,
22682278
None,
22692279
true,
@@ -2274,7 +2284,7 @@ pub fn transport_from_socket_server_tls(
22742284
spawn_tls_server_transport(
22752285
py,
22762286
spawn_context,
2277-
StreamKind::Tcp(tcp_stream_from_socket_fd(fd)?),
2287+
StreamKind::Tcp(tcp_stream_from_owned_socket_fd(fd)?),
22782288
tls,
22792289
None,
22802290
true,
@@ -2576,28 +2586,26 @@ pub fn spawn_write_pipe_transport(
25762586
Ok(transport)
25772587
}
25782588

2579-
pub fn tcp_stream_from_socket_fd(fd: fd_ops::RawFd) -> PyResult<StdTcpStream> {
2580-
duplicate_configured_tcp_stream(fd)
2589+
pub fn tcp_stream_from_owned_socket_fd(fd: fd_ops::RawFd) -> PyResult<StdTcpStream> {
2590+
configured_tcp_stream_from_owned_fd(fd)
25812591
}
25822592

25832593
#[cfg(unix)]
2584-
pub fn unix_stream_from_socket_fd(fd: fd_ops::RawFd) -> PyResult<StdUnixStream> {
2585-
let dup = fd_ops::dup_raw_fd(fd).map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
2586-
let stream = unsafe { StdUnixStream::from_raw_fd(raw_fd_for_std(dup)?) };
2594+
pub fn unix_stream_from_owned_socket_fd(fd: fd_ops::RawFd) -> PyResult<StdUnixStream> {
2595+
let stream = unsafe { StdUnixStream::from_raw_fd(raw_fd_for_std(fd)?) };
25872596
stream
25882597
.set_nonblocking(true)
25892598
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
25902599
Ok(stream)
25912600
}
25922601

2593-
pub fn tcp_listener_from_socket_fd(fd: fd_ops::RawFd) -> PyResult<StdTcpListener> {
2594-
duplicate_configured_tcp_listener(fd)
2602+
pub fn tcp_listener_from_owned_socket_fd(fd: fd_ops::RawFd) -> PyResult<StdTcpListener> {
2603+
configured_tcp_listener_from_owned_fd(fd)
25952604
}
25962605

25972606
#[cfg(unix)]
2598-
pub fn unix_listener_from_socket_fd(fd: fd_ops::RawFd) -> PyResult<StdUnixListener> {
2599-
let dup = fd_ops::dup_raw_fd(fd).map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
2600-
let listener = unsafe { StdUnixListener::from_raw_fd(raw_fd_for_std(dup)?) };
2607+
pub fn unix_listener_from_owned_socket_fd(fd: fd_ops::RawFd) -> PyResult<StdUnixListener> {
2608+
let listener = unsafe { StdUnixListener::from_raw_fd(raw_fd_for_std(fd)?) };
26012609
listener
26022610
.set_nonblocking(true)
26032611
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
@@ -2606,17 +2614,20 @@ pub fn unix_listener_from_socket_fd(fd: fd_ops::RawFd) -> PyResult<StdUnixListen
26062614

26072615
fn duplicate_configured_tcp_stream(fd: fd_ops::RawFd) -> PyResult<StdTcpStream> {
26082616
let dup = fd_ops::dup_raw_fd(fd).map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
2609-
let socket = socket_from_owned_raw(dup)?;
2617+
configured_tcp_stream_from_owned_fd(dup)
2618+
}
2619+
2620+
fn configured_tcp_stream_from_owned_fd(fd: fd_ops::RawFd) -> PyResult<StdTcpStream> {
2621+
let socket = socket_from_owned_raw(fd)?;
26102622
socket
26112623
.set_nonblocking(true)
26122624
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
26132625
let _ = socket.set_tcp_nodelay(true);
26142626
Ok(socket.into())
26152627
}
26162628

2617-
fn duplicate_configured_tcp_listener(fd: fd_ops::RawFd) -> PyResult<StdTcpListener> {
2618-
let dup = fd_ops::dup_raw_fd(fd).map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
2619-
let socket = socket_from_owned_raw(dup)?;
2629+
fn configured_tcp_listener_from_owned_fd(fd: fd_ops::RawFd) -> PyResult<StdTcpListener> {
2630+
let socket = socket_from_owned_raw(fd)?;
26202631
socket
26212632
.set_nonblocking(true)
26222633
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

tests/test_run.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import gc
45
import os
56
import shlex
67
import socket
@@ -36,6 +37,51 @@ async def main() -> str:
3637

3738
self.assertEqual(rsloop.run(main()), "ok")
3839

40+
@unittest.skipUnless(
41+
os.name == "posix" and os.path.isdir("/proc/self/fd"),
42+
"requires /proc/self/fd",
43+
)
44+
def test_create_connection_does_not_leak_file_descriptors(self) -> None:
45+
def fd_count() -> int:
46+
return len(os.listdir("/proc/self/fd"))
47+
48+
async def main() -> None:
49+
loop = asyncio.get_running_loop()
50+
51+
class ServerProtocol(asyncio.Protocol):
52+
def connection_made(self, transport: asyncio.BaseTransport) -> None:
53+
transport.close()
54+
55+
server = await loop.create_server(ServerProtocol, "127.0.0.1", 0)
56+
try:
57+
port = server.sockets[0].getsockname()[1]
58+
59+
for _ in range(64):
60+
closed = loop.create_future()
61+
62+
class ClientProtocol(asyncio.Protocol):
63+
def connection_made(
64+
self, transport: asyncio.BaseTransport
65+
) -> None:
66+
transport.close()
67+
68+
def connection_lost(self, exc: Exception | None) -> None:
69+
if not closed.done():
70+
closed.set_result(None)
71+
72+
await loop.create_connection(ClientProtocol, "127.0.0.1", port)
73+
await asyncio.wait_for(closed, 1.0)
74+
await asyncio.sleep(0)
75+
finally:
76+
server.close()
77+
await server.wait_closed()
78+
79+
before = fd_count()
80+
rsloop.run(main())
81+
gc.collect()
82+
after = fd_count()
83+
self.assertLessEqual(after, before + 4)
84+
3985
def test_connect_pipe_round_trip(self) -> None:
4086
async def main() -> tuple[str, str]:
4187
loop = asyncio.get_running_loop()

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)