Skip to content

Commit 28668fb

Browse files
committed
use multi-threaded XZ decompression via liblzma
Replace the xz2 crate with liblzma 0.4 (parallel feature) to enable lzma_stream_decoder_mt(). This removes the XZ decompression bottleneck, speeding up flashing by ~4x. Signed-off-by: Benny Zlotnik <bzlotnik@redhat.com> Assisted-by: claude-opus-4.6
1 parent 6565ee0 commit 28668fb

10 files changed

Lines changed: 219 additions & 250 deletions

File tree

Cargo.lock

Lines changed: 22 additions & 21 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ nix = { version = "0.29", features = ["ioctl"] }
2020
# Enable vendored OpenSSL for cross-compilation to musl targets
2121
# This ensures OpenSSL builds from source with musl compatibility
2222
openssl = { version = "0.10", features = ["vendored"] }
23-
xz2 = "0.1"
23+
liblzma = { version = "0.4", features = ["parallel"] }
2424

2525
[dev-dependencies]
2626
http-body = "1.0.1"

src/fls/decompress.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,30 @@
1+
use crate::fls::byte_channel::ByteBoundedReceiver;
2+
use crate::fls::compression::Compression;
3+
use crate::fls::stream_utils::ChannelReader;
4+
use bytes::Bytes;
5+
use std::io::Read;
16
use tokio::io::AsyncReadExt;
27
use tokio::process::{Child, Command};
38
use tokio::sync::mpsc;
49

10+
/// Converts a size given in mebibytes into bytes.
///
/// The multiplication saturates: a value too large to represent yields
/// `u64::MAX` instead of wrapping around.
fn mb_to_bytes(mb: u64) -> u64 {
    const BYTES_PER_MB: u64 = 1024 * 1024;
    match mb.checked_mul(BYTES_PER_MB) {
        Some(bytes) => bytes,
        None => u64::MAX,
    }
}
13+
14+
/// Wraps `reader` in an XZ decoder whose memory limit is `memlimit_mb` mebibytes.
///
/// The limit is converted to bytes with `mb_to_bytes` and handed to
/// `Stream::new_stream_decoder` together with a flags value of `0`.
///
/// # Errors
///
/// Returns a human-readable message (including the requested limit in MB)
/// when the underlying stream decoder cannot be created.
pub(crate) fn create_xz_decoder<R: Read>(
    reader: R,
    memlimit_mb: u64,
) -> Result<liblzma::read::XzDecoder<R>, String> {
    match liblzma::stream::Stream::new_stream_decoder(mb_to_bytes(memlimit_mb), 0) {
        Ok(stream) => Ok(liblzma::read::XzDecoder::new_stream(reader, stream)),
        Err(e) => Err(format!(
            "Failed to create XZ decoder with {}MB limit: {}",
            memlimit_mb, e
        )),
    }
}
27+
528
/// Determines the appropriate decompression command based on URL extension
629
fn get_decompressor_command(url: &str) -> &'static str {
730
let extension = url.rsplit('.').next().unwrap_or("").to_lowercase();
@@ -75,6 +98,83 @@ fn spawn_decompressor(
7598
Ok((process, cmd))
7699
}
77100

101+
pub(crate) fn get_compression_from_url(url: &str) -> Compression {
102+
let path = url.split('?').next().unwrap_or(url);
103+
let path = path.split('#').next().unwrap_or(path);
104+
let extension = path.rsplit('.').next().unwrap_or("").to_lowercase();
105+
match extension.as_str() {
106+
"gz" => Compression::Gzip,
107+
"xz" => Compression::Xz,
108+
"zst" | "zstd" => Compression::Zstd,
109+
_ => Compression::None,
110+
}
111+
}
112+
113+
/// Value handed back by `start_inprocess_decompressor`: the receiving end of
/// the channel carrying decompressed byte chunks, paired with the join handle
/// of the thread that produces them (the handle yields the thread's
/// `Result<(), String>` outcome).
type DecompressorResult = (
114+
mpsc::Receiver<Vec<u8>>,
115+
std::thread::JoinHandle<Result<(), String>>,
116+
);
117+
118+
pub(crate) fn start_inprocess_decompressor(
119+
buffer_rx: ByteBoundedReceiver<Bytes>,
120+
compression: Compression,
121+
consumed_progress_tx: mpsc::UnboundedSender<u64>,
122+
xz_memlimit_mb: u64,
123+
) -> Result<DecompressorResult, Box<dyn std::error::Error>> {
124+
let (decompressed_tx, decompressed_rx) = mpsc::channel::<Vec<u8>>(8);
125+
126+
let handle = std::thread::Builder::new()
127+
.name("decompressor".to_string())
128+
.spawn(move || {
129+
let channel_reader =
130+
ChannelReader::new_byte_bounded(buffer_rx).with_progress(consumed_progress_tx);
131+
132+
let mut decoder: Box<dyn Read + Send> = match compression {
133+
Compression::Xz => {
134+
let num_threads = std::thread::available_parallelism()
135+
.map(|n| n.get() as u32)
136+
.unwrap_or(2);
137+
let memlimit = mb_to_bytes(xz_memlimit_mb);
138+
eprintln!(
139+
"XZ decompression: {} threads, memory limit {}MB",
140+
num_threads, xz_memlimit_mb
141+
);
142+
let stream = liblzma::stream::MtStreamBuilder::new()
143+
.threads(num_threads)
144+
.memlimit_threading(memlimit)
145+
.memlimit_stop(memlimit)
146+
.decoder()
147+
.map_err(|e| format!("Failed to create MT XZ decoder: {}", e))?;
148+
Box::new(liblzma::read::XzDecoder::new_stream(channel_reader, stream))
149+
}
150+
Compression::Gzip => Box::new(flate2::read::GzDecoder::new(channel_reader)),
151+
Compression::None => Box::new(channel_reader),
152+
Compression::Zstd => {
153+
return Err("Zstd in-process decompression is not supported".to_string());
154+
}
155+
};
156+
157+
let mut buf = vec![0u8; 8 * 1024 * 1024];
158+
loop {
159+
let n = decoder
160+
.read(&mut buf)
161+
.map_err(|e| format!("Decompression error: {}", e))?;
162+
if n == 0 {
163+
break;
164+
}
165+
if decompressed_tx.blocking_send(buf[..n].to_vec()).is_err() {
166+
return Err("Writer task closed, stopping decompression".to_string());
167+
}
168+
}
169+
Ok(())
170+
})
171+
.map_err(|e| -> Box<dyn std::error::Error> {
172+
format!("Failed to spawn decompressor thread: {}", e).into()
173+
})?;
174+
175+
Ok((decompressed_rx, handle))
176+
}
177+
78178
pub(crate) async fn spawn_stderr_reader(
79179
mut stderr: tokio::process::ChildStderr,
80180
error_tx: mpsc::UnboundedSender<String>,

0 commit comments

Comments
 (0)