-
Notifications
You must be signed in to change notification settings - Fork 1k
Expand file tree
/
Copy pathreader.rs
More file actions
238 lines (213 loc) · 8.43 KB
/
reader.rs
File metadata and controls
238 lines (213 loc) · 8.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
use std::{io::SeekFrom, ops::RangeBounds};
use async_stream::try_stream;
use bytes::{Buf as _, Bytes};
use futures::Stream;
use log::{trace, warn};
use tokio::{
io::{self, AsyncBufRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _},
task::spawn_blocking,
};
use tokio_util::io::SyncIoBridge;
use crate::{
commit,
error::source_chain,
index::IndexError,
repo::Repo,
segment::{self, seek_to_offset, CHECKSUM_LEN},
};
use super::{
common::{read_exact, AsyncRepo, CommitBuf},
RangeFromMaybeToInclusive,
};
/// Returns a stream over the raw commitlog data covering the transaction
/// offsets in `range`, read from the commitlog in `repo`.
///
/// Segment headers appear in the stream as the scan of `range` reaches each
/// segment.
///
/// Only complete [`commit::StoredCommit`]s are emitted, so when the bounds of
/// `range` do not coincide with commit boundaries, the stream may contain more
/// data than strictly requested.
///
/// Beyond the headers needed to evaluate `range`, the source commitlog is not
/// inspected, so nothing is guaranteed about the integrity of the emitted log.
///
/// An empty commitlog (that is, one containing no commits) produces an empty
/// stream.
pub fn commits<R>(repo: R, range: impl RangeBounds<u64>) -> impl Stream<Item = io::Result<Bytes>>
where
    R: AsyncRepo + Send + 'static,
{
    let mut bounds = RangeFromMaybeToInclusive::from_range_bounds(range);
    try_stream! {
        // Select the overlapping segments with the bounds as given, i.e.
        // before `bounds.start` is advanced in the loop below.
        let offsets = repo.existing_offsets()?;
        let selected = retain_range(&offsets, bounds);
        for segment_offset in selected {
            // Never begin before the segment itself; every segment after the
            // first is therefore read from its first commit.
            bounds.start = bounds.start.max(segment_offset);
            trace!("segment: segment={} start={}", segment_offset, bounds.start);
            let reader = repo.open_segment_reader_async(segment_offset).await?;
            let chunks = read_segment(repo.clone(), reader, segment_offset, bounds);
            for await chunk in chunks {
                let data = chunk.inspect_err(|e| {
                    warn!("error reading segment {}: {}", segment_offset, e)
                })?;
                yield data;
            }
        }
    }
}
/// Stream the raw data of the single segment read by `segment`, restricted to
/// the transaction offsets in `range`.
///
/// `segment_start` is the offset the segment is keyed by in `repo` (it is
/// compared against transaction offsets in `range`). The segment header is
/// read and decoded first. Its raw bytes are yielded immediately before the
/// first yielded commit, but only when `range.start <= segment_start`.
///
/// When `range.start > segment_start`, the segment's offset index (if `repo`
/// provides one) is used to seek towards `range.start`. If that seek fails,
/// the failure is logged and the reader is rewound to the position it had
/// before the seek (just after the segment header). The scan below then simply
/// starts from the first commit.
///
/// Commits with `min_tx_offset < range.start` are skipped. The stream ends at
/// EOF, at an all-zeroes commit header, or at the first commit whose
/// `min_tx_offset` exceeds `range.end` (when `range` has an end).
///
/// # Errors
///
/// Yields an I/O error (and ends) if reading or decoding fails, or if the
/// blocking task performing the index seek panics or is cancelled.
fn read_segment(
    repo: impl Repo + Send + 'static,
    mut segment: impl AsyncBufRead + AsyncSeek + Unpin + Send + 'static,
    segment_start: u64,
    range: RangeFromMaybeToInclusive,
) -> impl Stream<Item = io::Result<Bytes>> {
    try_stream! {
        trace!("reading segment {segment_start}");
        let (segment_header, segment_header_bytes) = {
            let mut buf = [0u8; segment::Header::LEN];
            segment.read_exact(&mut buf).await?;
            let header = segment::Header::decode(&buf[..])?;
            (header, Bytes::from_owner(buf))
        };
        let mut send_segment_header = Some(segment_header_bytes);
        // Try to seek to the starting offset
        // if it doesn't fall on the segment boundary.
        if range.start > segment_start {
            // Don't send a segment header if we're not reading from the start.
            send_segment_header = None;
            let joined = spawn_blocking(move || -> io::Result<_> {
                let mut segment = SyncIoBridge::new(segment);
                if let Ok(offset_index) = repo.get_offset_index(segment_start) {
                    // Remember where the first commit begins, so the reader can
                    // be put back there if the index-assisted seek fails.
                    let first_commit_pos = std::io::Seek::stream_position(&mut segment)?;
                    trace!("seek_to_offset segment={} start={}", segment_start, range.start);
                    if let Err(e) = seek_to_offset(&mut segment, &offset_index, range.start) {
                        match &e {
                            IndexError::KeyNotFound => trace!(
                                "offset not found segment={} offset={}",
                                segment_start, range.start
                            ),
                            e => warn!(
                                "error reading index segment={} offset={}: {} {}",
                                segment_start,
                                range.start,
                                e,
                                source_chain(&e)
                            ),
                        }
                        // The failed seek may have moved the reader already;
                        // rewind so the linear scan starts on a commit boundary.
                        std::io::Seek::seek(&mut segment, SeekFrom::Start(first_commit_pos))?;
                    }
                }
                Ok(segment.into_inner())
            })
            .await;
            // A panicked or cancelled blocking task becomes a stream error
            // instead of a panic in the consumer's task.
            segment = joined.map_err(io::Error::from)??;
        }
        let checksum_len = CHECKSUM_LEN[segment_header.checksum_algorithm as usize];
        let mut commit_buf = CommitBuf::default();
        loop {
            if read_exact(&mut segment, &mut commit_buf.header).await?.is_eof() {
                trace!("eof reading commit header");
                break;
            }
            let Some(hdr) = commit::Header::decode(&commit_buf.header[..])? else {
                warn!("all-zeroes commit header");
                break;
            };
            // Skip the commit if we're not at `range.start`.
            // NOTE(review): a commit starting before `range.start` is skipped
            // even if it also covers `range.start` (should commits hold more
            // than one transaction). Confirm against the "may yield extra data"
            // contract documented on `commits`.
            if hdr.min_tx_offset < range.start {
                segment.seek(SeekFrom::Current(hdr.len as i64 + checksum_len as i64)).await?;
            // Stop if we're past the range end.
            } else if range.end.is_some_and(|end| hdr.min_tx_offset > end) {
                break;
            } else {
                commit_buf.body.resize(hdr.len as usize + checksum_len, 0);
                segment.read_exact(&mut commit_buf.body).await?;
                // Send segment header if not sent already.
                if let Some(header_bytes) = send_segment_header.take() {
                    trace!("sending segment header");
                    yield header_bytes;
                }
                trace!("sending commit {}", hdr.min_tx_offset);
                yield commit_buf.as_buf().copy_to_bytes(commit_buf.filled_len());
            }
        }
    }
}
/// Given a list of (segment) offsets, retain those which fall into the `range`.
///
/// Each offset is treated as the start of a segment that extends up to (but
/// excluding) the next offset in `offsets`. The last segment has no upper
/// bound. A segment is retained if it contains `range.start`, or if its start
/// offset lies within `range`.
///
/// NOTE(review): presumably `offsets` is sorted ascending (as listed by the
/// repo); this is not checked here.
///
/// Returns an empty `Vec` if `range` is empty.
pub fn retain_range(offsets: &[u64], range: RangeFromMaybeToInclusive) -> Vec<u64> {
    if range.is_empty() {
        return vec![];
    }
    // Pair every segment start with the start of its successor. The last
    // segment has none and is therefore unbounded. Modelling that as `None`
    // rather than a `u64::MAX` sentinel keeps `range.start == u64::MAX` inside
    // the last segment.
    let next_starts = offsets.iter().skip(1).copied().map(Some).chain([None]);
    offsets
        .iter()
        .copied()
        .zip(next_starts)
        .filter(|&(start, next)| {
            let holds_range_start = range.start >= start
                && match next {
                    Some(next) => range.start < next,
                    None => true,
                };
            holds_range_start || range.contains(&start)
        })
        .map(|(start, _)| start)
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;

    /// Test shim: accept any `RangeBounds<u64>` and convert it before calling
    /// the real `retain_range`.
    fn retain_range(offsets: &[u64], range: impl RangeBounds<u64>) -> Vec<u64> {
        super::retain_range(offsets, RangeFromMaybeToInclusive::from_range_bounds(range))
    }

    #[test]
    fn test_slice_segments_on_single_commit() {
        let offsets = vec![0, 10];
        let retained = retain_range(&offsets, 19..=19);
        assert_eq!(&retained, &[10]);
    }

    #[test]
    fn test_slice_segments_on_boundary() {
        let offsets = vec![0, 10, 20, 30];
        for (i, start) in offsets.iter().enumerate() {
            let retained = retain_range(&offsets, start..);
            assert_eq!(&retained, &offsets[i..]);
        }
    }

    #[test]
    fn test_slice_segments_between_boundary() {
        let offsets = vec![0, 10, 20, 30];
        let ranges = vec![5, 11, 29];
        for (i, start) in ranges.into_iter().enumerate() {
            let retained = retain_range(&offsets, start..);
            assert_eq!(&retained, &offsets[i..]);
        }
    }

    #[test]
    fn test_slice_segments_with_upper_bound() {
        let offsets = vec![0, 10, 20, 30];
        let retained = retain_range(&offsets, 11..29);
        assert_eq!(&retained, &[10, 20]);
    }

    proptest! {
        #[test]
        fn prop_offset_at_or_after_last_segment_yields_last(start in 30u64..) {
            let offsets = vec![0, 10, 20, 30];
            let retained = retain_range(&offsets, start..);
            prop_assert_eq!(&retained, &[30]);
        }

        #[test]
        fn prop_empty_input_gives_empty_output(start in any::<u64>()) {
            let retained = retain_range(&[], start..);
            prop_assert_eq!(&retained, &[] as &[u64]);
        }

        #[test]
        fn prop_empty_range_retains_nothing(start in any::<u64>()) {
            let offsets = vec![0, 10, 20, 30];
            let range = start..start;
            prop_assert!(range.is_empty(), "expected range to be empty: {range:?}");
            let retained = retain_range(&offsets, range);
            prop_assert_eq!(&retained, &[] as &[u64]);
        }

        // `start` is capped so that `start + 16` cannot overflow (which would
        // panic in debug builds) and the range below is never empty.
        #[test]
        fn prop_offset_at_or_after_last_with_upper_bound_yields_last(
            start in 30u64..=u64::MAX - 16
        ) {
            let offsets = vec![0, 10, 20, 30];
            let retained = retain_range(&offsets, start..start + 16);
            prop_assert_eq!(&retained, &[30]);
        }
    }
}