Skip to content

Commit b68895c

Browse files
committed
feat: add connect_with_socket() for custom transports
Add the ability to establish MySQL and PostgreSQL connections over pre-connected sockets, enabling custom transports such as in-memory pipes, simulation frameworks (e.g. turmoil for deterministic simulation testing), SSH tunnels, or SOCKS proxies. Changes: - Add `net::connect_with()` in sqlx-core as the analog of `connect_tcp` and `connect_uds` for pre-established sockets - Add `MySqlConnectOptions::connect_with_socket()` and `MySqlConnection::connect_with_socket()` for MySQL - Add `PgConnectOptions::connect_with_socket()` and `PgConnection::connect_with_socket()` for PostgreSQL - Re-export `Socket` trait and `connect_with` from top-level `sqlx::net` - Refactor MySQL post-connect init into shared `after_connect()` to avoid duplication between `connect()` and `connect_with_socket()` The Socket trait was already public; these changes simply expose a way to pass a pre-connected Socket into the connection establishment flow that was previously only reachable through connect_tcp/connect_uds.
1 parent bab1b02 commit b68895c

File tree

8 files changed

+221
-61
lines changed

8 files changed

+221
-61
lines changed

sqlx-core/src/net/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ mod socket;
22
pub mod tls;
33

44
pub use socket::{
5-
connect_tcp, connect_uds, BufferedSocket, Socket, SocketIntoBox, WithSocket, WriteBuffer,
5+
connect_tcp, connect_uds, connect_with, BufferedSocket, Socket, SocketIntoBox, WithSocket,
6+
WriteBuffer,
67
};

sqlx-core/src/net/socket/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,16 @@ pub async fn connect_tcp<Ws: WithSocket>(
242242
}
243243
}
244244

245+
/// Use a pre-connected socket, passing it to the given [`WithSocket`] handler.
246+
///
247+
/// This is the analog of [`connect_tcp`] and [`connect_uds`] for sockets
248+
/// that have already been established by the caller. This enables custom
249+
/// transports such as in-memory pipes, simulation frameworks (e.g. turmoil),
250+
/// SSH tunnels, or SOCKS proxies.
251+
pub async fn connect_with<S: Socket, Ws: WithSocket>(socket: S, with_socket: Ws) -> Ws::Output {
252+
with_socket.with_socket(socket).await
253+
}
254+
245255
/// Connect a Unix Domain Socket at the given path.
246256
///
247257
/// Returns an error if Unix Domain Sockets are not supported on this platform.

sqlx-mysql/src/connection/establish.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,31 @@ impl MySqlConnection {
2121
None => crate::net::connect_tcp(&options.host, options.port, do_handshake).await?,
2222
};
2323

24-
let stream = handshake?;
24+
Self::from_stream(options, handshake?)
25+
}
26+
27+
/// Establish a connection over a pre-connected socket.
28+
///
29+
/// The provided socket must already be connected to a MySQL-compatible
30+
/// server. The MySQL handshake and authentication will be performed
31+
/// using the credentials from `options`.
32+
///
33+
/// This enables custom transports such as in-memory pipes, simulation
34+
/// frameworks (e.g. turmoil), SSH tunnels, or SOCKS proxies.
35+
///
36+
/// Note: this only performs the low-level handshake and authentication.
37+
/// Use [`MySqlConnectOptions::connect_with_socket()`] for a fully
38+
/// initialized connection (with `SET NAMES`, `sql_mode`, etc.).
39+
pub async fn connect_with_socket<S: Socket>(
40+
options: &MySqlConnectOptions,
41+
socket: S,
42+
) -> Result<Self, Error> {
43+
let do_handshake = DoHandshake::new(options)?;
44+
let stream = crate::net::connect_with(socket, do_handshake).await?;
45+
Self::from_stream(options, stream)
46+
}
2547

