Skip to content

Commit cf6ee57

Browse files
committed
feat: add connect_with_socket() for custom transports
Add the ability to establish MySQL and PostgreSQL connections over pre-connected sockets, enabling custom transports such as in-memory pipes, simulation frameworks (e.g. turmoil for deterministic simulation testing), SSH tunnels, or SOCKS proxies. Changes: - Add `net::connect_with()` in sqlx-core as the analog of `connect_tcp` and `connect_uds` for pre-established sockets - Add `MySqlConnectOptions::connect_with_socket()` and `MySqlConnection::connect_with_socket()` for MySQL - Add `PgConnectOptions::connect_with_socket()` and `PgConnection::connect_with_socket()` for PostgreSQL - Re-export `Socket` trait and `connect_with` from top-level `sqlx::net` - Refactor MySQL post-connect init into shared `after_connect()` to avoid duplication between `connect()` and `connect_with_socket()` The Socket trait was already public; these changes simply expose a way to pass a pre-connected Socket into the connection establishment flow that was previously only reachable through connect_tcp/connect_uds.
1 parent 6984316 commit cf6ee57

File tree

9 files changed

+200
-48
lines changed

9 files changed

+200
-48
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqlx-core/src/net/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ mod socket;
22
pub mod tls;
33

44
pub use socket::{
5-
connect_tcp, connect_uds, BufferedSocket, Socket, SocketIntoBox, WithSocket, WriteBuffer,
5+
connect_tcp, connect_uds, connect_with, BufferedSocket, Socket, SocketIntoBox, WithSocket,
6+
WriteBuffer,
67
};

sqlx-core/src/net/socket/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,16 @@ async fn connect_tcp_async_io(host: &str, port: u16) -> crate::Result<impl Socke
249249
.into())
250250
}
251251

252+
/// Use a pre-connected socket, passing it to the given [`WithSocket`] handler.
253+
///
254+
/// This is the analog of [`connect_tcp`] and [`connect_uds`] for sockets
255+
/// that have already been established by the caller. This enables custom
256+
/// transports such as in-memory pipes, simulation frameworks (e.g. turmoil),
257+
/// SSH tunnels, or SOCKS proxies.
258+
pub async fn connect_with<S: Socket, Ws: WithSocket>(socket: S, with_socket: Ws) -> Ws::Output {
259+
with_socket.with_socket(socket).await
260+
}
261+
252262
/// Connect a Unix Domain Socket at the given path.
253263
///
254264
/// Returns an error if Unix Domain Sockets are not supported on this platform.

sqlx-mysql/src/connection/establish.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,31 @@ impl MySqlConnection {
2020
None => crate::net::connect_tcp(&options.host, options.port, do_handshake).await?,
2121
};
2222

23-
let stream = handshake?;
23+
Self::from_stream(options, handshake?)
24+
}
25+
26+
/// Establish a connection over a pre-connected socket.
27+
///
28+
/// The provided socket must already be connected to a MySQL-compatible
29+
/// server. The MySQL handshake and authentication will be performed
30+
/// using the credentials from `options`.
31+
///
32+
/// This enables custom transports such as in-memory pipes, simulation
33+
/// frameworks (e.g. turmoil), SSH tunnels, or SOCKS proxies.
34+
///
35+
/// Note: this only performs the low-level handshake and authentication.
36+
/// Use [`MySqlConnectOptions::connect_with_socket()`] for a fully
37+
/// initialized connection (with `SET NAMES`, `sql_mode`, etc.).
38+
pub async fn connect_with_socket<S: Socket>(
39+
options: &MySqlConnectOptions,
40+
socket: S,
41+
) -> Result<Self, Error> {
42+
let do_handshake = DoHandshake::new(options)?;
43+
let stream = crate::net::connect_with(socket, do_handshake).await?;
44+
Self::from_stream(options, stream)
45+
}
2446

