Skip to content

Commit e601072

Browse files
authored
feat: retry merge_insert when possible (#3614)
Part of #3397.
* Pulled the `Backoff` utility out into a separate struct.
* Set the default backoff to `50ms, 100ms, 200ms, 400ms` (previously it started at 100ms).
* Changed `Transaction::conflicts_with()` to return an enum that differentiates retryable and non-retryable conflicts.
* Made `merge_insert` retry on retryable conflicts up to 10 times.
* After 10 attempts, it now returns a `TooMuchWriteContention` error.
* Added a spill utility that allows replaying the same stream multiple times.
* Reimplemented `background_iterator` so that it preserves `size_hint()`.
* Simplified the error message of `CommitConflict` so it's easier to see which operations conflicted.
1 parent aa08d74 commit e601072

17 files changed

Lines changed: 1824 additions & 160 deletions

File tree

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

python/Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/lance-arrow/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub mod floats;
3030
pub use floats::*;
3131
pub mod cast;
3232
pub mod list;
33+
pub mod memory;
3334

3435
type Result<T> = std::result::Result<T, ArrowError>;
3536

rust/lance-arrow/src/memory.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
4+
use std::collections::HashSet;
5+
6+
use arrow_array::{Array, RecordBatch};
7+
use arrow_data::ArrayData;
8+
9+
/// Counts memory used by buffers of Arrow arrays and RecordBatches.
///
/// This is meant to capture how much memory is being used by the Arrow data
/// structures as they are. It does not represent the memory used if the data
/// were to be serialized and then deserialized. In particular:
///
/// * This does not double count memory used by buffers shared by multiple
///   arrays or batches. Round-tripped data may use more memory because of this.
/// * This counts the **total** size of the buffers, even if the array is a slice.
///   Round-tripped data may use less memory because of this.
#[derive(Debug, Default)]
pub struct MemoryAccumulator {
    /// Addresses (buffer data pointers cast to `usize`) of every buffer that
    /// has already been added to `total`; used to avoid counting one twice.
    seen: HashSet<usize>,
    /// Running sum of the capacities of all distinct buffers recorded so far.
    total: usize,
}
24+
25+
impl MemoryAccumulator {
26+
pub fn record_array(&mut self, array: &dyn Array) {
27+
let data = array.to_data();
28+
self.record_array_data(&data);
29+
}
30+
31+
fn record_array_data(&mut self, data: &ArrayData) {
32+
for buffer in data.buffers() {
33+
let ptr = buffer.as_ptr();
34+
if self.seen.insert(ptr as usize) {
35+
self.total += buffer.capacity();
36+
}
37+
}
38+
39+
if let Some(nulls) = data.nulls() {
40+
let null_buf = nulls.inner().inner();
41+
let ptr = null_buf.as_ptr();
42+
if self.seen.insert(ptr as usize) {
43+
self.total += null_buf.capacity();
44+
}
45+
}
46+
47+
for child in data.child_data() {
48+
self.record_array_data(child);
49+
}
50+
}
51+
52+
pub fn record_batch(&mut self, batch: &RecordBatch) {
53+
for array in batch.columns() {
54+
self.record_array(array);
55+
}
56+
}
57+
58+
pub fn total(&self) -> usize {
59+
self.total
60+
}
61+
}
62+
63+
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::Int32Array;
    use arrow_schema::{DataType, Field, Schema};

    use super::*;

    /// Recording a sliced batch counts the whole underlying buffer, and
    /// recording the same slice again does not change the total.
    #[test]
    fn test_memory_accumulator() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let column: Arc<dyn Array> = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_new(schema, vec![column]).unwrap();
        let slice = batch.slice(1, 2);

        // Three i32 values live in the buffer even though the slice shows two.
        let whole_buffer_bytes = 3 * std::mem::size_of::<i32>();
        let mut acc = MemoryAccumulator::default();

        // Should record whole buffer, not just slice
        acc.record_batch(&slice);
        assert_eq!(acc.total(), whole_buffer_bytes);

        // Should not double count
        acc.record_batch(&slice);
        assert_eq!(acc.total(), whole_buffer_bytes);
    }
}

rust/lance-core/src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,14 @@ pub enum Error {
5151
source: BoxedError,
5252
location: Location,
5353
},
54+
#[snafu(display("Retryable commit conflict for version {version}: {source}, {location}"))]
55+
RetryableCommitConflict {
56+
version: u64,
57+
source: BoxedError,
58+
location: Location,
59+
},
60+
#[snafu(display("Too many concurrent writers. {message}, {location}"))]
61+
TooMuchWriteContention { message: String, location: Location },
5462
#[snafu(display("Encountered internal error. Please file a bug report at https://github.com/lancedb/lance/issues. {message}, {location}"))]
5563
Internal { message: String, location: Location },
5664
#[snafu(display("A prerequisite task failed: {message}, {location}"))]

rust/lance-core/src/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-FileCopyrightText: Copyright The Lance Authors
33