48+
fn from_stream(options: &MySqlConnectOptions, stream: MySqlStream) -> Result<Self, Error> {
2649
Ok(Self {
2750
inner: Box::new(MySqlConnectionInner {
2851
stream,

sqlx-mysql/src/options/connect.rs

Lines changed: 95 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,106 @@
11
use crate::connection::ConnectOptions;
22
use crate::error::Error;
33
use crate::executor::Executor;
4+
use crate::net::Socket;
45
use crate::{MySqlConnectOptions, MySqlConnection};
56
use futures_core::future::BoxFuture;
67
use log::LevelFilter;
78
use sqlx_core::Url;
89
use std::time::Duration;
910

11+
impl MySqlConnectOptions {
12+
/// Establish a fully initialized connection over a pre-connected socket.
13+
///
14+
/// This performs the MySQL handshake, authentication, and post-connect
15+
/// initialization (`SET NAMES`, `sql_mode`, `time_zone`) over the
16+
/// provided socket.
17+
///
18+
/// The socket must already be connected to a MySQL-compatible server.
19+
/// This enables custom transports such as in-memory pipes, simulation
20+
/// frameworks (e.g. turmoil), SSH tunnels, or SOCKS proxies.
21+
///
22+
/// # Example
23+
///
24+
/// ```rust,no_run
25+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
26+
/// use sqlx::mysql::MySqlConnectOptions;
27+
///
28+
/// let options = MySqlConnectOptions::new()
29+
/// .username("root")
30+
/// .database("mydb");
31+
///
32+
/// let stream = tokio::net::TcpStream::connect("127.0.0.1:3306").await?;
33+
/// let conn = options.connect_with_socket(stream).await?;
34+
/// # Ok(())
35+
/// # }
36+
/// ```
37+
pub async fn connect_with_socket<S: Socket>(
38+
&self,
39+
socket: S,
40+
) -> Result<MySqlConnection, Error> {
41+
let mut conn = MySqlConnection::connect_with_socket(self, socket).await?;
42+
self.after_connect(&mut conn).await?;
43+
Ok(conn)
44+
}
45+
46+
/// Post-connection initialization shared between `connect()` and
47+
/// `connect_with_socket()`.
48+
///
49+
/// After the connection is established, we initialize by configuring a few
50+
/// connection parameters:
51+
///
52+
/// - <https://mariadb.com/kb/en/sql-mode/>
53+
///
54+
/// - `PIPES_AS_CONCAT` - Allows using the pipe character (ASCII 124) as string concatenation
55+
/// operator. This means that "A" || "B" can be used in place of CONCAT("A", "B").
56+
///
57+
/// - `NO_ENGINE_SUBSTITUTION` - If not set, if the available storage engine specified by a
58+
/// CREATE TABLE is not available, a warning is given and the default storage engine is used
59+
/// instead.
60+
///
61+
/// - `NO_ZERO_DATE` - Don't allow '0000-00-00'. This is invalid in Rust.
62+
///
63+
/// - `NO_ZERO_IN_DATE` - Don't allow 'YYYY-00-00'. This is invalid in Rust.
64+
///
65+
/// Setting the time zone allows us to assume that the output from a TIMESTAMP field is UTC.
66+
///
67+
/// - <https://mathiasbynens.be/notes/mysql-utf8mb4>
68+
async fn after_connect(&self, conn: &mut MySqlConnection) -> Result<(), Error> {
69+
let mut sql_mode = Vec::new();
70+
if self.pipes_as_concat {
71+
sql_mode.push(r#"PIPES_AS_CONCAT"#);
72+
}
73+
if self.no_engine_substitution {
74+
sql_mode.push(r#"NO_ENGINE_SUBSTITUTION"#);
75+
}
76+
77+
let mut options = Vec::new();
78+
if !sql_mode.is_empty() {
79+
options.push(format!(
80+
r#"sql_mode=(SELECT CONCAT(@@sql_mode, ',{}'))"#,
81+
sql_mode.join(",")
82+
));
83+
}
84+
if let Some(timezone) = &self.timezone {
85+
options.push(format!(r#"time_zone='{}'"#, timezone));
86+
}
87+
if self.set_names {
88+
options.push(format!(
89+
r#"NAMES {} COLLATE {}"#,
90+
conn.inner.stream.charset.as_str(),
91+
conn.inner.stream.collation.as_str()
92+
))
93+
}
94+
95+
if !options.is_empty() {
96+
conn.execute(&*format!(r#"SET {};"#, options.join(",")))
97+
.await?;
98+
}
99+
100+
Ok(())
101+
}
102+
}
103+
10104
impl ConnectOptions for MySqlConnectOptions {
11105
type Connection = MySqlConnection;
12106

@@ -24,63 +118,7 @@ impl ConnectOptions for MySqlConnectOptions {
24118
{
25119
Box::pin(async move {
26120
let mut conn = MySqlConnection::establish(self).await?;
27-
28-
// After the connection is established, we initialize by configuring a few
29-
// connection parameters
30-
31-
// https://mariadb.com/kb/en/sql-mode/
32-
33-
// PIPES_AS_CONCAT - Allows using the pipe character (ASCII 124) as string concatenation operator.
34-
// This means that "A" || "B" can be used in place of CONCAT("A", "B").
35-
36-
// NO_ENGINE_SUBSTITUTION - If not set, if the available storage engine specified by a CREATE TABLE is
37-
// not available, a warning is given and the default storage
38-
// engine is used instead.
39-
40-
// NO_ZERO_DATE - Don't allow '0000-00-00'. This is invalid in Rust.
41-
42-
// NO_ZERO_IN_DATE - Don't allow 'YYYY-00-00'. This is invalid in Rust.
43-
44-
// --
45-
46-
// Setting the time zone allows us to assume that the output
47-
// from a TIMESTAMP field is UTC
48-
49-
// --
50-
51-
// https://mathiasbynens.be/notes/mysql-utf8mb4
52-
53-
let mut sql_mode = Vec::new();
54-
if self.pipes_as_concat {
55-
sql_mode.push(r#"PIPES_AS_CONCAT"#);
56-
}
57-
if self.no_engine_substitution {
58-
sql_mode.push(r#"NO_ENGINE_SUBSTITUTION"#);
59-
}
60-
61-
let mut options = Vec::new();
62-
if !sql_mode.is_empty() {
63-
options.push(format!(
64-
r#"sql_mode=(SELECT CONCAT(@@sql_mode, ',{}'))"#,
65-
sql_mode.join(",")
66-
));
67-
}
68-
if let Some(timezone) = &self.timezone {
69-
options.push(format!(r#"time_zone='{}'"#, timezone));
70-
}
71-
if self.set_names {
72-
options.push(format!(
73-
r#"NAMES {} COLLATE {}"#,
74-
conn.inner.stream.charset.as_str(),
75-
conn.inner.stream.collation.as_str()
76-
))
77-
}
78-
79-
if !options.is_empty() {
80-
conn.execute(&*format!(r#"SET {};"#, options.join(",")))
81-
.await?;
82-
}
83-
121+
self.after_connect(&mut conn).await?;
84122
Ok(conn)
85123
})
86124
}

sqlx-postgres/src/connection/establish.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::io::StatementId;
77
use crate::message::{
88
Authentication, BackendKeyData, BackendMessageFormat, Password, ReadyForQuery, Startup,
99
};
10+
use crate::net::Socket;
1011
use crate::{PgConnectOptions, PgConnection};
1112

1213
use super::PgConnectionInner;
@@ -16,9 +17,35 @@ use super::PgConnectionInner;
1617

1718
impl PgConnection {
1819
pub(crate) async fn establish(options: &PgConnectOptions) -> Result<Self, Error> {
19-
// Upgrade to TLS if we were asked to and the server supports it
20-
let mut stream = PgStream::connect(options).await?;
20+
let stream = PgStream::connect(options).await?;
21+
Self::establish_with_stream(options, stream).await
22+
}
23+
24+
/// Establish a connection over a pre-connected socket.
25+
///
26+
/// The provided socket must already be connected to a
27+
/// PostgreSQL-compatible server. The startup handshake, TLS upgrade
28+
/// (if configured), and authentication will be performed over this
29+
/// socket.
30+
///
31+
/// This enables custom transports such as in-memory pipes, simulation
32+
/// frameworks (e.g. turmoil), SSH tunnels, or SOCKS proxies.
33+
///
34+
/// Note: this only performs the low-level handshake and authentication.
35+
/// Use [`PgConnectOptions::connect_with_socket()`] for a fully
36+
/// initialized connection.
37+
pub async fn connect_with_socket<S: Socket>(
38+
options: &PgConnectOptions,
39+
socket: S,
40+
) -> Result<Self, Error> {
41+
let stream = PgStream::connect_with_socket(options, socket).await?;
42+
Self::establish_with_stream(options, stream).await
43+
}
2144

45+
async fn establish_with_stream(
46+
options: &PgConnectOptions,
47+
mut stream: PgStream,
48+
) -> Result<Self, Error> {
2249
// To begin a session, a frontend opens a connection to the server
2350
// and sends a startup message.
2451

sqlx-postgres/src/connection/stream.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,24 @@ impl PgStream {
5757
})
5858
}
5959

60+
/// Create a stream from a pre-connected socket.
61+
///
62+
/// The socket must already be connected to a PostgreSQL server.
63+
/// TLS upgrade will be attempted if configured in `options`.
64+
pub(super) async fn connect_with_socket<S: Socket>(
65+
options: &PgConnectOptions,
66+
socket: S,
67+
) -> Result<Self, Error> {
68+
let socket = net::connect_with(socket, MaybeUpgradeTls(options)).await?;
69+
70+
Ok(Self {
71+
inner: BufferedSocket::new(socket),
72+
notifications: None,
73+
parameter_statuses: BTreeMap::default(),
74+
server_version_num: None,
75+
})
76+
}
77+
6078
#[inline(always)]
6179
pub(crate) fn write_msg(&mut self, message: impl FrontendMessage) -> Result<(), Error> {
6280
self.write(EncodeMessage(message))

sqlx-postgres/src/options/connect.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,42 @@
11
use crate::connection::ConnectOptions;
22
use crate::error::Error;
3+
use crate::net::Socket;
34
use crate::{PgConnectOptions, PgConnection};
45
use futures_core::future::BoxFuture;
56
use log::LevelFilter;
67
use sqlx_core::Url;
78
use std::time::Duration;
89

10+
impl PgConnectOptions {
11+
/// Establish a connection over a pre-connected socket.
12+
///
13+
/// This performs the PostgreSQL startup handshake, TLS upgrade
14+
/// (if configured), and authentication over the provided socket.
15+
///
16+
/// The socket must already be connected to a PostgreSQL-compatible server.
17+
/// This enables custom transports such as in-memory pipes, simulation
18+
/// frameworks (e.g. turmoil), SSH tunnels, or SOCKS proxies.
19+
///
20+
/// # Example
21+
///
22+
/// ```rust,no_run
23+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
24+
/// use sqlx::postgres::PgConnectOptions;
25+
///
26+
/// let options = PgConnectOptions::new()
27+
/// .username("postgres")
28+
/// .database("mydb");
29+
///
30+
/// let stream = tokio::net::TcpStream::connect("127.0.0.1:5432").await?;
31+
/// let conn = options.connect_with_socket(stream).await?;
32+
/// # Ok(())
33+
/// # }
34+
/// ```
35+
pub async fn connect_with_socket<S: Socket>(&self, socket: S) -> Result<PgConnection, Error> {
36+
PgConnection::connect_with_socket(self, socket).await
37+
}
38+
}
39+
940
impl ConnectOptions for PgConnectOptions {
1041
type Connection = PgConnection;
1142

src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,18 @@ pub use sqlx_core::types::Type;
3636
pub use sqlx_core::value::{Value, ValueRef};
3737
pub use sqlx_core::Either;
3838

39+
/// Networking primitives used by SQLx database drivers.
40+
///
41+
/// The [`Socket`][net::Socket] trait allows implementing custom transports
42+
/// for database connections (e.g. in-memory pipes, simulation frameworks,
43+
/// SSH tunnels, SOCKS proxies). Use with
44+
/// [`MySqlConnectOptions::connect_with_socket()`][crate::mysql::MySqlConnectOptions::connect_with_socket]
45+
/// or [`PgConnectOptions::connect_with_socket()`][crate::postgres::PgConnectOptions::connect_with_socket].
46+
pub mod net {
47+
pub use sqlx_core::io::ReadBuf;
48+
pub use sqlx_core::net::{connect_with, Socket};
49+
}
50+
3951
#[doc(inline)]
4052
pub use sqlx_core::error::{self, Error, Result};
4153

0 commit comments

Comments
 (0)