Skip to content

Commit 743edec

Browse files
authored
feat: Add logSegment.new_with_commit_appended API (delta-io#1602)
## What changes are proposed in this pull request?

This PR adds a `LogSegment::new_with_commit_appended` API. It takes the input LogSegment by reference and returns a new LogSegment with the new commit (published or staged) appended. Soon, our `txn.commit()` call will return, grab the FileMeta from the Committer, and use it to extend the LogSegment. That updated LogSegment will then be used to create the Post-Commit-Snapshot.

## How was this change tested?

New unit tests.
1 parent 43afc3a commit 743edec

3 files changed

Lines changed: 186 additions & 1 deletion

File tree

kernel/src/log_segment.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,45 @@ impl LogSegment {
327327
LogSegment::try_new(listed_commits, log_root, Some(end_version), None)
328328
}
329329

330+
/// Creates a new LogSegment with the given commit file added to the end.
331+
/// TODO: Take in multiple commits when Kernel-RS supports txn retries and conflict rebasing.
332+
#[allow(unused)]
333+
pub(crate) fn new_with_commit_appended(
334+
&self,
335+
tail_commit_file: ParsedLogPath,
336+
) -> DeltaResult<Self> {
337+
require!(
338+
tail_commit_file.is_commit(),
339+
Error::internal_error(format!(
340+
"Cannot extend and create new LogSegment. Tail log file is not a commit file. \
341+
Path: {}, Type: {:?}.",
342+
tail_commit_file.location.location, tail_commit_file.file_type
343+
))
344+
);
345+
require!(
346+
tail_commit_file.version == self.end_version + 1,
347+
Error::internal_error(format!(
348+
"Cannot extend and create new LogSegment. Tail commit file version ({}) does not \
349+
equal LogSegment end_version ({}) + 1.",
350+
tail_commit_file.version, self.end_version
351+
))
352+
);
353+
354+
let mut new_log_segment = self.clone();
355+
356+
new_log_segment.end_version = tail_commit_file.version;
357+
new_log_segment
358+
.ascending_commit_files
359+
.push(tail_commit_file.clone());
360+
new_log_segment.latest_commit_file = Some(tail_commit_file.clone());
361+
new_log_segment.max_published_version = match tail_commit_file.file_type {
362+
LogPathFileType::Commit => Some(tail_commit_file.version),
363+
_ => self.max_published_version,
364+
};
365+
366+
Ok(new_log_segment)
367+
}
368+
330369
/// Read a stream of actions from this log segment. This returns an iterator of
331370
/// [`ActionsBatch`]s which includes EngineData of actions + a boolean flag indicating whether
332371
/// the data was read from a commit file (true) or a checkpoint file (false).

kernel/src/log_segment/tests.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2582,6 +2582,10 @@ async fn test_get_file_actions_schema_v1_parquet_with_hint() -> DeltaResult<()>
25822582
Ok(())
25832583
}
25842584

