Skip to content

Commit be1054d

Browse files
guilload and PSeitz authored
Make Storage::get_slice zero copy for single-segment bodies (#6336)
Co-authored-by: PSeitz <PSeitz@users.noreply.github.com>
1 parent 3f0bd88 commit be1054d

8 files changed

Lines changed: 144 additions & 57 deletions

File tree

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ serde_yaml = "0.9"
240240
serial_test = { version = "3.2", features = ["file_locks"] }
241241
sha2 = "0.11"
242242
siphasher = "1.0"
243+
stable_deref_trait = "1.2"
243244
stateright = "0.31"
244245
storekey = { version = "0.11", default-features = false }
245246
smallvec = "1"

quickwit/quickwit-storage/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ quick_cache = { workspace = true }
2929
regex = { workspace = true }
3030
serde = { workspace = true }
3131
serde_json = { workspace = true }
32+
stable_deref_trait = { workspace = true }
3233
tantivy = { workspace = true }
3334
tempfile = { workspace = true }
3435
thiserror = { workspace = true }

quickwit/quickwit-storage/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ mod prefix_storage;
5050
mod ram_storage;
5151
mod split;
5252
mod split_cache;
53+
mod stable_deref_bytes;
5354
mod storage_factory;
5455
mod storage_resolver;
5556
mod versioned_component;

quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs

Lines changed: 45 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use azure_storage::Error as AzureError;
2626
use azure_storage::prelude::*;
2727
use azure_storage_blobs::blob::operations::GetBlobResponse;
2828
use azure_storage_blobs::prelude::*;
29-
use bytes::Bytes;
29+
use bytes::{Bytes, BytesMut};
3030
use futures::io::Error as FutureError;
3131
use futures::stream::{StreamExt, TryStreamExt};
3232
use md5::Digest;
@@ -44,6 +44,7 @@ use tracing::{instrument, warn};
4444

4545
use crate::debouncer::DebouncedStorage;
4646
use crate::metrics::object_storage_get_slice_in_flight_guards;
47+
use crate::stable_deref_bytes::into_owned_bytes;
4748
use crate::storage::SendableAsync;
4849
use crate::{
4950
BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, STORAGE_METRICS, Storage,
@@ -203,12 +204,12 @@ impl AzureBlobStorage {
203204
key_path.to_string_lossy().to_string()
204205
}
205206

206-
/// Downloads a blob as vector of bytes.
207-
async fn get_to_vec(
207+
/// Downloads a blob as `Bytes` — zero-copy when the blob arrives as a single chunk.
208+
async fn get_to_bytes(
208209
&self,
209210
path: &Path,
210211
range_opt: Option<Range<usize>>,
211-
) -> StorageResult<Vec<u8>> {
212+
) -> StorageResult<Bytes> {
212213
let name = self.blob_name(path);
213214
let capacity = range_opt.as_ref().map(Range::len).unwrap_or(0);
214215
retry(&self.retry_params, || async {
@@ -226,10 +227,8 @@ impl AzureBlobStorage {
226227
let stream = self.container_client.blob_client(&name).get().into_stream();
227228
(stream, None)
228229
};
229-
let mut buf: Vec<u8> = Vec::with_capacity(capacity);
230-
download_all(&mut response_stream, &mut buf).await?;
231-
232-
Result::<_, AzureErrorWrapper>::Ok(buf)
230+
let bytes = download_all(&mut response_stream).await?;
231+
Result::<_, AzureErrorWrapper>::Ok(bytes)
233232
})
234233
.await
235234
.map_err(StorageError::from)
@@ -442,9 +441,9 @@ impl Storage for AzureBlobStorage {
442441

443442
#[instrument(level = "debug", skip(self, range), fields(range.start = range.start, range.end = range.end))]
444443
async fn get_slice(&self, path: &Path, range: Range<usize>) -> StorageResult<OwnedBytes> {
445-
self.get_to_vec(path, Some(range.clone()))
444+
self.get_to_bytes(path, Some(range.clone()))
446445
.await
447-
.map(OwnedBytes::new)
446+
.map(into_owned_bytes)
448447
.map_err(|err| {
449448
err.add_context(format!(
450449
"failed to fetch slice {:?} for object: {}/{}",
@@ -493,9 +492,9 @@ impl Storage for AzureBlobStorage {
493492
#[instrument(level = "debug", skip(self), fields(fetched_bytes_len))]
494493
async fn get_all(&self, path: &Path) -> StorageResult<OwnedBytes> {
495494
let data = self
496-
.get_to_vec(path, None)
495+
.get_to_bytes(path, None)
497496
.await
498-
.map(OwnedBytes::new)
497+
.map(into_owned_bytes)
499498
.map_err(|err| {
500499
err.add_context(format!(
501500
"failed to fetch object: {}/{}",
@@ -559,28 +558,45 @@ pub fn parse_azure_uri(uri: &Uri) -> Option<(String, PathBuf)> {
559558
Some((container, prefix))
560559
}
561560

562-
/// Collect a download stream into an output buffer.
561+
/// Collect a download stream into a single [`Bytes`].
562+
///
563+
/// `Bytes` segments yielded by the SDK are preserved so that the single-segment case avoids the
564+
/// extra copy into a contiguous buffer. When more than one segment is received, they are
565+
/// concatenated exactly once into a freshly allocated `Bytes`.
563566
async fn download_all(
564567
chunk_stream: &mut Pageable<GetBlobResponse, AzureError>,
565-
output: &mut Vec<u8>,
566-
) -> Result<(), AzureErrorWrapper> {
567-
output.clear();
568+
) -> Result<Bytes, AzureErrorWrapper> {
569+
let mut segments: Vec<Bytes> = Vec::new();
570+
let mut total_num_bytes: usize = 0;
568571
while let Some(chunk_result) = chunk_stream.next().await {
569572
let chunk_response = chunk_result?;
570-
let chunk_response_body_stream = chunk_response
571-
.data
572-
.map_err(FutureError::other)
573-
.into_async_read()
574-
.compat();
575-
let mut body_stream_reader = BufReader::new(chunk_response_body_stream);
576-
let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?;
577-
crate::STORAGE_METRICS
578-
.object_storage_download_num_bytes
579-
.inc_by(num_bytes_copied);
573+
let mut data_stream = chunk_response.data;
574+
while let Some(bytes_res) = data_stream.next().await {
575+
let bytes = bytes_res?;
576+
total_num_bytes += bytes.len();
577+
segments.push(bytes);
578+
}
579+
}
580+
crate::STORAGE_METRICS
581+
.object_storage_download_num_bytes
582+
.inc_by(total_num_bytes as u64);
583+
Ok(coalesce_segments(segments, total_num_bytes))
584+
}
585+
586+
/// Returns a single [`Bytes`] covering `segments`. Zero-copy when there is at most one segment;
587+
/// otherwise a single allocation concatenates them.
588+
fn coalesce_segments(mut segments: Vec<Bytes>, total_num_bytes: usize) -> Bytes {
589+
match segments.len() {
590+
0 => Bytes::new(),
591+
1 => segments.remove(0),
592+
_ => {
593+
let mut out = BytesMut::with_capacity(total_num_bytes);
594+
for segment in segments {
595+
out.extend_from_slice(&segment);
596+
}
597+
out.freeze()
598+
}
580599
}
581-
// When calling `get_all`, the Vec capacity is not properly set.
582-
output.shrink_to_fit();
583-
Ok(())
584600
}
585601

586602
#[derive(Error, Debug)]

quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ use aws_sdk_s3::config::{
3030
use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
3131
use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput;
3232
use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput};
33-
use aws_sdk_s3::primitives::ByteStream;
33+
use aws_sdk_s3::primitives::{AggregatedBytes, ByteStream};
3434
use aws_sdk_s3::types::builders::ObjectIdentifierBuilder;
3535
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier};
3636
use base64::prelude::{BASE64_STANDARD, Engine};
37+
use bytes::Bytes;
3738
use futures::{StreamExt, stream};
3839
use quickwit_aws::retry::{AwsRetryable, aws_retry};
3940
use quickwit_aws::{aws_behavior_version, get_aws_config};
@@ -48,6 +49,7 @@ use tracing::{info, instrument, warn};
4849

4950
use crate::metrics::object_storage_get_slice_in_flight_guards;
5051
use crate::object_storage::MultiPartPolicy;
52+
use crate::stable_deref_bytes::into_owned_bytes;
5153
use crate::storage::SendableAsync;
5254
use crate::{
5355
BulkDeleteError, DeleteFailure, OwnedBytes, STORAGE_METRICS, Storage, StorageError,
@@ -445,11 +447,11 @@ impl S3CompatibleObjectStorage {
445447
.upload_id(upload_id.0)
446448
.send()
447449
.await
448-
.map_err(|s3_err| {
449-
if s3_err.is_retryable() {
450-
Retry::Transient(StorageError::from(s3_err))
450+
.map_err(|sdk_error| {
451+
if sdk_error.is_retryable() {
452+
Retry::Transient(StorageError::from(sdk_error))
451453
} else {
452-
Retry::Permanent(StorageError::from(s3_err))
454+
Retry::Permanent(StorageError::from(sdk_error))
453455
}
454456
})?;
455457

@@ -484,7 +486,7 @@ impl S3CompatibleObjectStorage {
484486
.collect::<Vec<_>>()
485487
.await
486488
.into_iter()
487-
.map(|res| res.map_err(|e| e.into_inner()))
489+
.map(|res| res.map_err(|error| error.into_inner()))
488490
.collect();
489491
match completed_parts_res {
490492
Ok(completed_parts) => {
@@ -564,22 +566,19 @@ impl S3CompatibleObjectStorage {
564566
Ok(get_object_output)
565567
}
566568

567-
async fn get_to_vec(
569+
async fn get_to_bytes(
568570
&self,
569571
path: &Path,
570572
range_opt: Option<Range<usize>>,
571-
) -> StorageResult<Vec<u8>> {
572-
let cap = range_opt.as_ref().map(Range::len).unwrap_or(0);
573+
) -> StorageResult<Bytes> {
573574
let get_object_output = aws_retry(&self.retry_params, || {
574575
self.get_object(path, range_opt.clone())
575576
})
576577
.await?;
577578
// only record ranged get request as being in flight
578579
let _in_flight_guards =
579580
range_opt.map(|range| object_storage_get_slice_in_flight_guards(range.len()));
580-
let mut buf: Vec<u8> = Vec::with_capacity(cap);
581-
download_all(get_object_output.body, &mut buf).await?;
582-
Ok(buf)
581+
download_all(get_object_output.body).await
583582
}
584583

585584
/// Bulk delete implementation based on the DeleteObject API:
@@ -720,16 +719,35 @@ impl S3CompatibleObjectStorage {
720719
}
721720
}
722721

723-
async fn download_all(byte_stream: ByteStream, output: &mut Vec<u8>) -> io::Result<()> {
724-
output.clear();
725-
let mut body_stream_reader = BufReader::new(byte_stream.into_async_read());
726-
let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?;
722+
async fn download_all(byte_stream: ByteStream) -> StorageResult<Bytes> {
723+
let aggregated: AggregatedBytes = byte_stream
724+
.collect()
725+
.await
726+
.map_err(byte_stream_to_storage_error)?;
727+
// `AggregatedBytes::into_bytes` returns the underlying `Bytes` without copying when the body
728+
// was received as a single segment, and concatenates into a fresh `Bytes` otherwise.
729+
let bytes = aggregated.into_bytes();
727730
STORAGE_METRICS
728731
.object_storage_download_num_bytes
729-
.inc_by(num_bytes_copied);
730-
// When calling `get_all`, the Vec capacity is not properly set.
731-
output.shrink_to_fit();
732-
Ok(())
732+
.inc_by(bytes.len() as u64);
733+
Ok(bytes)
734+
}
735+
736+
/// Classifies a `ByteStream::collect` failure. The response headers were already received
737+
/// successfully at this point; what fails here is draining the body. The underlying cause can be a
738+
/// local `io::Error` (which we classify as `Io`) or a transport-level streaming failure — stalled
739+
/// stream, malformed body, checksum mismatch — which we classify as `Internal` and report with the
740+
/// original error preserved as source.
741+
fn byte_stream_to_storage_error(error: aws_sdk_s3::primitives::ByteStreamError) -> StorageError {
742+
let is_io_error = std::error::Error::source(&error)
743+
.and_then(|source| source.downcast_ref::<io::Error>())
744+
.is_some();
745+
let kind = if is_io_error {
746+
StorageErrorKind::Io
747+
} else {
748+
StorageErrorKind::Internal
749+
};
750+
kind.with_error(error)
733751
}
734752

735753
#[async_trait]
@@ -816,9 +834,9 @@ impl Storage for S3CompatibleObjectStorage {
816834
#[instrument(level = "debug", skip(self, range), fields(range.start = range.start, range.end = range.end))]
817835
async fn get_slice(&self, path: &Path, range: Range<usize>) -> StorageResult<OwnedBytes> {
818836
let _permit = REQUEST_SEMAPHORE.acquire().await;
819-
self.get_to_vec(path, Some(range.clone()))
837+
self.get_to_bytes(path, Some(range.clone()))
820838
.await
821-
.map(OwnedBytes::new)
839+
.map(into_owned_bytes)
822840
.map_err(|err| {
823841
err.add_context(format!(
824842
"failed to fetch slice {:?} for object: {}/{}",
@@ -850,9 +868,9 @@ impl Storage for S3CompatibleObjectStorage {
850868
async fn get_all(&self, path: &Path) -> StorageResult<OwnedBytes> {
851869
let _permit = REQUEST_SEMAPHORE.acquire().await;
852870
let bytes = self
853-
.get_to_vec(path, None)
871+
.get_to_bytes(path, None)
854872
.await
855-
.map(OwnedBytes::new)
873+
.map(into_owned_bytes)
856874
.map_err(|err| {
857875
err.add_context(format!(
858876
"failed to fetch object: {}/{}",

quickwit/quickwit-storage/src/opendal_storage/base.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use tokio::io::{AsyncRead, AsyncWriteExt as TokioAsyncWriteExt};
2424
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
2525

2626
use crate::metrics::object_storage_get_slice_in_flight_guards;
27+
use crate::stable_deref_bytes::into_owned_bytes;
2728
use crate::storage::SendableAsync;
2829
use crate::{
2930
BulkDeleteError, MultiPartPolicy, OwnedBytes, PutPayload, Storage, StorageError,
@@ -123,8 +124,11 @@ impl Storage for OpendalStorage {
123124
// recorded before issuing the query to the object store.
124125
let _inflight_guards = object_storage_get_slice_in_flight_guards(size);
125126
crate::STORAGE_METRICS.object_storage_get_total.inc();
126-
let storage_content = self.op.read_with(&path).range(range).await?.to_vec();
127-
Ok(OwnedBytes::new(storage_content))
127+
// `Buffer::to_bytes` is zero-copy when the underlying buffer is contiguous, and coalesces
128+
// into a single `Bytes` otherwise — avoiding the extra `Vec<u8>` round-trip `to_vec` would
129+
// perform.
130+
let storage_content = self.op.read_with(&path).range(range).await?.to_bytes();
131+
Ok(into_owned_bytes(storage_content))
128132
}
129133

130134
async fn get_slice_stream(
@@ -146,8 +150,8 @@ impl Storage for OpendalStorage {
146150

147151
async fn get_all(&self, path: &Path) -> StorageResult<OwnedBytes> {
148152
let path = path.as_os_str().to_string_lossy();
149-
let storage_content = self.op.read(&path).await?.to_vec();
150-
Ok(OwnedBytes::new(storage_content))
153+
let storage_content = self.op.read(&path).await?.to_bytes();
154+
Ok(into_owned_bytes(storage_content))
151155
}
152156

153157
async fn delete(&self, path: &Path) -> StorageResult<()> {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::ops::Deref;
16+
17+
use bytes::Bytes;
18+
use stable_deref_trait::StableDeref;
19+
20+
use crate::OwnedBytes;
21+
22+
/// `StableDeref` wrapper around [`Bytes`] so it can back an [`OwnedBytes`] without an extra copy.
23+
///
24+
/// `Bytes` dereferences to a heap-allocated slice whose address does not change when the `Bytes`
25+
/// itself is moved, which is exactly the contract `StableDeref` requires.
26+
pub(crate) struct StableDerefBytes(pub Bytes);
27+
28+
impl Deref for StableDerefBytes {
29+
type Target = [u8];
30+
31+
#[inline]
32+
fn deref(&self) -> &[u8] {
33+
&self.0
34+
}
35+
}
36+
37+
// SAFETY: `Bytes` stores its payload behind a stable heap pointer; moving the `Bytes` (and thus
38+
// the `StableDerefBytes`) does not invalidate the slice returned by `deref`.
39+
unsafe impl StableDeref for StableDerefBytes {}
40+
41+
/// Wraps a [`Bytes`] into an [`OwnedBytes`] without copying its contents.
42+
#[inline]
43+
pub(crate) fn into_owned_bytes(bytes: Bytes) -> OwnedBytes {
44+
OwnedBytes::new(StableDerefBytes(bytes))
45+
}

0 commit comments

Comments
 (0)