Skip to content

Commit 62cb1ac

Browse files
authored
add file size metadata to integration test (delta-io#1951)
## What changes are proposed in this pull request? - Part of delta-io#1010 - There are several test in `kernel/test/read.rs` and `kernel/test/log_segment.rs` that read the Parquet file, however it is using file_meta's constant value. In the next PR, when we used the file_meta's size in the read the Parquet file, those tests will fail since there is a mismatch in the correct parquet size and the file_meta's size. In this PR, we will integrate the correct file_meta's size to those affected test. - Change included: - Create a new mock Enum for add/remove commit with size called `AddWithSize`, `RemoveWithSize`. - Change the affected test in the `kernel/test/read.rs` to use the `AddWithSize`, `RemoveWithSize`. - Update the log_segment's test to write sidecar's file size in the checkpoint. ## How was this change tested? `cargo test --package delta_kernel`
1 parent 17da470 commit 62cb1ac

3 files changed

Lines changed: 84 additions & 36 deletions

File tree

kernel/src/log_segment/tests.rs

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -159,15 +159,22 @@ pub(crate) async fn add_checkpoint_to_store(
159159
write_parquet_to_store(store, path, data).await
160160
}
161161

162-
/// Writes all actions to a _delta_log/_sidecars file in the store.
162+
/// Writes all actions to a _delta_log/_sidecars file in the store and return the [`FileMeta`].
163163
/// This function formats the provided filename into the _sidecars subdirectory.
164164
async fn add_sidecar_to_store(
165165
store: &Arc<InMemory>,
166166
data: Box<dyn EngineData>,
167167
filename: &str,
168-
) -> DeltaResult<()> {
168+
) -> DeltaResult<FileMeta> {
169169
let path = format!("_delta_log/_sidecars/{filename}");
170-
write_parquet_to_store(store, path, data).await
170+
write_parquet_to_store(store, path.clone(), data).await?;
171+
let size = get_file_size(store, &path).await;
172+
let location = Url::parse(&format!("memory:///{path}")).expect("valid url");
173+
Ok(FileMeta {
174+
location,
175+
last_modified: 0,
176+
size,
177+
})
171178
}
172179

173180
/// Writes all actions to a _delta_log json checkpoint file in the store.
@@ -989,21 +996,27 @@ async fn test_checkpoint_batch_with_sidecars_returns_sidecar_batches() -> DeltaR
989996
let engine = DefaultEngineBuilder::new(store.clone()).build();
990997
let read_schema = get_all_actions_schema().project(&[ADD_NAME, REMOVE_NAME, SIDECAR_NAME])?;
991998

992-
add_sidecar_to_store(
999+
let sidecar1_size = add_sidecar_to_store(
9931000
&store,
9941001
add_batch_simple(read_schema.clone()),
9951002
"sidecarfile1.parquet",
9961003
)
997-
.await?;
998-
add_sidecar_to_store(
1004+
.await?
1005+
.size;
1006+
1007+
let sidecar2_size = add_sidecar_to_store(
9991008
&store,
10001009
add_batch_with_remove(read_schema.clone()),
10011010
"sidecarfile2.parquet",
10021011
)
1003-
.await?;
1012+
.await?
1013+
.size;
10041014

1005-
let checkpoint_batch = sidecar_batch_with_given_paths(
1006-
vec!["sidecarfile1.parquet", "sidecarfile2.parquet"],
1015+
let checkpoint_batch = sidecar_batch_with_given_paths_and_sizes(
1016+
vec![
1017+
("sidecarfile1.parquet", sidecar1_size),
1018+
("sidecarfile2.parquet", sidecar2_size),
1019+
],
10071020
read_schema.clone(),
10081021
);
10091022

@@ -1058,16 +1071,19 @@ async fn test_reading_sidecar_files_with_predicate() -> DeltaResult<()> {
10581071
let engine = DefaultEngineBuilder::new(store.clone()).build();
10591072
let read_schema = get_all_actions_schema().project(&[ADD_NAME, REMOVE_NAME, SIDECAR_NAME])?;
10601073

1061-
let checkpoint_batch =
1062-
sidecar_batch_with_given_paths(vec!["sidecarfile1.parquet"], read_schema.clone());
1063-
10641074
// Add a sidecar file with only add actions
1065-
add_sidecar_to_store(
1075+
let sidecar_size = add_sidecar_to_store(
10661076
&store,
10671077
add_batch_simple(read_schema.clone()),
10681078
"sidecarfile1.parquet",
10691079
)
1070-
.await?;
1080+
.await?
1081+
.size;
1082+
1083+
let checkpoint_batch = sidecar_batch_with_given_paths_and_sizes(
1084+
vec![("sidecarfile1.parquet", sidecar_size)],
1085+
read_schema.clone(),
1086+
);
10711087

10721088
// Filter out sidecar files that do not contain remove actions
10731089
let remove_predicate: LazyLock<Option<PredicateRef>> = LazyLock::new(|| {
@@ -1344,22 +1360,21 @@ async fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar
13441360
let engine = DefaultEngineBuilder::new(store.clone()).build();
13451361

13461362
// Write sidecars first so we can get their actual sizes
1347-
add_sidecar_to_store(
1363+
let sidecar1_size = add_sidecar_to_store(
13481364
&store,
13491365
add_batch_simple(get_commit_schema().project(&[ADD_NAME, REMOVE_NAME])?),
13501366
"sidecarfile1.parquet",
13511367
)
1352-
.await?;
1353-
add_sidecar_to_store(
1368+
.await?
1369+
.size;
1370+
1371+
let sidecar2_size = add_sidecar_to_store(
13541372
&store,
13551373
add_batch_with_remove(get_commit_schema().project(&[ADD_NAME, REMOVE_NAME])?),
13561374
"sidecarfile2.parquet",
13571375
)
1358-
.await?;
1359-
1360-
// Get actual sidecar sizes for correct FileMeta creation
1361-
let sidecar1_size = get_file_size(&store, "_delta_log/_sidecars/sidecarfile1.parquet").await;
1362-
let sidecar2_size = get_file_size(&store, "_delta_log/_sidecars/sidecarfile2.parquet").await;
1376+
.await?
1377+
.size;
13631378

13641379
// Now create checkpoint with correct sidecar sizes
13651380
add_checkpoint_to_store(

kernel/tests/read.rs

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,15 @@ fn make_top_level_fields_nullable(batch: &RecordBatch) -> RecordBatch {
5252
async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>> {
5353
let batch = generate_simple_batch()?;
5454
let storage = Arc::new(InMemory::new());
55+
let parquet_bytes = record_batch_to_bytes(&batch);
56+
let file_size = parquet_bytes.len() as u64;
5557
add_commit(
5658
storage.as_ref(),
5759
0,
5860
actions_to_string(vec![
5961
TestAction::Metadata,
60-
TestAction::Add(PARQUET_FILE1.to_string()),
61-
TestAction::Add(PARQUET_FILE2.to_string()),
62+
TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size),
63+
TestAction::AddWithSize(PARQUET_FILE2.to_string(), file_size),
6264
]),
6365
)
6466
.await?;
@@ -99,19 +101,24 @@ async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>>
99101
async fn two_commits() -> Result<(), Box<dyn std::error::Error>> {
100102
let batch = generate_simple_batch()?;
101103
let storage = Arc::new(InMemory::new());
104+
let parquet_bytes = record_batch_to_bytes(&batch);
105+
let file_size = parquet_bytes.len() as u64;
102106
add_commit(
103107
storage.as_ref(),
104108
0,
105109
actions_to_string(vec![
106110
TestAction::Metadata,
107-
TestAction::Add(PARQUET_FILE1.to_string()),
111+
TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size),
108112
]),
109113
)
110114
.await?;
111115
add_commit(
112116
storage.as_ref(),
113117
1,
114-
actions_to_string(vec![TestAction::Add(PARQUET_FILE2.to_string())]),
118+
actions_to_string(vec![TestAction::AddWithSize(
119+
PARQUET_FILE2.to_string(),
120+
file_size,
121+
)]),
115122
)
116123
.await?;
117124
storage
@@ -152,25 +159,33 @@ async fn two_commits() -> Result<(), Box<dyn std::error::Error>> {
152159
async fn remove_action() -> Result<(), Box<dyn std::error::Error>> {
153160
let batch = generate_simple_batch()?;
154161
let storage = Arc::new(InMemory::new());
162+
let parquet_bytes = record_batch_to_bytes(&batch);
163+
let file_size = parquet_bytes.len() as u64;
155164
add_commit(
156165
storage.as_ref(),
157166
0,
158167
actions_to_string(vec![
159168
TestAction::Metadata,
160-
TestAction::Add(PARQUET_FILE1.to_string()),
169+
TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size),
161170
]),
162171
)
163172
.await?;
164173
add_commit(
165174
storage.as_ref(),
166175
1,
167-
actions_to_string(vec![TestAction::Add(PARQUET_FILE2.to_string())]),
176+
actions_to_string(vec![TestAction::AddWithSize(
177+
PARQUET_FILE2.to_string(),
178+
file_size,
179+
)]),
168180
)
169181
.await?;
170182
add_commit(
171183
storage.as_ref(),
172184
2,
173-
actions_to_string(vec![TestAction::Remove(PARQUET_FILE2.to_string())]),
185+
actions_to_string(vec![TestAction::RemoveWithSize(
186+
PARQUET_FILE2.to_string(),
187+
file_size,
188+
)]),
174189
)
175190
.await?;
176191
storage
@@ -209,6 +224,8 @@ async fn stats() -> Result<(), Box<dyn std::error::Error>> {
209224
TestAction::Add(path) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":2,\"nullCount\":{{\"id\":0}},\"minValues\":{{\"id\": 5}},\"maxValues\":{{\"id\":7}}}}"}}}}"#, action = "add", path = path),
210225
TestAction::Remove(path) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true}}}}"#, action = "remove", path = path),
211226
TestAction::Metadata => METADATA.into(),
227+
TestAction::AddWithSize(path, size) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":{size},"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":2,\"nullCount\":{{\"id\":0}},\"minValues\":{{\"id\": 5}},\"maxValues\":{{\"id\":7}}}}"}}}}"#, action = "add", path = path),
228+
TestAction::RemoveWithSize(path, size) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":{size},"modificationTime":1587968586000,"dataChange":true}}}}"#, action = "remove", path = path),
212229
})
213230
.fold(String::new(), |a, b| a + &b + "\n")
214231
}
@@ -218,22 +235,27 @@ async fn stats() -> Result<(), Box<dyn std::error::Error>> {
218235
("id", vec![5, 7].into_array()),
219236
("val", vec!["e", "g"].into_array()),
220237
])?);
238+
let file_size1 = record_batch_to_bytes(&batch1).len() as u64;
239+
let file_size2 = record_batch_to_bytes(&batch2).len() as u64;
221240
let storage = Arc::new(InMemory::new());
222241
// valid commit with min/max (0, 2)
223242
add_commit(
224243
storage.as_ref(),
225244
0,
226245
actions_to_string(vec![
227246
TestAction::Metadata,
228-
TestAction::Add(PARQUET_FILE1.to_string()),
247+
TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size1),
229248
]),
230249
)
231250
.await?;
232251
// storage.add_commit(1, &format!("{}\n", r#"{{"add":{{"path":"doesnotexist","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":2,\"nullCount\":{{\"id\":0}},\"minValues\":{{\"id\": 0}},\"maxValues\":{{\"id\":2}}}}"}}}}"#));
233252
add_commit(
234253
storage.as_ref(),
235254
1,
236-
generate_commit2(vec![TestAction::Add(PARQUET_FILE2.to_string())]),
255+
generate_commit2(vec![TestAction::AddWithSize(
256+
PARQUET_FILE2.to_string(),
257+
file_size2,
258+
)]),
237259
)
238260
.await?;
239261

