Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions crates/commitlog/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ impl Header {
match &mut hdr.as_slice() {
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] => Ok(None),
buf => {
let min_tx_offset = buf.get_u64().map_err(decode_error)?;
let n = buf.get_u16().map_err(decode_error)?;
let len = buf.get_u32().map_err(decode_error)?;
let min_tx_offset = buf.get_u64().map_err(|e| decode_header_error(e, "min_tx_offset"))?;
let n = buf.get_u16().map_err(|e| decode_header_error(e, "n"))?;
let len = buf.get_u32().map_err(|e| decode_header_error(e, "len"))?;

Ok(Some(Self {
min_tx_offset,
Expand All @@ -88,15 +88,15 @@ impl Header {
return Ok(None);
}

return Err(e);
return Err(io::Error::new(e.kind(), format!("error reading commit header: {e}")));
}
match &mut hdr.as_slice() {
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] => Ok(None),
buf => {
let min_tx_offset = buf.get_u64().map_err(decode_error)?;
let epoch = buf.get_u64().map_err(decode_error)?;
let n = buf.get_u16().map_err(decode_error)?;
let len = buf.get_u32().map_err(decode_error)?;
let min_tx_offset = buf.get_u64().map_err(|e| decode_header_error(e, "min_tx_offset"))?;
let epoch = buf.get_u64().map_err(|e| decode_header_error(e, "epoch"))?;
let n = buf.get_u16().map_err(|e| decode_header_error(e, "n"))?;
let len = buf.get_u32().map_err(|e| decode_header_error(e, "len"))?;

Ok(Some(Self {
min_tx_offset,
Expand Down Expand Up @@ -310,10 +310,16 @@ impl StoredCommit {
return Ok(None);
};
let mut records = vec![0; hdr.len as usize];
reader.read_exact(&mut records)?;
reader.read_exact(&mut records).map_err(|e| {
io::Error::new(
e.kind(),
format!("failed to read {} bytes of commit payload: {}", hdr.len, e),
)
})?;

let chk = reader.crc32c();
let crc = decode_u32(reader.into_inner())?;
let crc = decode_u32(reader.into_inner())
.map_err(|e| io::Error::new(e.kind(), format!("failed to read checksum: {e}")))?;

if chk != crc {
return Err(invalid_data(ChecksumMismatch));
Expand Down Expand Up @@ -385,8 +391,8 @@ fn decode_u32<R: Read>(mut read: R) -> io::Result<u32> {
Ok(u32::from_le_bytes(buf))
}

fn decode_error(e: DecodeError) -> io::Error {
invalid_data(e)
fn decode_header_error(e: DecodeError, field: &str) -> io::Error {
invalid_data(format!("failed to decode commit header field '{field}': {e}"))
}

fn invalid_data<E>(e: E) -> io::Error
Expand Down
Loading