47+
fn from_stream(options: &MySqlConnectOptions, stream: MySqlStream) -> Result<Self, Error> {
2548
Ok(Self {
2649
inner: Box::new(MySqlConnectionInner {
2750
stream,

sqlx-mysql/src/options/connect.rs

Lines changed: 75 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,68 @@
11
use crate::connection::ConnectOptions;
22
use crate::error::Error;
33
use crate::executor::Executor;
4+
use crate::net::Socket;
45
use crate::{MySqlConnectOptions, MySqlConnection};
56
use log::LevelFilter;
67
use sqlx_core::sql_str::AssertSqlSafe;
78
use sqlx_core::Url;
89
use std::time::Duration;
910

10-
impl ConnectOptions for MySqlConnectOptions {
11-
type Connection = MySqlConnection;
12-
13-
fn from_url(url: &Url) -> Result<Self, Error> {
14-
Self::parse_from_url(url)
15-
}
16-
17-
fn to_url_lossy(&self) -> Url {
18-
self.build_url()
11+
impl MySqlConnectOptions {
12+
/// Establish a fully initialized connection over a pre-connected socket.
13+
///
14+
/// This performs the MySQL handshake, authentication, and post-connect
15+
/// initialization (`SET NAMES`, `sql_mode`, `time_zone`) over the
16+
/// provided socket.
17+
///
18+
/// The socket must already be connected to a MySQL-compatible server.
19+
/// This enables custom transports such as in-memory pipes, simulation
20+
/// frameworks (e.g. turmoil), SSH tunnels, or SOCKS proxies.
21+
///
22+
/// # Example
23+
///
24+
/// ```rust,ignore
25+
/// use sqlx::mysql::MySqlConnectOptions;
26+
///
27+
/// let options = MySqlConnectOptions::new()
28+
/// .username("root")
29+
/// .database("mydb");
30+
///
31+
/// let stream = tokio::net::TcpStream::connect("127.0.0.1:3306").await?;
32+
/// let conn = options.connect_with_socket(stream).await?;
33+
/// ```
34+
pub async fn connect_with_socket<S: Socket>(
35+
&self,
36+
socket: S,
37+
) -> Result<MySqlConnection, Error> {
38+
let mut conn = MySqlConnection::connect_with_socket(self, socket).await?;
39+
self.after_connect(&mut conn).await?;
40+
Ok(conn)
1941
}
2042

21-
async fn connect(&self) -> Result<Self::Connection, Error>
22-
where
23-
Self::Connection: Sized,
24-
{
25-
let mut conn = MySqlConnection::establish(self).await?;
26-
27-
// After the connection is established, we initialize by configuring a few
28-
// connection parameters
29-
30-
// https://mariadb.com/kb/en/sql-mode/
31-
32-
// PIPES_AS_CONCAT - Allows using the pipe character (ASCII 124) as string concatenation operator.
33-
// This means that "A" || "B" can be used in place of CONCAT("A", "B").
34-
35-
// NO_ENGINE_SUBSTITUTION - If not set, if the available storage engine specified by a CREATE TABLE is
36-
// not available, a warning is given and the default storage
37-
// engine is used instead.
38-
39-
// NO_ZERO_DATE - Don't allow '0000-00-00'. This is invalid in Rust.
40-
41-
// NO_ZERO_IN_DATE - Don't allow 'YYYY-00-00'. This is invalid in Rust.
42-
43-
// --
44-
45-
// Setting the time zone allows us to assume that the output
46-
// from a TIMESTAMP field is UTC
47-
48-
// --
49-
50-
// https://mathiasbynens.be/notes/mysql-utf8mb4
51-
43+
/// Post-connection initialization shared between `connect()` and
44+
/// `connect_with_socket()`.
45+
///
46+
/// After the connection is established, we initialize by configuring a few
47+
/// connection parameters:
48+
///
49+
/// - <https://mariadb.com/kb/en/sql-mode/>
50+
///
51+
/// - `PIPES_AS_CONCAT` - Allows using the pipe character (ASCII 124) as string concatenation
52+
/// operator. This means that "A" || "B" can be used in place of CONCAT("A", "B").
53+
///
54+
/// - `NO_ENGINE_SUBSTITUTION` - If not set, if the available storage engine specified by a
55+
/// CREATE TABLE is not available, a warning is given and the default storage engine is used
56+
/// instead.
57+
///
58+
/// - `NO_ZERO_DATE` - Don't allow '0000-00-00'. This is invalid in Rust.
59+
///
60+
/// - `NO_ZERO_IN_DATE` - Don't allow 'YYYY-00-00'. This is invalid in Rust.
61+
///
62+
/// Setting the time zone allows us to assume that the output from a TIMESTAMP field is UTC.
63+
///
64+
/// - <https://mathiasbynens.be/notes/mysql-utf8mb4>
65+
async fn after_connect(&self, conn: &mut MySqlConnection) -> Result<(), Error> {
5266
let mut sql_mode = Vec::new();
5367
if self.pipes_as_concat {
5468
sql_mode.push(r#"PIPES_AS_CONCAT"#);
@@ -64,11 +78,9 @@ impl ConnectOptions for MySqlConnectOptions {
6478
sql_mode.join(",")
6579
));
6680
}
67-
6881
if let Some(timezone) = &self.timezone {
6982
options.push(format!(r#"time_zone='{}'"#, timezone));
7083
}
71-
7284
if self.set_names {
7385
// As it turns out, we don't _have_ to set a collation if we don't want to.
7486
// We can let the server choose the default collation for the charset.
@@ -88,6 +100,27 @@ impl ConnectOptions for MySqlConnectOptions {
88100
.await?;
89101
}
90102

103+
Ok(())
104+
}
105+
}
106+
107+
impl ConnectOptions for MySqlConnectOptions {
108+
type Connection = MySqlConnection;
109+
110+
fn from_url(url: &Url) -> Result<Self, Error> {
111+
Self::parse_from_url(url)
112+
}
113+
114+
fn to_url_lossy(&self) -> Url {
115+
self.build_url()
116+
}
117+
118+
async fn connect(&self) -> Result<Self::Connection, Error>
119+
where
120+
Self::Connection: Sized,
121+
{
122+
let mut conn = MySqlConnection::establish(self).await?;
123+
self.after_connect(&mut conn).await?;
91124
Ok(conn)
92125
}
93126

sqlx-postgres/src/connection/establish.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::io::StatementId;
77
use crate::message::{
88
Authentication, BackendKeyData, BackendMessageFormat, Password, ReadyForQuery, Startup,
99
};
10+
use crate::net::Socket;
1011
use crate::{PgConnectOptions, PgConnection};
1112

1213
use super::PgConnectionInner;
@@ -16,9 +17,35 @@ use super::PgConnectionInner;
1617

1718
impl PgConnection {
1819
pub(crate) async fn establish(options: &PgConnectOptions) -> Result<Self, Error> {
19-
// Upgrade to TLS if we were asked to and the server supports it
20-
let mut stream = PgStream::connect(options).await?;
20+
let stream = PgStream::connect(options).await?;
21+
Self::establish_with_stream(options, stream).await
22+
}
23+
24+
/// Establish a connection over a pre-connected socket.
25+
///
26+
/// The provided socket must already be connected to a
27+
/// PostgreSQL-compatible server. The startup handshake, TLS upgrade
28+
/// (if configured), and authentication will be performed over this
29+
/// socket.
30+
///
31+
/// This enables custom transports such as in-memory pipes, simulation
32+
/// frameworks (e.g. turmoil), SSH tunnels, or SOCKS proxies.
33+
///
34+
/// Note: this only performs the low-level handshake and authentication.
35+
/// Use [`PgConnectOptions::connect_with_socket()`] for a fully
36+
/// initialized connection.
37+
pub async fn connect_with_socket<S: Socket>(
38+
options: &PgConnectOptions,
39+
socket: S,
40+
) -> Result<Self, Error> {
41+
let stream = PgStream::connect_with_socket(options, socket).await?;
42+
Self::establish_with_stream(options, stream).await
43+
}
2144

45+
async fn establish_with_stream(
46+
options: &PgConnectOptions,
47+
mut stream: PgStream,
48+
) -> Result<Self, Error> {
2249
// To begin a session, a frontend opens a connection to the server
2350
// and sends a startup message.
2451

sqlx-postgres/src/connection/stream.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,24 @@ impl PgStream {
5757
})
5858
}
5959

60+
/// Create a stream from a pre-connected socket.
61+
///
62+
/// The socket must already be connected to a PostgreSQL server.
63+
/// TLS upgrade will be attempted if configured in `options`.
64+
pub(super) async fn connect_with_socket<S: Socket>(
65+
options: &PgConnectOptions,
66+
socket: S,
67+
) -> Result<Self, Error> {
68+
let socket = net::connect_with(socket, MaybeUpgradeTls(options)).await?;
69+
70+
Ok(Self {
71+
inner: BufferedSocket::new(socket),
72+
notifications: None,
73+
parameter_statuses: BTreeMap::default(),
74+
server_version_num: None,
75+
})
76+
}
77+
6078
#[inline(always)]
6179
pub(crate) fn write_msg(&mut self, message: impl FrontendMessage) -> Result<(), Error> {
6280
self.write(EncodeMessage(message))

sqlx-postgres/src/options/connect.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,39 @@
11
use crate::connection::ConnectOptions;
22
use crate::error::Error;
3+
use crate::net::Socket;
34
use crate::{PgConnectOptions, PgConnection};
45
use log::LevelFilter;
56
use sqlx_core::Url;
67
use std::future::Future;
78
use std::time::Duration;
89

10+
impl PgConnectOptions {
11+
/// Establish a connection over a pre-connected socket.
12+
///
13+
/// This performs the PostgreSQL startup handshake, TLS upgrade
14+
/// (if configured), and authentication over the provided socket.
15+
///
16+
/// The socket must already be connected to a PostgreSQL-compatible server.
17+
/// This enables custom transports such as in-memory pipes, simulation
18+
/// frameworks (e.g. turmoil), SSH tunnels, or SOCKS proxies.
19+
///
20+
/// # Example
21+
///
22+
/// ```rust,ignore
23+
/// use sqlx::postgres::PgConnectOptions;
24+
///
25+
/// let options = PgConnectOptions::new()
26+
/// .username("postgres")
27+
/// .database("mydb");
28+
///
29+
/// let stream = tokio::net::TcpStream::connect("127.0.0.1:5432").await?;
30+
/// let conn = options.connect_with_socket(stream).await?;
31+
/// ```
32+
pub async fn connect_with_socket<S: Socket>(&self, socket: S) -> Result<PgConnection, Error> {
33+
PgConnection::connect_with_socket(self, socket).await
34+
}
35+
}
36+
937
impl ConnectOptions for PgConnectOptions {
1038
type Connection = PgConnection;
1139

src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,18 @@ pub use sqlx_core::types::Type;
3838
pub use sqlx_core::value::{Value, ValueRef};
3939
pub use sqlx_core::Either;
4040

41+
/// Networking primitives used by SQLx database drivers.
42+
///
43+
/// The [`Socket`][net::Socket] trait allows implementing custom transports
44+
/// for database connections (e.g. in-memory pipes, simulation frameworks,
45+
/// SSH tunnels, SOCKS proxies). Use with
46+
/// [`MySqlConnectOptions::connect_with_socket()`][crate::mysql::MySqlConnectOptions::connect_with_socket]
47+
/// or [`PgConnectOptions::connect_with_socket()`][crate::postgres::PgConnectOptions::connect_with_socket].
48+
pub mod net {
49+
pub use sqlx_core::io::ReadBuf;
50+
pub use sqlx_core::net::{connect_with, Socket};
51+
}
52+
4153
#[doc(inline)]
4254
pub use sqlx_core::error::{self, Error, Result};
4355

0 commit comments

Comments
 (0)