@@ -1335,15 +1357,18 @@ async fn test_row_index_metadata_column() -> Result<(), Box<dyn std::error::Erro
13351357
("value", vec!["p", "q", "r", "s"].into_array()),
13361358
])?;
13371359

1360+
let file_size1 = record_batch_to_bytes(&batch1).len() as u64;
1361+
let file_size2 = record_batch_to_bytes(&batch2).len() as u64;
1362+
let file_size3 = record_batch_to_bytes(&batch3).len() as u64;
13381363
let storage = Arc::new(InMemory::new());
13391364
add_commit(
13401365
storage.as_ref(),
13411366
0,
13421367
actions_to_string(vec![
13431368
TestAction::Metadata,
1344-
TestAction::Add(PARQUET_FILE1.to_string()),
1345-
TestAction::Add(PARQUET_FILE2.to_string()),
1346-
TestAction::Add(PARQUET_FILE3.to_string()),
1369+
TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size1),
1370+
TestAction::AddWithSize(PARQUET_FILE2.to_string(), file_size2),
1371+
TestAction::AddWithSize(PARQUET_FILE3.to_string(), file_size3),
13471372
]),
13481373
)
13491374
.await?;
@@ -1430,14 +1455,16 @@ async fn test_file_path_metadata_column() -> Result<(), Box<dyn std::error::Erro
14301455
("value", vec!["x", "y"].into_array()),
14311456
])?;
14321457

