Skip to content

Commit d8011a0

Browse files
authored
feat!: Enhance compression codec enum. (#2288)
## Which issue does this PR close? This is an intermediate PR for #1731. I'm splitting the compression codec changes out of #1851 to make them easier to review. Once we decide on an approach here and merge it, I'll update #1851 accordingly. ## What changes are included in this PR? - Add an optional compression level to gzip and zstd (needed for Avro compression). - Add Snappy as a compression codec (it will also be used for Avro). - Manually code up some previously auto-generated methods as a result. AI helped with an initial version of this PR. ## Are these changes tested? Yes, with additional unit tests.
1 parent 14f2e14 commit d8011a0

9 files changed

Lines changed: 191 additions & 73 deletions

File tree

crates/iceberg/src/catalog/metadata_location.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,9 @@ impl MetadataLocation {
114114
))?;
115115

116116
// Check for compression suffix (e.g., .gz)
117-
let gzip_suffix = CompressionCodec::Gzip.suffix()?;
117+
let gzip_suffix = CompressionCodec::gzip_default().suffix()?;
118118
let (stripped, compression_codec) = if let Some(s) = stripped.strip_suffix(gzip_suffix) {
119-
(s, CompressionCodec::Gzip)
119+
(s, CompressionCodec::gzip_default())
120120
} else {
121121
(stripped, CompressionCodec::None)
122122
};
@@ -261,7 +261,7 @@ mod test {
261261
table_location: "/abc".to_string(),
262262
version: 1234567,
263263
id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
264-
compression_codec: CompressionCodec::Gzip,
264+
compression_codec: CompressionCodec::gzip_default(),
265265
}),
266266
),
267267
// Negative version
@@ -345,10 +345,16 @@ mod test {
345345
"/test/table/metadata/00005-81056704-ce5b-41c4-bb83-eb6408081af6.gz.metadata.json",
346346
)
347347
.unwrap();
348-
assert_eq!(location_gzip.compression_codec, CompressionCodec::Gzip);
348+
assert_eq!(
349+
location_gzip.compression_codec,
350+
CompressionCodec::gzip_default()
351+
);
349352

350353
let next_gzip = location_gzip.with_next_version();
351-
assert_eq!(next_gzip.compression_codec, CompressionCodec::Gzip);
354+
assert_eq!(
355+
next_gzip.compression_codec,
356+
CompressionCodec::gzip_default()
357+
);
352358
assert_eq!(next_gzip.version, 6);
353359
}
354360

