Skip to content

Commit 5d41252

Browse files
fix(file source): handle concatenated gzip streams (#25614)
Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
1 parent 2d0d0fb commit 5d41252

13 files changed

Lines changed: 72 additions & 13 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ antithesis-instrumentation = { version = "0.1", default-features = false, featur
141141
antithesis_sdk = { version = "0.2", default-features = false, features = [] }
142142
anyhow = { version = "1.0.102", default-features = false, features = ["std"] }
143143
arc-swap = { version = "1.8.2", default-features = false }
144+
async-compression = { version = "0.4.42", default-features = false, features = ["tokio", "gzip"] }
144145
async-stream = { version = "0.3.6", default-features = false }
145146
async-trait = { version = "0.1.89", default-features = false }
146147
axum = { version = "0.6.20", default-features = false }
@@ -362,7 +363,7 @@ greptimedb-ingester = { version = "0.17.0", default-features = false, optional =
362363

363364
# External libs
364365
arc-swap = { workspace = true, default-features = false, optional = true }
365-
async-compression = { version = "0.4.27", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true }
366+
async-compression = { workspace = true, features = ["zstd"], optional = true }
366367
arrow = { version = "58.2.0", default-features = false, features = ["ipc"], optional = true }
367368
arrow-schema = { version = "58.2.0", default-features = false, optional = true }
368369
parquet = { version = "58.2.0", default-features = false, features = [
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed the `file` source silently dropping all but the first member of concatenated (multi-stream) gzip files. This regression was introduced in v0.50.0.
2+
3+
authors: thomasqueirozb

clippy.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ allow-unwrap-in-tests = true
66
disallowed-methods = [
77
{ path = "std::io::Write::write", reason = "This doesn't handle short writes, use `write_all` instead." },
88
{ path = "vrl::stdlib::all", reason = "Use `vector_vrl_functions::all()` instead for consistency across all Vector VRL functions." },
9+
{ path = "async_compression::tokio::bufread::GzipDecoder::new", reason = "Use `vector_common::compression::gzip_multiple_decoder` instead, which enables `multiple_members` to handle concatenated gzip streams." },
910
]
1011

1112
disallowed-macros = [

lib/file-source-common/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ serde_json.workspace = true
2020
bstr = { version = "1.12", default-features = false }
2121
bytes = { version = "1.11.1", default-features = false, features = ["serde"] }
2222
dashmap = { version = "6.1", default-features = false }
23-
async-compression = { version = "0.4.42", features = ["tokio", "gzip"] }
23+
async-compression.workspace = true
2424
vector-common = { path = "../vector-common", default-features = false }
2525
vector-config = { path = "../vector-config", default-features = false }
2626
tokio = { workspace = true, features = ["full"] }

lib/file-source-common/src/fingerprinter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ use std::{
55
time,
66
};
77

8-
use async_compression::tokio::bufread::GzipDecoder;
98
use crc::Crc;
109
use serde::{Deserialize, Serialize};
1110
use tokio::{
1211
fs::{self, File},
1312
io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader},
1413
};
14+
use vector_common::compression::gzip_multiple_decoder;
1515
use vector_common::constants::GZIP_MAGIC;
1616

1717
use crate::{
@@ -128,7 +128,7 @@ impl UncompressedReader for UncompressedReaderImpl {
128128
// To support new compression algorithms, add them below
129129
match Self::check(fp).await? {
130130
Some(SupportedCompressionAlgorithms::Gzip) => Ok(Box::new(BufReader::new(
131-
GzipDecoder::new(BufReader::new(fp)),
131+
gzip_multiple_decoder(BufReader::new(fp)),
132132
))),
133133
// No compression, or read the raw bytes
134134
None => Ok(Box::new(BufReader::new(fp))),

lib/file-source/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ futures = { version = "0.3.31", default-features = false, features = ["executor"
2121
futures-util.workspace = true
2222
vector-common = { path = "../vector-common", default-features = false }
2323
file-source-common = { path = "../file-source-common" }
24-
async-compression = { version = "0.4.42", features = ["tokio", "gzip"] }
24+
async-compression.workspace = true
2525

2626
[dev-dependencies]
2727
tokio = { workspace = true, features = ["full"] }

lib/file-source/src/file_watcher/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use async_compression::tokio::bufread::GzipDecoder;
21
use bytes::{Bytes, BytesMut};
32
use chrono::{DateTime, Utc};
43
use std::{
@@ -18,6 +17,7 @@ use file_source_common::{
1817
AsyncFileInfo, FilePosition, PortableFileExt, ReadFrom,
1918
buffer::{ReadResult, read_until_with_max_size},
2019
};
20+
use vector_common::compression::gzip_multiple_decoder;
2121

2222
const EOF_READ_BACKOFF_MIN: Duration = Duration::from_millis(1);
2323
const EOF_READ_BACKOFF_MAX: Duration = Duration::from_millis(250);
@@ -132,7 +132,7 @@ impl FileWatcher {
132132
(Box::new(null_reader()), 0)
133133
}
134134
(true, false, ReadFrom::Beginning) => {
135-
(Box::new(BufReader::new(GzipDecoder::new(reader))), 0)
135+
(Box::new(BufReader::new(gzip_multiple_decoder(reader))), 0)
136136
}
137137
(false, true, _) => {
138138
let pos = reader.seek(SeekFrom::End(0)).await.unwrap();
@@ -189,7 +189,7 @@ impl FileWatcher {
189189
if self.file_position != 0 {
190190
Box::new(null_reader())
191191
} else {
192-
Box::new(BufReader::new(GzipDecoder::new(reader)))
192+
Box::new(BufReader::new(gzip_multiple_decoder(reader)))
193193
}
194194
} else {
195195
reader.seek(io::SeekFrom::Start(self.file_position)).await?;

lib/file-source/src/file_watcher/tests/mod.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,51 @@ impl Arbitrary for FileWatcherAction {
175175
}
176176
}
177177

178+
#[tokio::test]
179+
async fn gzip_multi_stream_reads_all_members() {
180+
use async_compression::tokio::bufread::GzipEncoder;
181+
use std::fs;
182+
use tokio::io::AsyncReadExt as _;
183+
184+
let dir = tempfile::TempDir::new().expect("could not create tempdir");
185+
let path = dir.path().join("multi.gz");
186+
187+
async fn encode(data: &[u8]) -> Vec<u8> {
188+
let mut out = Vec::new();
189+
GzipEncoder::new(data).read_to_end(&mut out).await.unwrap();
190+
out
191+
}
192+
193+
// Write two separate gzip members into one file — the bug dropped the second.
194+
let mut bytes = encode(b"first\n").await;
195+
bytes.extend(encode(b"second\n").await);
196+
fs::write(&path, &bytes).unwrap();
197+
198+
let mut fw = FileWatcher::new(
199+
path,
200+
file_source_common::ReadFrom::Beginning,
201+
None,
202+
100_000,
203+
Bytes::from("\n"),
204+
)
205+
.await
206+
.expect("FileWatcher::new failed");
207+
208+
let mut lines = Vec::new();
209+
for _ in 0..10 {
210+
fw.track_read_attempt();
211+
let result = fw.read_line().await.expect("read_line error");
212+
if let Some(raw) = result.raw_line {
213+
lines.push(String::from_utf8(raw.bytes.to_vec()).unwrap());
214+
}
215+
if lines.len() == 2 {
216+
break;
217+
}
218+
}
219+
220+
assert_eq!(lines, vec!["first", "second"]);
221+
}
222+
178223
fn watcher_for_timing() -> FileWatcher {
179224
let now = Instant::now();
180225

lib/vector-common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ test = []
3434
tokenize = []
3535

3636
[dependencies]
37+
async-compression.workspace = true
3738
async-stream = "0.3.6"
3839
bytes = { version = "1.11.1", default-features = false, optional = true }
3940
chrono.workspace = true

0 commit comments

Comments
 (0)