Skip to content

Commit aac74b4

Browse files
zhangyue19921010 and YueZhang authored
feat(cleanup): support rate limiter for cleanup operation (lance-format#6084)
Closes lance-format#3291

```python
stats = dataset.cleanup_old_versions(
    older_than=(datetime.now() - moment),
    delete_rate_limit=100,
)
```

---------

Co-authored-by: YueZhang <zhangyue.1010@bytedance.com>
1 parent d96455c commit aac74b4

8 files changed

Lines changed: 242 additions & 4 deletions

File tree

java/lance-jni/src/blocking_dataset.rs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2684,12 +2684,15 @@ fn inner_cleanup_with_policy<'local>(
26842684
})?
26852685
.unwrap_or(false);
26862686

2687+
let delete_rate_limit = env.get_optional_u64_from_method(&jpolicy, "getDeleteRateLimit")?;
2688+
26872689
let policy = CleanupPolicy {
26882690
before_timestamp,
26892691
before_version,
26902692
delete_unverified,
26912693
error_if_tagged_old_versions,
26922694
clean_referenced_branches,
2695+
delete_rate_limit,
26932696
};
26942697

26952698
let stats = {

java/src/main/java/org/lance/cleanup/CleanupPolicy.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,21 @@ public class CleanupPolicy {
2727
private final Optional<Boolean> deleteUnverified;
2828
private final Optional<Boolean> errorIfTaggedOldVersions;
2929
private final Optional<Boolean> cleanReferencedBranches;
30+
private final Optional<Long> deleteRateLimit;
3031

3132
private CleanupPolicy(
3233
Optional<Long> beforeTimestampMillis,
3334
Optional<Long> beforeVersion,
3435
Optional<Boolean> deleteUnverified,
3536
Optional<Boolean> errorIfTaggedOldVersions,
36-
Optional<Boolean> cleanReferencedBranches) {
37+
Optional<Boolean> cleanReferencedBranches,
38+
Optional<Long> deleteRateLimit) {
3739
this.beforeTimestampMillis = beforeTimestampMillis;
3840
this.beforeVersion = beforeVersion;
3941
this.deleteUnverified = deleteUnverified;
4042
this.errorIfTaggedOldVersions = errorIfTaggedOldVersions;
4143
this.cleanReferencedBranches = cleanReferencedBranches;
44+
this.deleteRateLimit = deleteRateLimit;
4245
}
4346

4447
public static Builder builder() {
@@ -65,13 +68,18 @@ public Optional<Boolean> getCleanReferencedBranches() {
6568
return cleanReferencedBranches;
6669
}
6770

71+
public Optional<Long> getDeleteRateLimit() {
72+
return deleteRateLimit;
73+
}
74+
6875
/** Builder for CleanupPolicy. */
6976
public static class Builder {
7077
private Optional<Long> beforeTimestampMillis = Optional.empty();
7178
private Optional<Long> beforeVersion = Optional.empty();
7279
private Optional<Boolean> deleteUnverified = Optional.empty();
7380
private Optional<Boolean> errorIfTaggedOldVersions = Optional.empty();
7481
private Optional<Boolean> cleanReferencedBranches = Optional.empty();
82+
private Optional<Long> deleteRateLimit = Optional.empty();
7583

7684
private Builder() {}
7785

@@ -105,13 +113,20 @@ public Builder withCleanReferencedBranches(boolean cleanReferencedBranches) {
105113
return this;
106114
}
107115

116+
/** Set the maximum number of delete operations per second. */
117+
public Builder withDeleteRateLimit(long deleteRateLimit) {
118+
this.deleteRateLimit = Optional.of(deleteRateLimit);
119+
return this;
120+
}
121+
108122
public CleanupPolicy build() {
109123
return new CleanupPolicy(
110124
beforeTimestampMillis,
111125
beforeVersion,
112126
deleteUnverified,
113127
errorIfTaggedOldVersions,
114-
cleanReferencedBranches);
128+
cleanReferencedBranches,
129+
deleteRateLimit);
115130
}
116131
}
117132
}

java/src/test/java/org/lance/CleanupTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@
2222
import org.junit.jupiter.api.io.TempDir;
2323

2424
import java.nio.file.Path;
25+
import java.time.Duration;
26+
import java.util.List;
2527

2628
import static org.junit.jupiter.api.Assertions.assertEquals;
29+
import static org.junit.jupiter.api.Assertions.assertTrue;
2730

2831
public class CleanupTest {
2932
@Test
@@ -114,4 +117,35 @@ public void testCleanupTaggedVersion(@TempDir Path tempDir) throws Exception {
114117
}
115118
}
116119
}
120+
121+
@Test
122+
public void testCleanupWithRateLimit(@TempDir Path tempDir) throws Exception {
123+
String datasetPath = tempDir.resolve("test_dataset_for_cleanup").toString();
124+
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
125+
TestUtils.SimpleTestDataset testDataset =
126+
new TestUtils.SimpleTestDataset(allocator, datasetPath);
127+
128+
testDataset.createEmptyDataset().close();
129+
testDataset.write(1, 100).close();
130+
testDataset.write(2, 100).close();
131+
try (Dataset dataset = testDataset.write(3, 100)) {
132+
List<Version> versions = dataset.listVersions();
133+
assertEquals(4, versions.size());
134+
long beforeTimestampMillis =
135+
versions.get(versions.size() - 1).getDataTime().toInstant().toEpochMilli() + 1;
136+
long start = System.nanoTime();
137+
RemovalStats stats =
138+
dataset.cleanupWithPolicy(
139+
CleanupPolicy.builder()
140+
.withBeforeTimestampMillis(beforeTimestampMillis)
141+
.withDeleteRateLimit(1L)
142+
.build());
143+
long elapsed = System.nanoTime() - start;
144+
145+
assertEquals(3L, stats.getOldVersions());
146+
assertTrue(stats.getBytesRemoved() > 0);
147+
assertTrue(elapsed >= Duration.ofSeconds(2).toNanos());
148+
}
149+
}
150+
}
117151
}

