Skip to content

Commit 074f2de

Browse files
committed
Use the new checkpoint if one exists, even if the version didn't change, to trim the accumulated commit files in the LogSegment from memory
1 parent 0cb7d29 commit 074f2de

1 file changed

Lines changed: 109 additions & 6 deletions

File tree

kernel/src/snapshot.rs

Lines changed: 109 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -230,13 +230,12 @@ impl Snapshot {
230230
return Err(Error::Generic(format!(
231231
"Unexpected state: The newest version in the log {new_end_version} is older than the old version {old_version}")));
232232
}
233-
if new_end_version == old_version {
234-
// No new commits, just return the same snapshot
235-
return Ok(existing_snapshot.clone());
236-
}
237-
238233
if new_log_segment.checkpoint_version.is_some() {
239-
// we have a checkpoint in the new LogSegment, just construct a new snapshot from that
234+
// We have a checkpoint in the new LogSegment, just construct a new snapshot from that.
235+
// This must be checked before the version equality check below, because a sole writer
236+
// can write a checkpoint at the current version. In that case, new_end_version ==
237+
// old_version, but we still need to rebase onto the new checkpoint to trim the
238+
// accumulated commit files in the LogSegment.
240239
let snapshot = Self::try_new_from_log_segment_impl(
241240
existing_snapshot.table_root().clone(),
242241
new_log_segment,
@@ -246,6 +245,11 @@ impl Snapshot {
246245
return Ok(Arc::new(snapshot?));
247246
}
248247

248+
if new_end_version == old_version {
249+
// No new commits and no new checkpoint, just return the same snapshot
250+
return Ok(existing_snapshot.clone());
251+
}
252+
249253
// after this point, we incrementally update the snapshot with the new log segment.
250254
// first we remove the 'overlap' in commits, example:
251255
//
@@ -2439,4 +2443,103 @@ mod tests {
24392443

24402444
Ok(())
24412445
}
2446+
2447+
/// Helper: write a minimal checkpoint parquet file at the given version. The checkpoint
2448+
/// contains the protocol, metadata, and add actions needed for a valid table state.
///
/// The three actions are built as JSON values, serialized to strings, parsed by the engine's
/// JSON handler against the schema returned by `crate::actions::get_commit_schema()`, converted
/// to a `RecordBatch`, encoded as parquet into an in-memory buffer, and finally stored at
/// `delta_path_for_version(version, "checkpoint.parquet")`.
///
/// # Errors
/// Errors from the parquet writer (`try_new`, `write`, `close`) and from `store.put` are
/// returned through `?`.
///
/// # Panics
/// Panics if JSON parsing or the conversion to `ArrowEngineData` fails, because both steps
/// call `unwrap()`.
2449+
async fn write_checkpoint(
2450+
store: &InMemory,
2451+
engine: &dyn Engine,
2452+
version: u64,
2453+
) -> DeltaResult<()> {
2454+
let checkpoint_actions = vec![
2455+
json!({"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}}),
2456+
json!({
2457+
"metaData": {
2458+
"id": "test-id",
2459+
"format": {"provider": "parquet", "options": {}},
2460+
"schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}",
2461+
"partitionColumns": [],
2462+
"configuration": {},
2463+
"createdTime": 1587968585495i64
2464+
}
2465+
}),
2466+
json!({"add": {"path": "file1.parquet", "partitionValues": {}, "size": 100, "modificationTime": 1000, "dataChange": true}}),
2467+
];
2468+
let json_strings: StringArray = checkpoint_actions // one JSON string per action
2469+
.into_iter()
2470+
.map(|json| json.to_string())
2471+
.collect::<Vec<_>>()
2472+
.into();
2473+
let parsed = engine
2474+
.json_handler()
2475+
.parse_json(
2476+
string_array_to_engine_data(json_strings),
2477+
crate::actions::get_commit_schema().clone(),
2478+
)
2479+
.unwrap(); // NOTE(review): panics instead of returning Err; `?` may fit here — confirm error types
2480+
let checkpoint = ArrowEngineData::try_from_engine_data(parsed).unwrap(); // NOTE(review): same as above
2481+
let checkpoint: RecordBatch = checkpoint.into();
2482+
2483+
let mut buffer = vec![]; // in-memory destination for the encoded parquet bytes
2484+
let mut writer = ArrowWriter::try_new(&mut buffer, checkpoint.schema(), None)?;
2485+
writer.write(&checkpoint)?;
2486+
writer.close()?; // must complete before `buffer` is handed to the store below
2487+
2488+
store
2489+
.put(
2490+
&delta_path_for_version(version, "checkpoint.parquet"),
2491+
buffer.into(),
2492+
)
2493+
.await?;
2494+
Ok(())
2495+
}
2496+
2497+
/// When a sole writer commits version N, caches the snapshot, then writes a checkpoint at N,
2498+
/// the next call to `builder_from(snapshot_at_N).build()` should rebase onto the new
2499+
/// checkpoint even though the version hasn't changed. Before the fix, the version equality
2500+
/// check short-circuited before the checkpoint rebase check, causing commit files to
2501+
/// accumulate unboundedly.
///
/// The test builds a snapshot at version 5 without a checkpoint, writes a checkpoint at
/// version 5, rebuilds from the existing snapshot, and asserts that the result reports
/// `checkpoint_version == Some(5)` with an empty `ascending_commit_files` list.
2502+
#[tokio::test]
2503+
async fn test_builder_from_rebases_onto_checkpoint_at_same_version() -> DeltaResult<()> {
2504+
let store = Arc::new(InMemory::new());
2505+
let table_root = "memory:///";
2506+
let engine = DefaultEngineBuilder::new(store.clone()).build(); // engine and test share the same store handle
2507+
2508+
// Create a table with commits 0 through 5 (no checkpoint)
2509+
setup_test_table_with_commits(table_root, &store, 6).await?;
2510+
2511+
// Build a snapshot at version 5. This accumulates all commit files.
2512+
let snapshot_v5 = Snapshot::builder_for(table_root)
2513+
.at_version(5)
2514+
.build(&engine)?;
2515+
assert_eq!(snapshot_v5.version(), 5);
2516+
assert!(snapshot_v5.log_segment().checkpoint_version.is_none()); // precondition: not based on a checkpoint yet
2517+
assert!(
2518+
!snapshot_v5
2519+
.log_segment()
2520+
.listed
2521+
.ascending_commit_files
2522+
.is_empty(),
2523+
"should have accumulated commit files"
2524+
);
2525+
2526+
// Write a checkpoint at version 5 (simulates what a sole writer does after committing)
2527+
write_checkpoint(&store, &engine, 5).await?;
2528+
2529+
// Rebuild from the existing snapshot. Version is still 5, but a checkpoint now exists.
2530+
let rebased = Snapshot::builder_from(snapshot_v5).build(&engine)?; // `snapshot_v5` is moved into the builder
2531+
2532+
assert_eq!(rebased.version(), 5); // the version must be unchanged by the rebase
2533+
assert_eq!(rebased.log_segment().checkpoint_version, Some(5)); // the new checkpoint was picked up
2534+
assert!(
2535+
rebased
2536+
.log_segment()
2537+
.listed
2538+
.ascending_commit_files
2539+
.is_empty(),
2540+
"commit files should be trimmed after rebasing onto the checkpoint"
2541+
);
2542+
2543+
Ok(())
2544+
}
24422545
}

0 commit comments

Comments
 (0)