|
1 | 1 | # async-smux |
2 | 2 |
|
3 | | -[crates.io](https://crates.io/crates/async_smux) |
| 3 | +[](https://github.com/black-binary/async-smux/actions/workflows/ci.yml) |
| 4 | +[](https://crates.io/crates/async_smux) |
4 | 5 |
|
5 | | -A lightweight asynchronous [smux](https://github.com/xtaci/smux) (Simple MUltipleXing) library for smol/async-std and any async runtime compatible to `futures`. |
| 6 | +A lightweight asynchronous [smux](https://github.com/xtaci/smux) (Simple |
| 7 | +MUltipleXing) library for Tokio. Wraps any `AsyncRead + AsyncWrite + |
| 8 | +Unpin` transport (`TcpStream`, `TlsStream`, …) and lets you spawn many |
| 9 | +bi-directional `MuxStream`s over it — each one looks and behaves like a |
| 10 | +plain TCP stream. |
6 | 11 |
|
7 | 12 |  |
8 | 13 |
|
9 | | -`async-smux` consumes a struct implementing `AsyncRead + AsyncWrite + Unpin + Send`, like `TcpStream` and `TlsStream`, to create a `Mux<T>` struct. And then you may spawn multiple `MuxStream<T>`s (up to 4294967295) over `Mux<T>`, which also implements `AsyncRead + AsyncWrite + Unpin + Send`. |
10 | | - |
11 | | -## Benchmark |
12 | | - |
13 | | -Here is a simple benchmarking result on my local machine, comparing to the original version smux (written in go). |
14 | | - |
15 | | -| Implementation | Throughput (TCP) | Handshake | |
16 | | -| ----------------- | ---------------- | ---------- | |
17 | | -| smux (go) | 0.4854 GiB/s | 17.070 K/s | |
18 | | -| async-smux (rust) | 1.0550 GiB/s | 81.774 K/s | |
19 | | - |
20 | | -Run `cargo bench` to test it by yourself. Check out `/benches` directory for more details. |
21 | | - |
22 | | -## Laziness |
23 | | - |
24 | | -No thread or task will be spawned by this library. It just spawns a few `future`s. So it's totally runtime-independent. |
| 14 | +## Quickstart |
| 15 | + |
| 16 | +```rust,ignore |
| 17 | +use async_smux::MuxBuilder; |
| 18 | +use tokio::io::{AsyncReadExt, AsyncWriteExt}; |
| 19 | +use tokio::net::TcpStream; |
| 20 | +
|
| 21 | +#[tokio::main] |
| 22 | +async fn main() { |
| 23 | + let tcp = TcpStream::connect("127.0.0.1:12345").await.unwrap(); |
| 24 | +
|
| 25 | + // build() returns three pieces: |
| 26 | + // connector — open new outgoing streams |
| 27 | + // acceptor — receive peer-initiated streams |
| 28 | + // worker — the future that drives I/O; spawn it |
| 29 | + let (connector, mut acceptor, worker) = |
| 30 | + MuxBuilder::client().with_connection(tcp).build(); |
| 31 | + tokio::spawn(worker); |
| 32 | +
|
| 33 | + // Outgoing |
| 34 | + let mut s = connector.connect().unwrap(); |
| 35 | + s.write_all(b"hello").await.unwrap(); |
| 36 | + let mut buf = [0u8; 5]; |
| 37 | + s.read_exact(&mut buf).await.unwrap(); |
| 38 | +
|
| 39 | + // Incoming |
| 40 | + while let Some(mut peer) = acceptor.accept().await { |
| 41 | + // ... |
| 42 | + } |
| 43 | +} |
| 44 | +``` |
25 | 45 |
|
26 | | -`Mux` and `MuxStream` are completely lazy and will DO NOTHING if you don't `poll()` them. |
| 46 | +The server side is identical except for `MuxBuilder::server()`. Client |
| 47 | +and server differ only in stream-id parity (odd vs. even) so two peers |
| 48 | +never collide on locally-allocated ids. |
| 49 | + |
| 50 | +A complete working example is in [`examples/echo.rs`](examples/echo.rs). |
| 51 | + |
| 52 | +## Lifecycle |
| 53 | + |
| 54 | +Three handles share ownership of the connection state: |
| 55 | + |
| 56 | +- **`MuxConnector`** — opens streams. `Clone`-able. |
| 57 | +- **`MuxAcceptor`** — receives peer-initiated streams. Implements |
| 58 | + `Stream<Item = MuxStream>`. |
| 59 | +- **`MuxWorker`** — the future that drains the underlying transport |
| 60 | + and dispatches frames. Spawn it with `tokio::spawn(worker)`. |
| 61 | + |
| 62 | +The worker exits when: |
| 63 | + |
| 64 | +- All public handles (connectors + acceptors + streams) are dropped, or |
| 65 | +- `MuxConnector::close().await` is called explicitly, or |
| 66 | +- The peer closes the transport, or |
| 67 | +- A keep-alive timeout fires (if configured). |
| 68 | + |
| 69 | +`close()` performs an orderly shutdown: any frames already accepted by |
| 70 | +`AsyncWrite::poll_write` are drained to the wire before the underlying |
| 71 | +sink is closed. It also works without the worker being polled — useful |
| 72 | +in test setups or sync teardown paths. |
| 73 | + |
| 74 | +## Configuration |
| 75 | + |
| 76 | +```rust,ignore |
| 77 | +use async_smux::MuxBuilder; |
| 78 | +use std::num::{NonZeroU64, NonZeroUsize}; |
| 79 | +
|
| 80 | +let (connector, acceptor, worker) = MuxBuilder::server() |
| 81 | + // Send a NOP frame every N seconds to keep the connection alive. |
| 82 | + .with_keep_alive_interval(NonZeroU64::new(15).unwrap()) |
| 83 | + // If we don't see any frame from the peer for this many seconds, |
| 84 | + // declare the connection dead and tear everything down. |
| 85 | + // Defaults to 3 × keep_alive_interval. |
| 86 | + .with_keep_alive_timeout(NonZeroU64::new(45).unwrap()) |
| 87 | + // Per-stream idle timeout (seconds): close streams with no |
| 88 | + // recent traffic. |
| 89 | + .with_idle_timeout(NonZeroU64::new(60).unwrap()) |
| 90 | + // Backpressure thresholds: cap how many frames may sit in the |
| 91 | + // tx/rx queues before poll_write / poll_read park. |
| 92 | + .with_max_tx_queue(NonZeroUsize::new(1024).unwrap()) |
| 93 | + .with_max_rx_queue(NonZeroUsize::new(1024).unwrap()) |
| 94 | + .with_connection(connection) |
| 95 | + .build(); |
| 96 | +``` |
27 | 97 |
|
28 | | -Any polling operation, including `.read()` ,`.write()`, `accept()` and `connect()`, will push `Mux` and `MuxStream` working. |
| 98 | +All of these knobs are optional. Keep-alive and idle timeout are off |
| 99 | +unless explicitly enabled. |
29 | 100 |
|
30 | | -## Specification |
| 101 | +## Frame format (smux v1) |
31 | 102 |
|
32 | 103 | ```text |
33 | | -VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH) |
| 104 | +VERSION(1B) | CMD(1B) | LENGTH(2B LE) | STREAMID(4B LE) | DATA(LENGTH) |
34 | 105 |
|
35 | 106 | VERSION: 1 |
36 | | -
|
37 | 107 | CMD: |
38 | | - SYN(0) |
39 | | - FIN(1) |
40 | | - PSH(2) |
41 | | - NOP(3) |
42 | | -
|
43 | | -STREAMID: Randomly chosen number |
| 108 | + SYN(0) open stream (LENGTH must be 0) |
| 109 | + FIN(1) close stream (LENGTH must be 0) |
| 110 | + PSH(2) payload |
| 111 | + NOP(3) keep-alive (LENGTH must be 0; STREAMID is 0) |
44 | 112 | ``` |
45 | 113 |
|
46 | | -## Example |
47 | | - |
48 | | -```rust |
49 | | -use async_smux::{Mux, MuxConfig}; |
50 | | -use async_std::net::{TcpListener, TcpStream}; |
51 | | -use async_std::prelude::*; |
52 | | - |
53 | | -async fn echo_server() { |
54 | | - let listener = TcpListener::bind("0.0.0.0:12345").await.unwrap(); |
55 | | - let (stream, _) = listener.accept().await.unwrap(); |
56 | | - let mux = Mux::new(stream, MuxConfig::default()); |
57 | | - loop { |
58 | | - let mut mux_stream = mux.accept().await.unwrap(); |
59 | | - let mut buf = [0u8; 1024]; |
60 | | - let size = mux_stream.read(&mut buf).await.unwrap(); |
61 | | - mux_stream.write(&buf[..size]).await.unwrap(); |
62 | | - } |
63 | | -} |
| 114 | +Stream id 0 is reserved for NOP; the library never hands it out and |
| 115 | +rejects any peer SYN that uses it. |
64 | 116 |
|
65 | | -fn main() { |
66 | | - async_std::task::spawn(echo_server()); |
67 | | - async_std::task::block_on(async { |
68 | | - smol::Timer::after(std::time::Duration::from_secs(1)).await; |
69 | | - let stream = TcpStream::connect("127.0.0.1:12345").await.unwrap(); |
70 | | - let mux = Mux::new(stream, MuxConfig::default()); |
71 | | - for i in 0..100 { |
72 | | - let mut mux_stream = mux.connect().await.unwrap(); |
73 | | - let mut buf = [0u8; 1024]; |
74 | | - mux_stream.write(b"hello").await.unwrap(); |
75 | | - let size = mux_stream.read(&mut buf).await.unwrap(); |
76 | | - let reply = String::from_utf8(buf[..size].to_vec()).unwrap(); |
77 | | - println!("{}: {}", i, reply); |
78 | | - } |
79 | | - }); |
80 | | -} |
81 | | -``` |
| 117 | +## Benchmark |
| 118 | + |
| 119 | +| Implementation | Throughput (TCP) | Handshake | |
| 120 | +| ----------------- | ---------------- | ---------- | |
| 121 | +| smux (go) | 0.4854 GiB/s | 17.070 K/s | |
| 122 | +| async-smux (rust) | 1.0550 GiB/s | 81.774 K/s | |
| 123 | + |
| 124 | +`benches/bench.rs` uses the unstable `test` crate so it requires a |
| 125 | +nightly toolchain: `cargo +nightly bench`. |
| 126 | + |
| 127 | +## License |
| 128 | + |
| 129 | +MIT — see [LICENSE](LICENSE). |
0 commit comments