@@ -369,7 +375,10 @@ mod test {
369375
);
370376
let metadata_gzip = create_test_metadata(props_gzip);
371377
let updated_gzip = location.with_new_metadata(&metadata_gzip);
372-
assert_eq!(updated_gzip.compression_codec, CompressionCodec::Gzip);
378+
assert_eq!(
379+
updated_gzip.compression_codec,
380+
CompressionCodec::gzip_default()
381+
);
373382
assert_eq!(updated_gzip.version, 0);
374383
assert_eq!(
375384
updated_gzip.to_string(),

crates/iceberg/src/compression.rs

Lines changed: 128 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,101 @@
1717

1818
//! Compression codec support for data compression and decompression.
1919
20+
use std::fmt;
2021
use std::io::{Read, Write};
2122

2223
use flate2::Compression;
2324
use flate2::read::GzDecoder;
2425
use flate2::write::GzEncoder;
25-
use serde::{Deserialize, Serialize};
26+
use serde::{Deserialize, Deserializer, Serialize, Serializer};
2627

2728
use crate::{Error, ErrorKind, Result};
2829

30+
/// Default compression level for Zstandard (zstd).
31+
const ZSTD_DEFAULT_LEVEL: u8 = 3;
32+
/// Default compression level for Gzip.
33+
const GZIP_DEFAULT_LEVEL: u8 = 6;
34+
/// Maximum compression level for Gzip.
35+
const GZIP_MAX_LEVEL: u8 = 9;
36+
2937
/// Data compression formats
30-
#[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)]
31-
#[serde(rename_all = "lowercase")]
38+
#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)]
3239
pub enum CompressionCodec {
3340
#[default]
3441
/// No compression
3542
None,
3643
/// LZ4 single compression frame with content size present
3744
Lz4,
38-
/// Zstandard single compression frame with content size present
39-
Zstd,
40-
/// Gzip compression
41-
Gzip,
45+
/// Zstandard single compression frame with content size present.
46+
/// Level range is 0–22, where 0 means default compression level (not no compression).
47+
/// Use [`CompressionCodec::zstd_default`] to construct with the default level.
48+
Zstd(u8),
49+
/// Gzip compression. Level range is 0–9, where 0 means no compression.
50+
/// Use [`CompressionCodec::gzip_default`] to construct with the default level.
51+
Gzip(u8),
52+
/// Snappy compression
53+
Snappy,
54+
}
55+
56+
impl CompressionCodec {
57+
/// Returns a Zstd codec with the default compression level.
58+
pub const fn zstd_default() -> Self {
59+
CompressionCodec::Zstd(ZSTD_DEFAULT_LEVEL)
60+
}
61+
62+
/// Returns a Gzip codec with the default compression level.
63+
pub const fn gzip_default() -> Self {
64+
CompressionCodec::Gzip(GZIP_DEFAULT_LEVEL)
65+
}
66+
67+
/// Returns the codec name as used in serialization and error messages.
68+
pub fn name(&self) -> &'static str {
69+
match self {
70+
CompressionCodec::None => "none",
71+
CompressionCodec::Lz4 => "lz4",
72+
CompressionCodec::Zstd(_) => "zstd",
73+
CompressionCodec::Gzip(_) => "gzip",
74+
CompressionCodec::Snappy => "snappy",
75+
}
76+
}
77+
}
78+
79+
// Note: serialize/deserialize do not round-trip the compression level. Iceberg configuration
80+
// stores only the codec name (e.g. "zstd"), not the level, so deserialization always produces the
81+
// default level. A `Zstd(5)` written to metadata will be read back as `Zstd(3)`. Some
82+
// compression configuration (e.g. Avro metadata) has a separate level field alongside the codec name.
83+
impl Serialize for CompressionCodec {
84+
fn serialize<S: Serializer>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> {
85+
serializer.serialize_str(self.name())
86+
}
87+
}
88+
89+
impl<'de> Deserialize<'de> for CompressionCodec {
90+
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> std::result::Result<Self, D::Error> {
91+
let s = String::deserialize(deserializer)?;
92+
match s.to_lowercase().as_str() {
93+
"none" => Ok(CompressionCodec::None),
94+
"lz4" => Ok(CompressionCodec::Lz4),
95+
"zstd" => Ok(CompressionCodec::zstd_default()),
96+
"gzip" => Ok(CompressionCodec::gzip_default()),
97+
"snappy" => Ok(CompressionCodec::Snappy),
98+
other => Err(serde::de::Error::unknown_variant(other, &[
99+
"none", "lz4", "zstd", "gzip", "snappy",
100+
])),
101+
}
102+
}
103+
}
104+
105+
impl fmt::Display for CompressionCodec {
106+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107+
match self {
108+
CompressionCodec::None => write!(f, "None"),
109+
CompressionCodec::Lz4 => write!(f, "Lz4"),
110+
CompressionCodec::Zstd(level) => write!(f, "Zstd(level={level})"),
111+
CompressionCodec::Gzip(level) => write!(f, "Gzip(level={level})"),
112+
CompressionCodec::Snappy => write!(f, "Snappy"),
113+
}
114+
}
42115
}
43116

44117
impl CompressionCodec {
@@ -49,13 +122,17 @@ impl CompressionCodec {
49122
ErrorKind::FeatureUnsupported,
50123
"LZ4 decompression is not supported currently",
51124
)),
52-
CompressionCodec::Zstd => Ok(zstd::stream::decode_all(&bytes[..])?),
53-
CompressionCodec::Gzip => {
125+
CompressionCodec::Zstd(_) => Ok(zstd::stream::decode_all(&bytes[..])?),
126+
CompressionCodec::Gzip(_) => {
54127
let mut decoder = GzDecoder::new(&bytes[..]);
55128
let mut decompressed = Vec::new();
56129
decoder.read_to_end(&mut decompressed)?;
57130
Ok(decompressed)
58131
}
132+
CompressionCodec::Snappy => Err(Error::new(
133+
ErrorKind::FeatureUnsupported,
134+
"Snappy decompression is not supported currently",
135+
)),
59136
}
60137
}
61138

@@ -66,19 +143,24 @@ impl CompressionCodec {
66143
ErrorKind::FeatureUnsupported,
67144
"LZ4 compression is not supported currently",
68145
)),
69-
CompressionCodec::Zstd => {
146+
CompressionCodec::Zstd(level) => {
70147
let writer = Vec::<u8>::new();
71-
let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
148+
let mut encoder = zstd::stream::Encoder::new(writer, *level as i32)?;
72149
encoder.include_checksum(true)?;
73150
encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
74151
std::io::copy(&mut &bytes[..], &mut encoder)?;
75152
Ok(encoder.finish()?)
76153
}
77-
CompressionCodec::Gzip => {
78-
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
154+
CompressionCodec::Gzip(level) => {
155+
let compression = Compression::new((*level).min(GZIP_MAX_LEVEL) as u32);
156+
let mut encoder = GzEncoder::new(Vec::new(), compression);
79157
encoder.write_all(&bytes)?;
80158
Ok(encoder.finish()?)
81159
}
160+
CompressionCodec::Snappy => Err(Error::new(
161+
ErrorKind::FeatureUnsupported,
162+
"Snappy compression is not supported currently",
163+
)),
82164
}
83165
}
84166

