Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 51 additions & 15 deletions store/postgres/src/vid_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,6 @@ pub(crate) struct VidBatcher {
}

impl VidBatcher {
/// Look up the histogram bounds that the catalog reports for the `vid`
/// column of `table` and cut them down to `range`.
///
/// Only bounds lying strictly between `range.min` and `range.max` are
/// kept, in the order the catalog returned them. `range.min` and
/// `range.max` are then appended, in that order, after the kept bounds,
/// so the returned vector is not necessarily sorted.
///
/// Any error from the catalog lookup is passed on to the caller.
fn histogram_bounds(
    conn: &mut PgConnection,
    nsp: &Namespace,
    table: &Table,
    range: VidRange,
) -> Result<Vec<i64>, StoreError> {
    let (lo, hi) = (range.min, range.max);
    let reported = catalog::histogram_bounds(conn, nsp, &table.name, VID_COLUMN)?;

    // Drop everything at or outside the edges of the range ...
    let mut bounds: Vec<i64> = reported
        .into_iter()
        .filter(|&bound| lo < bound && bound < hi)
        .collect();
    // ... and put the exact edges of the range at the end.
    bounds.extend([lo, hi]);

    Ok(bounds)
}

/// Initialize a batcher for batching through entries in `table` with
/// `vid` in the given `vid_range`
///
Expand All @@ -138,7 +124,7 @@ impl VidBatcher {
table: &Table,
vid_range: VidRange,
) -> Result<Self, StoreError> {
let bounds = Self::histogram_bounds(conn, nsp, table, vid_range)?;
let bounds = catalog::histogram_bounds(conn, nsp, &table.name, VID_COLUMN)?;
let batch_size = AdaptiveBatchSize::new(table);
Self::new(bounds, vid_range, batch_size)
}
Expand All @@ -150,6 +136,26 @@ impl VidBatcher {
) -> Result<Self, StoreError> {
let start = range.min;

let bounds = {
// Keep only histogram bounds that are relevant for the range
let mut bounds = bounds
.into_iter()
.filter(|bound| range.min <= *bound && range.max >= *bound)
.collect::<Vec<_>>();
// The first and last entry in `bounds` are Postgres' estimates
// of the min and max `vid` values in the table. We use the
// actual min and max `vid` values from the `vid_range` instead
let len = bounds.len();
if len > 1 {
bounds[0] = range.min;
bounds[len - 1] = range.max;
} else {
// If Postgres doesn't have a histogram, just use one bucket
// from min to max
bounds = vec![range.min, range.max];
}
bounds
};
let mut ogive = if range.is_empty() {
None
} else {
Expand Down Expand Up @@ -371,6 +377,17 @@ mod tests {
}
}

/// Compact debug output for the test `Batcher`: prints the wrapped
/// batcher's `start` and `end`, plus the `size` and the `target` of its
/// batch size, the latter as whole seconds under the name `duration`.
impl std::fmt::Debug for Batcher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let vid = &self.vid;
        let batch = &vid.batch_size;

        let mut out = f.debug_struct("Batcher");
        out.field("start", &vid.start);
        out.field("end", &vid.end);
        out.field("size", &batch.size);
        // `as_secs` discards any sub-second part of the target
        out.field("duration", &batch.target.as_secs());
        out.finish()
    }
}

#[test]
fn simple() {
let bounds = vec![10, 20, 30, 40, 49];
Expand Down Expand Up @@ -422,4 +439,23 @@ mod tests {
batcher.at(360, 359, 80);
batcher.step(360, 359, S010);
}

#[test]
fn vid_batcher_adjusts_bounds() {
    // The outermost histogram entries (639 and 90_000) are only estimates
    // of the smallest and largest `vid` and deliberately disagree with
    // the actual extremes passed in through `vid_range`. The ogive built
    // by `VidBatcher` must start and end at the actual extremes from
    // `vid_range`, not at the estimates.
    let vid_range = VidRange::new(1, 100_000);
    let histogram = vec![639, 20_000, 40_000, 60_000, 80_000, 90_000];
    let batch_size = AdaptiveBatchSize {
        size: 1000,
        target: S100,
    };

    let batcher = VidBatcher::new(histogram, vid_range, batch_size).unwrap();

    let ogive = batcher.ogive.as_ref().unwrap();
    assert_eq!(1, ogive.start());
    assert_eq!(100_000, ogive.end());
}
}
Loading