2585+
// ============================================================================
2586+
// max_published_version tests
2587+
// ============================================================================
2588+
25852589
#[tokio::test]
25862590
async fn test_max_published_version_only_published_commits() {
25872591
let log_segment = create_segment_for(LogSegmentConfig {
@@ -2681,6 +2685,11 @@ async fn test_max_published_version_checkpoint_only() {
26812685
.await;
26822686
assert_eq!(log_segment.max_published_version, None);
26832687
}
2688+
2689+
// ============================================================================
2690+
// schema_has_compatible_stats_parsed tests
2691+
// ============================================================================
2692+
26842693
// Helper to create a checkpoint schema with stats_parsed for testing
26852694
fn create_checkpoint_schema_with_stats_parsed(min_values_fields: Vec<StructField>) -> StructType {
26862695
let stats_parsed = StructType::new_unchecked([
@@ -2883,3 +2892,107 @@ fn test_schema_has_compatible_stats_parsed_min_values_not_struct() {
28832892
&stats_schema
28842893
));
28852894
}
2895+
2896+
// ============================================================================
2897+
// new_with_commit_appended tests
2898+
// ============================================================================
2899+
2900+
/// Asserts that `new` is `orig` extended with exactly one commit via `LogSegment::new_with_commit`.
2901+
fn assert_log_segment_extended(orig: LogSegment, new: LogSegment) {
2902+
// Check: What should have changed
2903+
assert_eq!(orig.end_version + 1, new.end_version);
2904+
assert_eq!(
2905+
orig.ascending_commit_files.len() + 1,
2906+
new.ascending_commit_files.len()
2907+
);
2908+
assert_eq!(
2909+
orig.latest_commit_file.as_ref().unwrap().version + 1,
2910+
new.latest_commit_file.as_ref().unwrap().version
2911+
);
2912+
2913+
// Check: What should be the same
2914+
fn normalize(log_segment: LogSegment) -> LogSegment {
2915+
LogSegment {
2916+
end_version: 0,
2917+
max_published_version: None,
2918+
ascending_commit_files: vec![],
2919+
latest_commit_file: None,
2920+
..log_segment
2921+
}
2922+
}
2923+
2924+
assert_eq!(normalize(orig), normalize(new));
2925+
}
2926+
2927+
#[tokio::test]
2928+
async fn test_new_with_commit_published_commit() {
2929+
let log_segment = create_segment_for(LogSegmentConfig {
2930+
published_commit_versions: &[0, 1, 2, 3, 4],
2931+
..Default::default()
2932+
})
2933+
.await;
2934+
let table_root = Url::parse("memory:///").unwrap();
2935+
let new_commit = ParsedLogPath::create_parsed_published_commit(&table_root, 5);
2936+
2937+
let new_log_segment = log_segment
2938+
.clone()
2939+
.new_with_commit_appended(new_commit)
2940+
.unwrap();
2941+
2942+
assert_eq!(new_log_segment.max_published_version, Some(5));
2943+
assert_log_segment_extended(log_segment, new_log_segment);
2944+
}
2945+
2946+
#[tokio::test]
2947+
async fn test_new_with_commit_staged_commit() {
2948+
let log_segment = create_segment_for(LogSegmentConfig {
2949+
published_commit_versions: &[0, 1, 2, 3, 4],
2950+
..Default::default()
2951+
})
2952+
.await;
2953+
let table_root = Url::parse("memory:///").unwrap();
2954+
let new_commit = ParsedLogPath::create_parsed_staged_commit(&table_root, 5);
2955+
2956+
let new_log_segment = log_segment
2957+
.clone()
2958+
.new_with_commit_appended(new_commit)
2959+
.unwrap();
2960+
2961+
assert_eq!(new_log_segment.max_published_version, Some(4));
2962+
assert_log_segment_extended(log_segment, new_log_segment);
2963+
}
2964+
2965+
#[tokio::test]
2966+
async fn test_new_with_commit_not_commit_type() {
2967+
let log_segment = create_segment_for(LogSegmentConfig {
2968+
published_commit_versions: &[0, 1, 2, 3, 4],
2969+
..Default::default()
2970+
})
2971+
.await;
2972+
let checkpoint = create_log_path("file:///_delta_log/00000000000000000005.checkpoint.parquet");
2973+
2974+
let result = log_segment.new_with_commit_appended(checkpoint);
2975+
2976+
assert_result_error_with_message(
2977+
result,
2978+
"Cannot extend and create new LogSegment. Tail log file is not a commit file.",
2979+
);
2980+
}
2981+
2982+
#[tokio::test]
2983+
async fn test_new_with_commit_not_end_version_plus_one() {
2984+
let log_segment = create_segment_for(LogSegmentConfig {
2985+
published_commit_versions: &[0, 1, 2, 3, 4],
2986+
..Default::default()
2987+
})
2988+
.await;
2989+
let table_root = Url::parse("memory:///").unwrap();
2990+
2991+
let wrong_version_commit = ParsedLogPath::create_parsed_published_commit(&table_root, 10);
2992+
let result = log_segment.new_with_commit_appended(wrong_version_commit);
2993+
2994+
assert_result_error_with_message(
2995+
result,
2996+
"Cannot extend and create new LogSegment. Tail commit file version (10) does not equal LogSegment end_version (4) + 1."
2997+
);
2998+
}

kernel/src/path.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ impl LogRoot {
460460
}
461461

462462
#[cfg(test)]
463-
mod tests {
463+
pub(crate) mod tests {
464464
use std::path::PathBuf;
465465
use std::sync::Arc;
466466

@@ -471,6 +471,39 @@ mod tests {
471471
use object_store::memory::InMemory;
472472
use test_utils::add_commit;
473473

474+
impl ParsedLogPath<FileMeta> {
475+
pub(crate) fn create_parsed_published_commit(table_root: &Url, version: Version) -> Self {
476+
let filename = format!("{version:020}.json");
477+
let location = table_root
478+
.join(DELTA_LOG_DIR_WITH_SLASH)
479+
.unwrap()
480+
.join(&filename)
481+
.unwrap();
482+
let parsed = ParsedLogPath::try_from(FileMeta::new(location, 0, 0))
483+
.unwrap()
484+
.unwrap();
485+
assert!(parsed.file_type == LogPathFileType::Commit);
486+
parsed
487+
}
488+
489+
pub(crate) fn create_parsed_staged_commit(table_root: &Url, version: Version) -> Self {
490+
let uuid = Uuid::new_v4();
491+
let filename = format!("{version:020}.{uuid}.json");
492+
let location = table_root
493+
.join(DELTA_LOG_DIR_WITH_SLASH)
494+
.unwrap()
495+
.join(STAGED_COMMITS_DIR)
496+
.unwrap()
497+
.join(&filename)
498+
.unwrap();
499+
let parsed = ParsedLogPath::try_from(FileMeta::new(location, 0, 0))
500+
.unwrap()
501+
.unwrap();
502+
assert!(parsed.file_type == LogPathFileType::StagedCommit);
503+
parsed
504+
}
505+
}
506+
474507
fn table_root_dir_url() -> Url {
475508
let path = PathBuf::from("./tests/data/table-with-dv-small/");
476509
let path = std::fs::canonicalize(path).unwrap();

0 commit comments

Comments
 (0)