44
pub mod address;
5+
pub mod backoff;
56
pub mod bit;
67
pub mod cpu;
78
pub mod deletion;
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use rand::Rng;
2+
use std::time::Duration;
3+
4+
// SPDX-License-Identifier: Apache-2.0
5+
// SPDX-FileCopyrightText: Copyright The Lance Authors
6+
7+
/// Computes backoff as
///
/// ```text
/// backoff = clamp(base^attempt * unit + jitter, min, max)
/// ```
///
/// where `jitter` is drawn at random from `-jitter..=jitter` on every call.
/// All durations are expressed in milliseconds.
///
/// The defaults are base=2, unit=50ms, jitter=50ms, min=0ms, max=5s. This gives
/// a backoff of 50ms, 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, 5s (not including jitter).
///
/// You can have non-exponential backoff by setting base=1.
#[derive(Debug, Clone)]
pub struct Backoff {
    /// Base of the exponential term.
    base: u32,
    /// Multiplier applied to `base^attempt`, in milliseconds.
    unit: u32,
    /// Largest random offset added to or subtracted from the delay, in milliseconds.
    jitter: i32,
    /// Lower bound of the returned delay, in milliseconds.
    min: u32,
    /// Upper bound of the returned delay, in milliseconds.
    max: u32,
    /// Number of delays handed out since creation or the last reset.
    attempt: u32,
}

impl Default for Backoff {
    /// Creates a backoff with base=2, unit=50ms, jitter=50ms, min=0ms, max=5s,
    /// starting at attempt 0.
    fn default() -> Self {
        Self {
            base: 2,
            unit: 50,
            jitter: 50,
            min: 0,
            max: 5000,
            attempt: 0,
        }
    }
}
38+
39+
impl Backoff {
40+
pub fn with_base(self, base: u32) -> Self {
41+
Self { base, ..self }
42+
}
43+
44+
pub fn with_jitter(self, jitter: i32) -> Self {
45+
Self { jitter, ..self }
46+
}
47+
48+
pub fn with_min(self, min: u32) -> Self {
49+
Self { min, ..self }
50+
}
51+
52+
pub fn with_max(self, max: u32) -> Self {
53+
Self { max, ..self }
54+
}
55+
56+
pub fn next_backoff(&mut self) -> Duration {
57+
let backoff = self
58+
.base
59+
.saturating_pow(self.attempt)
60+
.saturating_mul(self.unit);
61+
let jitter = rand::thread_rng().gen_range(-self.jitter..=self.jitter);
62+
let backoff = (backoff.saturating_add_signed(jitter)).clamp(self.min, self.max);
63+
self.attempt += 1;
64+
Duration::from_millis(backoff as u64)
65+
}
66+
67+
pub fn attempt(&self) -> u32 {
68+
self.attempt
69+
}
70+
71+
pub fn reset(&mut self) {
72+
self.attempt = 0;
73+
}
74+
}
75+
76+
#[cfg(test)]
mod tests {
    use super::*;

    /// Without jitter the default sequence doubles from 50ms and the attempt
    /// counter advances by one per call.
    #[test]
    fn test_backoff() {
        let mut backoff = Backoff::default().with_jitter(0);
        assert_eq!(backoff.next_backoff().as_millis(), 50);
        assert_eq!(backoff.attempt(), 1);
        assert_eq!(backoff.next_backoff().as_millis(), 100);
        assert_eq!(backoff.attempt(), 2);
        assert_eq!(backoff.next_backoff().as_millis(), 200);
        assert_eq!(backoff.attempt(), 3);
        assert_eq!(backoff.next_backoff().as_millis(), 400);
        assert_eq!(backoff.attempt(), 4);
    }

    /// The delay stops growing once it reaches the configured maximum.
    #[test]
    fn test_backoff_clamps_to_max() {
        let mut backoff = Backoff::default().with_jitter(0);
        let delays: Vec<u128> = (0..9).map(|_| backoff.next_backoff().as_millis()).collect();
        assert_eq!(delays, vec![50, 100, 200, 400, 800, 1600, 3200, 5000, 5000]);

        let mut capped = Backoff::default().with_jitter(0).with_max(120);
        assert_eq!(capped.next_backoff().as_millis(), 50);
        assert_eq!(capped.next_backoff().as_millis(), 100);
        assert_eq!(capped.next_backoff().as_millis(), 120);
    }

    /// `reset` restarts the sequence from the first delay.
    #[test]
    fn test_backoff_reset() {
        let mut backoff = Backoff::default().with_jitter(0);
        backoff.next_backoff();
        backoff.next_backoff();
        assert_eq!(backoff.attempt(), 2);

        backoff.reset();
        assert_eq!(backoff.attempt(), 0);
        assert_eq!(backoff.next_backoff().as_millis(), 50);
    }
}

rust/lance-datafusion/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ lance-core = { workspace = true, features = ["datafusion"] }
2828
lance-datagen.workspace = true
2929
lazy_static.workspace = true
3030
log.workspace = true
31+
pin-project.workspace = true
3132
prost.workspace = true
3233
snafu.workspace = true
34+
tempfile.workspace = true
3335
tokio.workspace = true
3436
tracing.workspace = true
3537

rust/lance-datafusion/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub mod expr;
99
pub mod logical_expr;
1010
pub mod planner;
1111
pub mod projection;
12+
pub mod spill;
1213
pub mod sql;
1314
#[cfg(feature = "substrait")]
1415
pub mod substrait;

0 commit comments

Comments
 (0)