Skip to content

Commit 31faaee

Browse files
committed
feat(encryption) [4/N] Support encryption: StandardKeyMetadata
1 parent 4e8cf7f commit 31faaee

3 files changed

Lines changed: 233 additions & 4 deletions

File tree

crates/iceberg/src/encryption/crypto.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::{Error, ErrorKind, Result};
4343
/// containing `SensitiveBytes` can safely derive or implement `Debug`
4444
/// without risk of leaking key material.
4545
#[derive(Clone, PartialEq, Eq)]
46-
struct SensitiveBytes(Zeroizing<Box<[u8]>>);
46+
pub struct SensitiveBytes(Zeroizing<Box<[u8]>>);
4747

4848
impl SensitiveBytes {
4949
/// Wraps the given bytes as sensitive material.
@@ -57,13 +57,11 @@ impl SensitiveBytes {
5757
}
5858

5959
/// Returns the number of bytes.
60-
#[allow(dead_code)] // Encryption work is ongoing so currently unused
6160
pub fn len(&self) -> usize {
6261
self.0.len()
6362
}
6463

6564
/// Returns `true` if the byte slice is empty.
66-
#[allow(dead_code)] // Encryption work is ongoing so currently unused
6765
pub fn is_empty(&self) -> bool {
6866
self.0.is_empty()
6967
}
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Avro-serialized key metadata format compatible with Java's
19+
//! `org.apache.iceberg.encryption.StandardKeyMetadata`.
20+
21+
use std::fmt;
22+
use std::io::Cursor;
23+
use std::sync::LazyLock;
24+
25+
use apache_avro::{Schema as AvroSchema, from_avro_datum, from_value, to_avro_datum, to_value};
26+
use serde::{Deserialize, Serialize};
27+
28+
use super::SensitiveBytes;
29+
use crate::{Error, ErrorKind, Result};
30+
31+
/// Version byte written as the first byte of the serialized form and
/// checked as the first byte when deserializing.
const V1: u8 = 1;
32+
33+
/// Avro schema for StandardKeyMetadata V1, matching Java's layout.
34+
static AVRO_SCHEMA_V1: LazyLock<AvroSchema> = LazyLock::new(|| {
35+
AvroSchema::parse_str(
36+
r#"{
37+
"type": "record",
38+
"name": "StandardKeyMetadata",
39+
"namespace": "org.apache.iceberg.encryption",
40+
"fields": [
41+
{
42+
"name": "encryption_key",
43+
"type": "bytes",
44+
"field-id": 0
45+
},
46+
{
47+
"name": "aad_prefix",
48+
"type": ["null", "bytes"],
49+
"default": null,
50+
"field-id": 1
51+
},
52+
{
53+
"name": "file_length",
54+
"type": ["null", "long"],
55+
"default": null,
56+
"field-id": 2
57+
}
58+
]
59+
}"#,
60+
)
61+
.expect("Failed to parse StandardKeyMetadata Avro schema")
62+
});
63+
64+
/// Standard key metadata for Iceberg table encryption.
65+
///
66+
/// Contains the Data Encryption Key (DEK), AAD prefix, and optional file
67+
/// length. Byte-compatible with Java's `StandardKeyMetadata` via Avro
68+
/// serialization.
69+
///
70+
/// Wire format: `[version byte (0x01)] [Avro binary datum]`
71+
#[derive(Clone, PartialEq, Eq)]
72+
pub struct StandardKeyMetadata {
73+
encryption_key: SensitiveBytes,
74+
aad_prefix: Box<[u8]>,
75+
file_length: Option<i64>,
76+
}
77+
78+
impl fmt::Debug for StandardKeyMetadata {
79+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80+
f.debug_struct("StandardKeyMetadata")
81+
.field("encryption_key", &self.encryption_key)
82+
.field("aad_prefix", &format!("[{} bytes]", self.aad_prefix.len()))
83+
.field("file_length", &self.file_length)
84+
.finish()
85+
}
86+
}
87+
88+
impl StandardKeyMetadata {
89+
/// Creates a new `StandardKeyMetadata`.
90+
pub fn new(encryption_key: &[u8], aad_prefix: &[u8]) -> Self {
91+
Self {
92+
encryption_key: SensitiveBytes::new(encryption_key),
93+
aad_prefix: aad_prefix.into(),
94+
file_length: None,
95+
}
96+
}
97+
98+
/// Returns the plaintext Data Encryption Key.
99+
pub fn encryption_key(&self) -> &[u8] {
100+
self.encryption_key.as_bytes()
101+
}
102+
103+
/// Returns the AAD prefix.
104+
pub fn aad_prefix(&self) -> &[u8] {
105+
&self.aad_prefix
106+
}
107+
108+
/// Returns the optional file length.
109+
pub fn file_length(&self) -> Option<i64> {
110+
self.file_length
111+
}
112+
113+
/// Serializes to Java-compatible format: `[0x01] [Avro binary datum]`
114+
pub fn serialize(&self) -> Result<Box<[u8]>> {
115+
let serde_repr = StandardKeyMetadataV1 {
116+
encryption_key: serde_bytes::ByteBuf::from(self.encryption_key.as_bytes()),
117+
aad_prefix: Some(serde_bytes::ByteBuf::from(self.aad_prefix.as_ref())),
118+
file_length: self.file_length,
119+
};
120+
121+
let value = to_value(serde_repr)
122+
.and_then(|v| v.resolve(&AVRO_SCHEMA_V1))
123+
.map_err(|e| {
124+
Error::new(ErrorKind::Unexpected, "Failed to serialize key metadata").with_source(e)
125+
})?;
126+
127+
let datum = to_avro_datum(&AVRO_SCHEMA_V1, value).map_err(|e| {
128+
Error::new(ErrorKind::Unexpected, "Failed to serialize key metadata").with_source(e)
129+
})?;
130+
131+
let mut result = Vec::with_capacity(1 + datum.len());
132+
result.push(V1);
133+
result.extend_from_slice(&datum);
134+
Ok(result.into_boxed_slice())
135+
}
136+
137+
/// Deserializes from Java-compatible format.
138+
pub fn deserialize(bytes: &[u8]) -> Result<Self> {
139+
if bytes.is_empty() {
140+
return Err(Error::new(
141+
ErrorKind::DataInvalid,
142+
"Empty key metadata buffer",
143+
));
144+
}
145+
146+
let version = bytes[0];
147+
if version != V1 {
148+
return Err(Error::new(
149+
ErrorKind::FeatureUnsupported,
150+
format!("Cannot resolve schema for version: {version}"),
151+
));
152+
}
153+
154+
let mut reader = Cursor::new(&bytes[1..]);
155+
let value = from_avro_datum(&AVRO_SCHEMA_V1, &mut reader, None).map_err(|e| {
156+
Error::new(ErrorKind::DataInvalid, "Failed to parse key metadata").with_source(e)
157+
})?;
158+
159+
let v1: StandardKeyMetadataV1 = from_value(&value).map_err(|e| {
160+
Error::new(
161+
ErrorKind::DataInvalid,
162+
"Failed to deserialize key metadata fields",
163+
)
164+
.with_source(e)
165+
})?;
166+
167+
Ok(Self {
168+
encryption_key: SensitiveBytes::new(v1.encryption_key.into_vec()),
169+
aad_prefix: v1
170+
.aad_prefix
171+
.map(|b| b.into_vec().into_boxed_slice())
172+
.unwrap_or_default(),
173+
file_length: v1.file_length,
174+
})
175+
}
176+
}
177+
178+
/// Serde struct for Avro serialization of [`StandardKeyMetadata`] V1.
179+
/// Field names must match [`AVRO_SCHEMA_V1`] exactly.
180+
#[derive(Serialize, Deserialize)]
181+
struct StandardKeyMetadataV1 {
182+
encryption_key: serde_bytes::ByteBuf,
183+
aad_prefix: Option<serde_bytes::ByteBuf>,
184+
file_length: Option<i64>,
185+
}
186+
187+
#[cfg(test)]
mod tests {
    use super::*;

