Skip to content

Commit f7dcf2d

Browse files
committed
feat: add custom socket transport support for Postgres and MySQL
Add `connect_socket()` methods to `PgConnection` and `MySqlConnection` that accept any pre-connected socket implementing the `Socket` trait. This enables using custom transport layers (e.g., vsock for AWS Nitro Enclaves, QUIC, or other non-TCP/UDS transports) without forking sqlx. Re-export `Socket` and `ReadBuf` traits from `sqlx::net` so users can implement custom socket types.
1 parent 05c8dc1 commit f7dcf2d

File tree

9 files changed

+152
-18
lines changed

9 files changed

+152
-18
lines changed

sqlx-core/src/net/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ mod socket;
22
pub mod tls;
33

44
pub use socket::{
5-
connect_tcp, connect_uds, BufferedSocket, Socket, SocketIntoBox, WithSocket, WriteBuffer,
5+
connect_socket, connect_tcp, connect_uds, BufferedSocket, Socket, SocketIntoBox, WithSocket,
6+
WriteBuffer,
67
};

sqlx-core/src/net/socket/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,20 @@ pub async fn connect_tcp<Ws: WithSocket>(
202202
}
203203
}
204204

205+
/// Connect using a pre-connected socket that implements [`Socket`].
206+
///
207+
/// This allows using custom transport layers (e.g., vsock, QUIC, or any
208+
/// `AsyncRead + AsyncWrite` type) with SQLx database connections.
209+
///
210+
/// The socket will be passed through the `with_socket` handler, which
211+
/// typically performs TLS upgrade negotiation.
212+
pub async fn connect_socket<S: Socket, Ws: WithSocket>(
213+
socket: S,
214+
with_socket: Ws,
215+
) -> crate::Result<Ws::Output> {
216+
Ok(with_socket.with_socket(socket).await)
217+
}
218+
205219
/// Open a TCP socket to `host` and `port`.
206220
///
207221
/// If `host` is a hostname, attempt to connect to each address it resolves to.

sqlx-mysql/src/connection/establish.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,29 @@ impl MySqlConnection {
2222

2323
let stream = handshake?;
2424

25-
Ok(Self {
25+
Ok(Self::establish_with_stream(stream, options))
26+
}
27+
28+
pub(crate) async fn establish_with_socket<S: Socket>(
29+
socket: S,
30+
options: &MySqlConnectOptions,
31+
) -> Result<Self, Error> {
32+
let do_handshake = DoHandshake::new(options)?;
33+
let stream = do_handshake.with_socket(socket).await?;
34+
35+
Ok(Self::establish_with_stream(stream, options))
36+
}
37+
38+
fn establish_with_stream(stream: MySqlStream, options: &MySqlConnectOptions) -> Self {
39+
Self {
2640
inner: Box::new(MySqlConnectionInner {
2741
stream,
2842
transaction_depth: 0,
2943
status_flags: Default::default(),
3044
cache_statement: StatementCache::new(options.statement_cache_capacity),
3145
log_settings: options.log_settings.clone(),
3246
}),
33-
})
47+
}
3448
}
3549
}
3650

sqlx-mysql/src/connection/mod.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,38 @@ pub(crate) struct MySqlConnectionInner {
5252
}
5353

