Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 129 additions & 32 deletions parquet/src/util/bit_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,15 +240,35 @@ pub fn get_bit(data: &[u8], i: usize) -> bool {
(data[i >> 3] & BIT_MASK[i & 7]) != 0
}

/// Utility class for writing bit/byte streams. This class can write data in either
/// bit packed or byte aligned fashion.
/// Writes bit packed values to an in-memory buffer.
///
/// `BitWriter` is the dual of [`BitReader`] and can write values that are either
/// byte aligned or packed at arbitrary bit widths. It is primarily used by the
/// Parquet RLE/bit-packing hybrid encoder.
///
/// Bit-packed values are appended to an internal buffer in
/// little-endian bit order: the first value written occupies the
/// least-significant bits of the first byte. Bits that have not yet filled a
/// whole byte are held in an internal accumulator until a byte-aligning
/// operation (such as [`BitWriter::flush`], [`BitWriter::put_aligned`], or
/// [`BitWriter::consume`]) is called.
///
/// Use [`BitWriter::consume`] to take ownership of the underlying buffer once
/// writing is complete.
///
/// [`BitReader`]: crate::util::bit_util::BitReader
pub struct BitWriter {
/// Output Buffer
buffer: Vec<u8>,
/// Accumulator for in progress values
buffered_values: u64,
/// Current write offset within `buffered_values`
bit_offset: u8,
}