python/python/lance/dataset.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2419,6 +2419,7 @@ def cleanup_old_versions(
24192419
*,
24202420
delete_unverified: bool = False,
24212421
error_if_tagged_old_versions: bool = True,
2422+
delete_rate_limit: Optional[int] = None,
24222423
) -> CleanupStats:
24232424
"""
24242425
Cleans up old versions of the dataset.
@@ -2458,6 +2459,12 @@ def cleanup_old_versions(
24582459
tagged versions match the parameters. Otherwise, tagged versions will
24592460
be ignored without any error and only untagged versions will be
24602461
cleaned up.
2462+
2463+
delete_rate_limit: int, optional
2464+
Maximum number of delete operations per second. When not set (default),
2465+
deletions run at full speed. Set this to a positive integer to avoid
2466+
hitting object store request rate limits (e.g. S3 HTTP 503 SlowDown).
2467+
For example, ``delete_rate_limit=100`` limits to 100 operations/second.
24612468
"""
24622469
if older_than is None and retain_versions is None:
24632470
older_than = timedelta(days=14)
@@ -2467,6 +2474,7 @@ def cleanup_old_versions(
24672474
retain_versions,
24682475
delete_unverified,
24692476
error_if_tagged_old_versions,
2477+
delete_rate_limit,
24702478
)
24712479

24722480
def create_scalar_index(

python/python/lance/lance/__init__.pyi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ class _Dataset:
317317
older_than_micros: int,
318318
delete_unverified: Optional[bool] = None,
319319
error_if_tagged_old_versions: Optional[bool] = None,
320+
delete_rate_limit: Optional[int] = None,
320321
) -> CleanupStats: ...
321322
def get_version(self, tag: str) -> int: ...
322323
# Tag operations

python/python/tests/test_dataset.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1529,6 +1529,36 @@ def test_enable_disable_auto_cleanup(tmp_path):
15291529
assert len(ds.versions()) == 7
15301530

15311531

1532+
def test_cleanup_with_rate_limit(tmp_path):
1533+
"""Test that cleanup_old_versions works with delete_rate_limit parameter."""
1534+
table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
1535+
base_dir = tmp_path / "test"
1536+
1537+
lance.write_dataset(table, base_dir, mode="create")
1538+
lance.write_dataset(table, base_dir, mode="overwrite")
1539+
lance.write_dataset(table, base_dir, mode="overwrite")
1540+
lance.write_dataset(table, base_dir, mode="overwrite")
1541+
1542+
dataset = lance.dataset(base_dir)
1543+
latest_version_timestamp = dataset.versions()[-1]["timestamp"]
1544+
now = (
1545+
datetime.now(latest_version_timestamp.tzinfo)
1546+
if latest_version_timestamp.tzinfo is not None
1547+
else datetime.now()
1548+
)
1549+
1550+
start = time.time_ns()
1551+
# Cleanup with a rate limit should still remove old versions correctly
1552+
stats = dataset.cleanup_old_versions(
1553+
older_than=(now - latest_version_timestamp), delete_rate_limit=1
1554+
)
1555+
finished = time.time_ns()
1556+
1557+
assert stats.old_versions == 3
1558+
assert stats.bytes_removed > 0
1559+
assert (finished - start) >= 2_000_000_000 # 2s
1560+
1561+
15321562
def test_create_from_commit(tmp_path: Path):
15331563
table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
15341564
base_dir = tmp_path / "test"

python/src/dataset.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1558,13 +1558,14 @@ impl Dataset {
15581558
}
15591559

15601560
/// Cleanup old versions from the dataset
1561-
#[pyo3(signature = (older_than_micros = None, retain_versions = None, delete_unverified = None, error_if_tagged_old_versions = None))]
1561+
#[pyo3(signature = (older_than_micros = None, retain_versions = None, delete_unverified = None, error_if_tagged_old_versions = None, delete_rate_limit = None))]
15621562
fn cleanup_old_versions(
15631563
&self,
15641564
older_than_micros: Option<i64>,
15651565
retain_versions: Option<usize>,
15661566
delete_unverified: Option<bool>,
15671567
error_if_tagged_old_versions: Option<bool>,
1568+
delete_rate_limit: Option<u64>,
15681569
) -> PyResult<CleanupStats> {
15691570
let cleanup_stats = rt()
15701571
.block_on(None, async {
@@ -1582,6 +1583,9 @@ impl Dataset {
15821583
if let Some(v) = error_if_tagged_old_versions {
15831584
builder = builder.error_if_tagged_old_versions(v);
15841585
}
1586+
if let Some(v) = delete_rate_limit {
1587+
builder = builder.delete_rate_limit(v)?;
1588+
}
15851589

15861590
self.ds.cleanup_with_policy(builder.build()).await
15871591
})?

0 commit comments

Comments
 (0)