5454
impl MySqlConnection {
55+
/// Connect to a MySQL database using a pre-connected socket.
56+
///
57+
/// This allows using custom transport layers such as vsock, QUIC,
58+
/// or any type that implements [`sqlx_core::net::Socket`].
59+
///
60+
/// The provided socket will go through TLS upgrade negotiation based on the
61+
/// SSL mode configured in `options`.
62+
///
63+
/// # Example
64+
///
65+
/// ```rust,ignore
66+
/// use sqlx::mysql::{MySqlConnectOptions, MySqlConnection};
67+
///
68+
/// # async fn example() -> sqlx::Result<()> {
69+
/// let socket: tokio::net::TcpStream = todo!();
70+
/// let options = MySqlConnectOptions::new()
71+
/// .username("root")
72+
/// .database("mydb");
73+
///
74+
/// let _conn = MySqlConnection::connect_socket(socket, &options).await?;
75+
/// # Ok(())
76+
/// # }
77+
/// ```
78+
pub async fn connect_socket<S: sqlx_core::net::Socket>(
79+
socket: S,
80+
options: &MySqlConnectOptions,
81+
) -> Result<Self, Error> {
82+
let mut conn = Self::establish_with_socket(socket, options).await?;
83+
options.configure_session(&mut conn).await?;
84+
Ok(conn)
85+
}
86+
5587
pub(crate) fn in_transaction(&self) -> bool {
5688
self.inner
5789
.status_flags

sqlx-mysql/src/options/connect.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,26 @@ impl ConnectOptions for MySqlConnectOptions {
2424
{
2525
let mut conn = MySqlConnection::establish(self).await?;
2626

27-
// After the connection is established, we initialize by configuring a few
28-
// connection parameters
27+
self.configure_session(&mut conn).await?;
2928

29+
Ok(conn)
30+
}
31+
32+
fn log_statements(mut self, level: LevelFilter) -> Self {
33+
self.log_settings.log_statements(level);
34+
self
35+
}
36+
37+
fn log_slow_statements(mut self, level: LevelFilter, duration: Duration) -> Self {
38+
self.log_settings.log_slow_statements(level, duration);
39+
self
40+
}
41+
}
42+
43+
impl MySqlConnectOptions {
44+
/// After the connection is established, initialize by configuring
45+
/// connection parameters (sql_mode, time_zone, charset).
46+
pub(crate) async fn configure_session(&self, conn: &mut MySqlConnection) -> Result<(), Error> {
3047
// https://mariadb.com/kb/en/sql-mode/
3148

3249
// PIPES_AS_CONCAT - Allows using the pipe character (ASCII 124) as string concatenation operator.
@@ -88,16 +105,6 @@ impl ConnectOptions for MySqlConnectOptions {
88105
.await?;
89106
}
90107

91-
Ok(conn)
92-
}
93-
94-
fn log_statements(mut self, level: LevelFilter) -> Self {
95-
self.log_settings.log_statements(level);
96-
self
97-
}
98-
99-
fn log_slow_statements(mut self, level: LevelFilter, duration: Duration) -> Self {
100-
self.log_settings.log_slow_statements(level, duration);
101-
self
108+
Ok(())
102109
}
103110
}

sqlx-postgres/src/connection/establish.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::io::StatementId;
77
use crate::message::{
88
Authentication, BackendKeyData, BackendMessageFormat, Password, ReadyForQuery, Startup,
99
};
10+
use crate::net::Socket;
1011
use crate::{PgConnectOptions, PgConnection};
1112

1213
use super::PgConnectionInner;
@@ -16,9 +17,22 @@ use super::PgConnectionInner;
1617

1718
impl PgConnection {
1819
pub(crate) async fn establish(options: &PgConnectOptions) -> Result<Self, Error> {
19-
// Upgrade to TLS if we were asked to and the server supports it
20-
let mut stream = PgStream::connect(options).await?;
20+
let stream = PgStream::connect(options).await?;
21+
Self::establish_with_stream(stream, options).await
22+
}
23+
24+
pub(crate) async fn establish_with_socket<S: Socket>(
25+
socket: S,
26+
options: &PgConnectOptions,
27+
) -> Result<Self, Error> {
28+
let stream = PgStream::connect_socket(socket, options).await?;
29+
Self::establish_with_stream(stream, options).await
30+
}
2131

32+
async fn establish_with_stream(
33+
mut stream: PgStream,
34+
options: &PgConnectOptions,
35+
) -> Result<Self, Error> {
2236
// To begin a session, a frontend opens a connection to the server
2337
// and sends a startup message.
2438

sqlx-postgres/src/connection/mod.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,36 @@ pub(crate) struct TableColumns {
8585
}
8686

8787
impl PgConnection {
88+
/// Connect to a PostgreSQL database using a pre-connected socket.
89+
///
90+
/// This allows using custom transport layers such as vsock, QUIC,
91+
/// or any type that implements [`sqlx_core::net::Socket`].
92+
///
93+
/// The provided socket will go through TLS upgrade negotiation based on the
94+
/// SSL mode configured in `options`.
95+
///
96+
/// # Example
97+
///
98+
/// ```rust,ignore
99+
/// use sqlx::postgres::{PgConnectOptions, PgConnection};
100+
///
101+
/// # async fn example() -> sqlx::Result<()> {
102+
/// let socket: tokio::net::TcpStream = todo!();
103+
/// let options = PgConnectOptions::new()
104+
/// .username("postgres")
105+
/// .database("mydb");
106+
///
107+
/// let _conn = PgConnection::connect_socket(socket, &options).await?;
108+
/// # Ok(())
109+
/// # }
110+
/// ```
111+
pub async fn connect_socket<S: sqlx_core::net::Socket>(
112+
socket: S,
113+
options: &PgConnectOptions,
114+
) -> Result<Self, Error> {
115+
Self::establish_with_socket(socket, options).await
116+
}
117+
88118
/// the version number of the server in `libpq` format
89119
pub fn server_version_num(&self) -> Option<u32> {
90120
self.inner.stream.server_version_num

sqlx-postgres/src/connection/stream.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,22 @@ impl PgStream {
5757
})
5858
}
5959

60+
pub(super) async fn connect_socket<S: Socket>(
61+
socket: S,
62+
options: &PgConnectOptions,
63+
) -> Result<Self, Error> {
64+
let socket = net::connect_socket(socket, MaybeUpgradeTls(options)).await?;
65+
66+
let socket = socket?;
67+
68+
Ok(Self {
69+
inner: BufferedSocket::new(socket),
70+
notifications: None,
71+
parameter_statuses: BTreeMap::default(),
72+
server_version_num: None,
73+
})
74+
}
75+
6076
#[inline(always)]
6177
pub(crate) fn write_msg(&mut self, message: impl FrontendMessage) -> Result<(), Error> {
6278
self.write(EncodeMessage(message))

src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,12 @@ pub mod decode {
153153

154154
pub use self::decode::Decode;
155155

156+
/// Networking traits for custom transport implementations.
157+
pub mod net {
158+
pub use sqlx_core::io::ReadBuf;
159+
pub use sqlx_core::net::Socket;
160+
}
161+
156162
/// Types and traits for the `query` family of functions and macros.
157163
pub mod query {
158164
pub use sqlx_core::query::{Map, Query};

0 commit comments

Comments
 (0)