    /// A key and a non-empty AAD prefix survive serialize + deserialize.
    #[test]
    fn test_roundtrip() {
        let key = b"0123456789012345";
        let aad = b"1234567890123456";

        let metadata = StandardKeyMetadata::new(key, aad);
        let serialized = metadata.serialize().unwrap();
        let parsed = StandardKeyMetadata::deserialize(&serialized).unwrap();

        assert_eq!(parsed.encryption_key(), key);
        assert_eq!(parsed.aad_prefix(), aad);
        assert_eq!(parsed.file_length(), None);
    }

    /// Any version byte other than `0x01` is rejected as unsupported.
    #[test]
    fn test_unsupported_version() {
        let err = StandardKeyMetadata::deserialize(&[0x02]).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::FeatureUnsupported);
    }

    /// An empty buffer has no version byte and is invalid data.
    #[test]
    fn test_empty_buffer() {
        let err = StandardKeyMetadata::deserialize(&[]).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::DataInvalid);
    }

    /// An empty AAD prefix round-trips as an empty prefix.
    #[test]
    fn test_roundtrip_with_empty_aad() {
        let metadata = StandardKeyMetadata::new(&[1, 2, 3, 4], &[]);
        let serialized = metadata.serialize().unwrap();
        let parsed = StandardKeyMetadata::deserialize(&serialized).unwrap();

        assert_eq!(parsed.encryption_key(), &[1u8, 2, 3, 4][..]);
        assert_eq!(parsed.aad_prefix(), &[] as &[u8]);
        assert_eq!(parsed.file_length(), None);
    }

    /// Pins the exact bytes: version byte, then the Avro binary datum.
    #[test]
    fn test_wire_format_layout() {
        let metadata = StandardKeyMetadata::new(&[1, 2, 3, 4], &[]);
        let serialized = metadata.serialize().unwrap();

        let expected: &[u8] = &[
            0x01, // version
            0x08, 1, 2, 3, 4, // encryption_key: zig-zag length 4, then the bytes
            0x02, 0x00, // aad_prefix: union branch 1 (bytes), length 0
            0x00, // file_length: union branch 0 (null)
        ];
        assert_eq!(&serialized[..], expected);
    }

    /// A payload with a null AAD prefix and a present file length decodes
    /// to an empty prefix and `Some(length)`.
    #[test]
    fn test_deserialize_null_aad_and_file_length() {
        let bytes: &[u8] = &[
            0x01, // version
            0x02, 0xAA, // encryption_key: length 1, byte 0xAA
            0x00, // aad_prefix: union branch 0 (null)
            0x02, 0xC8, 0x01, // file_length: union branch 1 (long), zig-zag 100
        ];
        let parsed = StandardKeyMetadata::deserialize(bytes).unwrap();

        assert_eq!(parsed.encryption_key(), &[0xAAu8][..]);
        assert_eq!(parsed.aad_prefix(), &[] as &[u8]);
        assert_eq!(parsed.file_length(), Some(100));
    }

    /// A valid version byte followed by a missing or cut-off datum is
    /// reported as invalid data.
    #[test]
    fn test_truncated_payload() {
        let err = StandardKeyMetadata::deserialize(&[V1]).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::DataInvalid);

        let serialized = StandardKeyMetadata::new(&[1, 2, 3, 4], &[5, 6])
            .serialize()
            .unwrap();
        let truncated = &serialized[..serialized.len() - 1];
        let err = StandardKeyMetadata::deserialize(truncated).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::DataInvalid);
    }
}

crates/iceberg/src/encryption/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
mod crypto;
2424
mod file_decryptor;
2525
mod file_encryptor;
26+
pub(crate) mod key_metadata;
2627
mod stream;
2728

28-
pub use crypto::{AesGcmCipher, AesKeySize, SecureKey};
29+
pub use crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes};
2930
pub use file_decryptor::AesGcmFileDecryptor;
3031
pub use file_encryptor::AesGcmFileEncryptor;
32+
pub use key_metadata::StandardKeyMetadata;
3133
pub use stream::{AesGcmFileRead, AesGcmFileWrite};

0 commit comments

Comments
 (0)