1458+
let file_size1 = record_batch_to_bytes(&batch1).len() as u64;
1459+
let file_size2 = record_batch_to_bytes(&batch2).len() as u64;
14331460
let storage = Arc::new(InMemory::new());
14341461
add_commit(
14351462
storage.as_ref(),
14361463
0,
14371464
actions_to_string(vec![
14381465
TestAction::Metadata,
1439-
TestAction::Add(PARQUET_FILE1.to_string()),
1440-
TestAction::Add(PARQUET_FILE2.to_string()),
1466+
TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size1),
1467+
TestAction::AddWithSize(PARQUET_FILE2.to_string(), file_size2),
14411468
]),
14421469
)
14431470
.await?;

test-utils/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ pub enum TestAction {
104104
Add(String),
105105
Remove(String),
106106
Metadata,
107+
// TODO: This is a temporary fix to make the test compatible with the file size requirement.
108+
// In the future, we can create an AddCommit/RemoveCommit struct type with DefaultAddCommit/DefaultRemoveCommit value to store the commit info in the enum for Add/Remove.
109+
AddWithSize(String, u64),
110+
RemoveWithSize(String, u64),
107111
}
108112

109113
// TODO: We need a better way to mock tables :)
@@ -125,6 +129,8 @@ pub fn actions_to_string_with_metadata(actions: Vec<TestAction>, metadata: &str)
125129
TestAction::Add(path) => format!(r#"{{"add":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":2,\"nullCount\":{{\"id\":0}},\"minValues\":{{\"id\": 1}},\"maxValues\":{{\"id\":3}}}}"}}}}"#),
126130
TestAction::Remove(path) => format!(r#"{{"remove":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true}}}}"#),
127131
TestAction::Metadata => metadata.into(),
132+
TestAction::AddWithSize(path, file_size) => format!(r#"{{"add":{{"path":"{path}","partitionValues":{{}},"size":{file_size},"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":2,\"nullCount\":{{\"id\":0}},\"minValues\":{{\"id\": 1}},\"maxValues\":{{\"id\":3}}}}"}}}}"#),
133+
TestAction::RemoveWithSize(path, file_size) => format!(r#"{{"remove":{{"path":"{path}","partitionValues":{{}},"size":{file_size},"modificationTime":1587968586000,"dataChange":true}}}}"#),
128134
})
129135
.join("\n")
130136
}

0 commit comments

Comments
 (0)