impl BitWriter {
/// Creates a new [`BitWriter`] backed by an internal buffer of the given
/// initial capacity.
pub fn new(initial_capacity: usize) -> Self {
Self {
buffer: Vec::with_capacity(initial_capacity),
Expand All @@ -257,7 +277,9 @@ impl BitWriter {
}
}

/// Initializes the writer appending to the existing buffer `buffer`
/// Creates a new [`BitWriter`] that appends to the existing `buffer`.
///
/// Data written with this writer are appended after existing values.
pub fn new_from_buf(buffer: Vec<u8>) -> Self {
Self {
buffer,
Expand All @@ -266,37 +288,50 @@ impl BitWriter {
}
}

/// Consumes and returns the current buffer.
/// Flushes any buffered bits to a byte boundary, then consumes this
/// writer and returns the underlying buffer.
#[inline]
pub fn consume(mut self) -> Vec<u8> {
self.flush();
self.buffer
}

/// Flushes the internal buffered bits and returns the buffer's content.
/// This is a borrow equivalent of `consume` method.
/// Flushes any buffered bits to a byte boundary and returns a borrowed
/// view of the buffer's contents.
///
/// This is the borrowing equivalent of [`BitWriter::consume`]. The writer
/// can continue to be used after this call.
#[inline]
pub fn flush_buffer(&mut self) -> &[u8] {
self.flush();
self.buffer()
}

/// Like `flush_buffer`, but returns mutable access to the buffer.
/// Like [`BitWriter::flush_buffer`], but returns mutable access to the
/// buffer.
#[inline]
pub fn flush_buffer_mut(&mut self) -> &mut [u8] {
self.flush();
&mut self.buffer
}

/// Clears the internal state so the buffer can be reused.
/// Clears the internal state.
///
/// Truncates the underlying buffer to length 0 (preserving its capacity)
/// and resets the bit accumulator.
#[inline]
pub fn clear(&mut self) {
self.buffer.clear();
self.buffered_values = 0;
self.bit_offset = 0;
}

/// Flushes the internal buffered bits and the align the buffer to the next byte.
/// Flushes any buffered bits and aligns the writer to the next byte
/// boundary.
///
/// Any partial byte currently held in the bit accumulator is appended to
/// the buffer, and the accumulator is reset. Subsequent writes will start
/// at a byte boundary.
#[inline]
pub fn flush(&mut self) {
let num_bytes = ceil(self.bit_offset, 8);
Expand All @@ -306,10 +341,14 @@ impl BitWriter {
self.bit_offset = 0;
}

/// Advances the current offset by skipping `num_bytes`, flushing the internal bit
/// buffer first.
/// This is useful when you want to jump over `num_bytes` bytes and come back later
/// to fill these bytes.
/// Reserves `num_bytes` bytes of zero-filled space at the current
/// position and returns the byte offset of the start of that region.
///
/// Internally flushes any buffered bits first so the reservation begins
/// at a byte boundary. Use the returned offset together with
/// [`BitWriter::write_at`] or [`BitWriter::put_aligned_offset`] to fill
/// in the reserved bytes once their contents are known (for example, a
/// length prefix that depends on subsequently encoded data).
#[inline]
pub fn skip(&mut self, num_bytes: usize) -> usize {
self.flush();
Expand All @@ -318,38 +357,67 @@ impl BitWriter {
result
}

/// Returns a slice containing the next `num_bytes` bytes starting from the current
/// offset, and advances the underlying buffer by `num_bytes`.
/// This is useful when you want to jump over `num_bytes` bytes and come back later
/// to fill these bytes.
/// Reserves `num_bytes` bytes at the current position and returns a
/// mutable slice over them.
///
/// Equivalent to [`BitWriter::skip`], but returns the reserved region
/// directly so it can be written into. Useful for filling in a header
/// (such as a length prefix) once the size of the following payload is
/// known.
#[inline]
pub fn get_next_byte_ptr(&mut self, num_bytes: usize) -> &mut [u8] {
let offset = self.skip(num_bytes);
&mut self.buffer[offset..offset + num_bytes]
}

/// Returns the total number of bytes written so far, including any
/// partial byte still held in the bit accumulator (rounded up).
#[inline]
pub fn bytes_written(&self) -> usize {
self.buffer.len() + ceil(self.bit_offset, 8) as usize
}

/// Returns a borrowed view of the bytes that have been flushed to the
/// underlying buffer so far.
///
/// Note that bits currently held in the bit accumulator (i.e. not yet
/// flushed to a byte boundary) are not included. Use
/// [`BitWriter::flush_buffer`] to also flush pending bits before reading.
#[inline]
pub fn buffer(&self) -> &[u8] {
&self.buffer
}

/// Returns the current offset within the output buffer.
///
/// This is the index of the next byte that a byte-aligned write would
/// land in (excluding any bits currently held in the bit accumulator).
#[inline]
pub fn byte_offset(&self) -> usize {
self.buffer.len()
}

/// Writes the entire byte `value` at the byte `offset`
/// Overwrites the byte at position `offset` in the underlying buffer
/// with `value`.
///
/// Typically used together with [`BitWriter::skip`] or
/// [`BitWriter::get_next_byte_ptr`] to back-fill a previously reserved
/// byte once its value is known.
///
/// # Panics
///
/// Panics if `offset` is out of bounds for the underlying buffer.
pub fn write_at(&mut self, offset: usize, value: u8) {
self.buffer[offset] = value;
}

/// Writes the `num_bits` LSB of value `v` to the internal buffer of this writer.
/// The `num_bits` must not be greater than 64. This is bit packed.
/// Writes the `num_bits` least-significant bits of `v` to the writer in
/// bit-packed form.
///
/// Values are packed in little-endian bit order: this call appends
/// `num_bits` bits starting at the current bit position.
///
/// `num_bits` must be no larger than 64.
#[inline]
pub fn put_value(&mut self, v: u64, num_bits: usize) {
debug_assert!(num_bits <= 64);
Expand All @@ -372,8 +440,12 @@ impl BitWriter {
}
}

/// Writes `val` of `num_bytes` bytes to the next aligned byte. If size of `T` is
/// larger than `num_bytes`, extra higher ordered bytes will be ignored.
/// Writes the first `num_bytes` little-endian bytes of `val` to the
/// writer at the next byte boundary.
///
/// Any buffered bits are first flushed so the value is byte-aligned in
/// the output. If `T` is wider than `num_bytes`, the high-order bytes
/// are silently truncated.
#[inline]
pub fn put_aligned<T: AsBytes>(&mut self, val: T, num_bytes: usize) {
self.flush();
Expand All @@ -382,19 +454,34 @@ impl BitWriter {
self.buffer.extend_from_slice(&slice[..len]);
}

/// Writes `val` of `num_bytes` bytes at the designated `offset`. The `offset` is the
/// offset starting from the beginning of the internal buffer that this writer
/// maintains. Note that this will overwrite any existing data between `offset` and
/// `offset + num_bytes`. Also that if size of `T` is larger than `num_bytes`, extra
/// higher ordered bytes will be ignored.
/// Writes the first `num_bytes` little-endian bytes of `val` at the
/// given `offset` in the underlying buffer, overwriting any existing
/// data in `offset..offset + num_bytes`.
///
/// `offset` is measured from the start of the internal buffer. If `T`
/// is wider than `num_bytes`, the high-order bytes are silently
/// truncated.
///
/// Typically used together with [`BitWriter::skip`] to back-fill a
/// previously reserved region once its contents are known.
///
/// # Panics
///
/// Panics if `offset + min(size_of::<T>(), num_bytes)` is out of bounds
/// for the underlying buffer.
#[inline]
pub fn put_aligned_offset<T: AsBytes>(&mut self, val: T, num_bytes: usize, offset: usize) {
let slice = val.as_bytes();
let len = num_bytes.min(slice.len());
self.buffer[offset..offset + len].copy_from_slice(&slice[..len])
}

/// Writes a VLQ encoded integer `v` to this buffer. The value is byte aligned.
/// Writes `v` to the buffer in VLQ (variable-length quantity) encoding,
/// in little-endian byte order.
///
/// Any buffered bits are first flushed so the encoding starts at a byte
/// boundary. The encoded form is between 1 and [`MAX_VLQ_BYTE_LEN`]
/// bytes long, depending on the magnitude of `v`.
#[inline]
pub fn put_vlq_int(&mut self, mut v: u64) {
while v & 0xFFFFFFFFFFFFFF80 != 0 {
Expand All @@ -404,17 +491,27 @@ impl BitWriter {
self.put_aligned::<u8>((v & 0x7F) as u8, 1);
}

/// Writes a zigzag-VLQ encoded (in little endian order) int `v` to this buffer.
/// Writes `v` to the buffer in zigzag-VLQ encoding, in little-endian
/// byte order.
///
/// Zigzag-VLQ is a variant of VLQ encoding where negative and positive
/// numbers are encoded in a zigzag fashion.
/// See: https://developers.google.com/protocol-buffers/docs/encoding
/// numbers are interleaved so that small absolute values produce short
/// encodings regardless of sign. See the [Protocol Buffers encoding
/// documentation](https://developers.google.com/protocol-buffers/docs/encoding)
/// for details.
///
/// As with [`BitWriter::put_vlq_int`], any buffered bits are first
/// flushed so the encoding starts at a byte boundary.
#[inline]
pub fn put_zigzag_vlq_int(&mut self, v: i64) {
let u: u64 = ((v << 1) ^ (v >> 63)) as u64;
self.put_vlq_int(u)
}

/// Returns an estimate of the memory used, in bytes
/// Returns an estimate of the heap memory used by this writer, in bytes.
///
/// This reflects the capacity of the underlying buffer rather than the
/// number of bytes actually written.
pub fn estimated_memory_size(&self) -> usize {
self.buffer.capacity() * size_of::<u8>()
}
Expand Down
Loading