@@ -95,8 +177,10 @@ impl CompressionCodec {
95177
pub fn suffix(&self) -> Result<&'static str> {
96178
match self {
97179
CompressionCodec::None => Ok(""),
98-
CompressionCodec::Gzip => Ok(".gz"),
99-
codec @ (CompressionCodec::Lz4 | CompressionCodec::Zstd) => Err(Error::new(
180+
CompressionCodec::Gzip(_) => Ok(".gz"),
181+
codec @ (CompressionCodec::Lz4
182+
| CompressionCodec::Zstd(_)
183+
| CompressionCodec::Snappy) => Err(Error::new(
100184
ErrorKind::FeatureUnsupported,
101185
format!("suffix not defined for {codec:?}"),
102186
)),
@@ -123,7 +207,10 @@ mod tests {
123207
async fn test_compression_codec_compress() {
124208
let bytes_vec = [0_u8; 100].to_vec();
125209

126-
let compression_codecs = [CompressionCodec::Zstd, CompressionCodec::Gzip];
210+
let compression_codecs = [
211+
CompressionCodec::zstd_default(),
212+
CompressionCodec::gzip_default(),
213+
];
127214

128215
for codec in compression_codecs {
129216
let compressed = codec.compress(bytes_vec.clone()).unwrap();
@@ -135,7 +222,10 @@ mod tests {
135222

136223
#[tokio::test]
137224
async fn test_compression_codec_unsupported() {
138-
let unsupported_codecs = [(CompressionCodec::Lz4, "LZ4")];
225+
let unsupported_codecs = [
226+
(CompressionCodec::Lz4, "LZ4"),
227+
(CompressionCodec::Snappy, "Snappy"),
228+
];
139229
let bytes_vec = [0_u8; 100].to_vec();
140230

141231
for (codec, name) in unsupported_codecs {
@@ -153,18 +243,34 @@ mod tests {
153243

154244
#[test]
155245
fn test_suffix() {
156-
// Test supported codecs
157246
assert_eq!(CompressionCodec::None.suffix().unwrap(), "");
158-
assert_eq!(CompressionCodec::Gzip.suffix().unwrap(), ".gz");
247+
assert_eq!(CompressionCodec::gzip_default().suffix().unwrap(), ".gz");
159248

160-
// Test unsupported codecs return errors
161249
assert!(CompressionCodec::Lz4.suffix().is_err());
162-
assert!(CompressionCodec::Zstd.suffix().is_err());
250+
assert!(CompressionCodec::zstd_default().suffix().is_err());
251+
assert!(CompressionCodec::Snappy.suffix().is_err());
163252

164253
let lz4_err = CompressionCodec::Lz4.suffix().unwrap_err();
165254
assert!(lz4_err.to_string().contains("suffix not defined for Lz4"));
166255

167-
let zstd_err = CompressionCodec::Zstd.suffix().unwrap_err();
256+
let zstd_err = CompressionCodec::zstd_default().suffix().unwrap_err();
168257
assert!(zstd_err.to_string().contains("suffix not defined for Zstd"));
169258
}
259+
260+
#[test]
261+
fn test_display() {
262+
assert_eq!(CompressionCodec::None.to_string(), "None");
263+
assert_eq!(CompressionCodec::Lz4.to_string(), "Lz4");
264+
assert_eq!(
265+
CompressionCodec::zstd_default().to_string(),
266+
"Zstd(level=3)"
267+
);
268+
assert_eq!(CompressionCodec::Zstd(5).to_string(), "Zstd(level=5)");
269+
assert_eq!(
270+
CompressionCodec::gzip_default().to_string(),
271+
"Gzip(level=6)"
272+
);
273+
assert_eq!(CompressionCodec::Gzip(9).to_string(), "Gzip(level=9)");
274+
assert_eq!(CompressionCodec::Snappy.to_string(), "Snappy");
275+
}
170276
}

crates/iceberg/src/puffin/metadata.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -985,6 +985,9 @@ mod tests {
985985
assert!(result.is_ok());
986986
let metadata = result.unwrap();
987987
assert_eq!(metadata.blobs.len(), 1);
988-
assert_eq!(metadata.blobs[0].compression_codec, CompressionCodec::Gzip);
988+
assert_eq!(
989+
metadata.blobs[0].compression_codec,
990+
CompressionCodec::gzip_default()
991+
);
989992
}
990993
}

crates/iceberg/src/puffin/mod.rs

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,30 +26,22 @@ pub use blob::{APACHE_DATASKETCHES_THETA_V1, Blob, DELETION_VECTOR_V1};
2626

2727
pub use crate::compression::CompressionCodec;
2828

29-
/// Compression codecs supported by the Puffin spec.
30-
const SUPPORTED_PUFFIN_CODECS: &[CompressionCodec] = &[
31-
CompressionCodec::None,
32-
CompressionCodec::Lz4,
33-
CompressionCodec::Zstd,
34-
];
35-
3629
/// Validates that the compression codec is supported for Puffin files.
3730
/// Returns an error if the codec is not supported.
3831
fn validate_puffin_compression(codec: CompressionCodec) -> Result<()> {
39-
if !SUPPORTED_PUFFIN_CODECS.contains(&codec) {
40-
let supported_names: Vec<String> = SUPPORTED_PUFFIN_CODECS
41-
.iter()
42-
.map(|c| format!("{c:?}"))
43-
.collect();
44-
return Err(Error::new(
32+
match codec {
33+
CompressionCodec::None | CompressionCodec::Lz4 | CompressionCodec::Zstd(_) => Ok(()),
34+
other => Err(Error::new(
4535
ErrorKind::DataInvalid,
4636
format!(
47-
"Compression codec {codec:?} is not supported for Puffin files. Only {} are supported.",
48-
supported_names.join(", ")
37+
"Compression codec {} is not supported for Puffin files. Only {}, {}, and {} are supported.",
38+
other.name(),
39+
CompressionCodec::None.name(),
40+
CompressionCodec::Lz4.name(),
41+
CompressionCodec::zstd_default().name()
4942
),
50-
));
43+
)),
5144
}
52-
Ok(())
5345
}
5446

5547
mod metadata;
@@ -70,12 +62,13 @@ mod tests {
7062

7163
#[test]
7264
fn test_puffin_codec_validation() {
73-
// All codecs in SUPPORTED_PUFFIN_CODECS should be valid
74-
for codec in SUPPORTED_PUFFIN_CODECS {
75-
assert!(validate_puffin_compression(*codec).is_ok());
76-
}
65+
// Supported codecs
66+
assert!(validate_puffin_compression(CompressionCodec::None).is_ok());
67+
assert!(validate_puffin_compression(CompressionCodec::Lz4).is_ok());
68+
assert!(validate_puffin_compression(CompressionCodec::zstd_default()).is_ok());
69+
assert!(validate_puffin_compression(CompressionCodec::Zstd(5)).is_ok());
7770

78-
// Gzip should not be supported for Puffin files
79-
assert!(validate_puffin_compression(CompressionCodec::Gzip).is_err());
71+
// Unsupported codecs
72+
assert!(validate_puffin_compression(CompressionCodec::gzip_default()).is_err());
8073
}
8174
}

crates/iceberg/src/puffin/reader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ mod tests {
144144
sequence_number: 1,
145145
offset: 4,
146146
length: 10,
147-
compression_codec: CompressionCodec::Gzip,
147+
compression_codec: CompressionCodec::gzip_default(),
148148
properties: HashMap::new(),
149149
};
150150

@@ -153,7 +153,7 @@ mod tests {
153153
assert!(result.is_err());
154154
let err = result.unwrap_err();
155155
assert_eq!(err.kind(), ErrorKind::DataInvalid);
156-
assert!(err.to_string().contains("Gzip"));
156+
assert!(err.to_string().contains("gzip"));
157157
assert!(
158158
err.to_string()
159159
.contains("is not supported for Puffin files")

crates/iceberg/src/puffin/test_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata {
7777
sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER,
7878
offset: 4,
7979
length: 22,
80-
compression_codec: CompressionCodec::Zstd,
80+
compression_codec: CompressionCodec::zstd_default(),
8181
properties: HashMap::new(),
8282
}
8383
}
@@ -134,7 +134,7 @@ pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata {
134134
sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER,
135135
offset: 26,
136136
length: 77,
137-
compression_codec: CompressionCodec::Zstd,
137+
compression_codec: CompressionCodec::zstd_default(),
138138
properties: HashMap::new(),
139139
}
140140
}

0 commit comments

Comments
 (0)