Skip to content

Commit fdb01e3

Browse files
gscalderon and Copilot authored
perf: [durable_buffer] Avoid unnecessary copy of OTLP bytes in OtlpBytesAdapter::new() (open-telemetry#2726)
Replace `BinaryArray::from_vec()`, which deep-copies the entire OTLP payload, with a zero-copy construction using `Buffer::from(bytes::Bytes)`. The `clone_bytes()` call is just an Arc refcount bump, and `Buffer::from(Bytes)` wraps the data without copying, eliminating a full memcpy on the ingest hot path.

# Change Summary

- **Zero-copy wrapping**: Use `Buffer::from(bytes::Bytes)` instead of `BinaryArray::from_vec()` to avoid deep-copying the OTLP payload into Arrow.
- **Bounds check**: Added explicit `i32::try_from` guard on payload length for a clear error on oversized payloads.
- **New tests**: `test_otlp_bytes_adapter_zero_copy` (pointer equality assertion) and `test_otlp_bytes_adapter_empty_payload`.

## What issue does this PR close?

* Closes open-telemetry#2703

## How are these changes tested?

- Existing tests (`test_otlp_bytes_adapter`, `test_extract_otlp_bytes`) verify correctness is preserved.
- New `test_otlp_bytes_adapter_zero_copy` asserts the Arrow buffer points to the same memory as the original `OtlpProtoBytes` (no copy).
- New `test_otlp_bytes_adapter_empty_payload` verifies empty payload handling.

## Are there any user-facing changes?

No. This is a transparent performance improvement with no API or behavior changes.

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 8f6fa92 commit fdb01e3

1 file changed

Lines changed: 52 additions & 2 deletions

File tree

  • rust/otap-dataflow/crates/core-nodes/src/processors/durable_buffer_processor

rust/otap-dataflow/crates/core-nodes/src/processors/durable_buffer_processor/bundle_adapter.rs

Lines changed: 52 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ use std::sync::{Arc, LazyLock};
4545
use std::time::SystemTime;
4646

4747
use arrow::array::{BinaryArray, RecordBatch};
48+
use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
4849
use arrow::datatypes::{DataType, Field, Schema};
4950
use quiver::record_bundle::{
5051
BundleDescriptor, PayloadRef, RecordBundle, SchemaFingerprint, SlotDescriptor, SlotId,
@@ -348,8 +349,22 @@ impl OtlpBytesAdapter {
348349
OtlpProtoBytes::ExportTracesRequest(_) => SignalType::Traces,
349350
};
350351

351-
// Create a record batch with a single binary column containing the OTLP bytes
352-
let binary_array = BinaryArray::from_vec(vec![bytes.as_bytes()]);
352+
// Create a record batch with a single binary column containing the OTLP bytes.
353+
// Use zero-copy wrapping: clone_bytes() is just an Arc refcount bump,
354+
// and Buffer::from(Bytes) wraps without copying the payload data.
355+
let data_bytes = bytes.clone_bytes();
356+
let len = i32::try_from(data_bytes.len()).map_err(|_| {
357+
(
358+
BundleConversionError::RecordBatchCreationError(format!(
359+
"OTLP payload too large for BinaryArray: {} bytes exceeds i32::MAX",
360+
data_bytes.len()
361+
)),
362+
bytes.clone(),
363+
)
364+
})?;
365+
let data_buffer = Buffer::from(data_bytes);
366+
let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, len]));
367+
let binary_array = BinaryArray::new(offsets, data_buffer, None);
353368
let batch = match RecordBatch::try_new(otlp_binary_schema(), vec![Arc::new(binary_array)]) {
354369
Ok(batch) => batch,
355370
Err(e) => {
@@ -736,6 +751,41 @@ mod tests {
736751
assert!(adapter.payload(wrong_slot).is_none());
737752
}
738753

754+
#[test]
755+
fn test_otlp_bytes_adapter_zero_copy() {
756+
let test_bytes = b"zero-copy verification payload".to_vec();
757+
let otlp = OtlpProtoBytes::new_from_bytes(SignalType::Logs, test_bytes);
758+
759+
let adapter = OtlpBytesAdapter::new(otlp).map_err(|(e, _)| e).unwrap();
760+
761+
let slot = to_otlp_slot_id(SignalType::Logs);
762+
let payload = adapter.payload(slot).unwrap();
763+
let column = payload.batch.column(0);
764+
let binary_array = column.as_any().downcast_ref::<BinaryArray>().unwrap();
765+
766+
// The Arrow buffer should alias the original bytes (zero-copy).
767+
// Compare the pointer of the stored value with the original OtlpProtoBytes.
768+
let arrow_value_ptr = binary_array.value(0).as_ptr();
769+
let original_ptr = adapter.bytes.as_bytes().as_ptr();
770+
assert_eq!(
771+
arrow_value_ptr, original_ptr,
772+
"BinaryArray value should point to the same memory as OtlpProtoBytes (zero-copy)"
773+
);
774+
}
775+
776+
#[test]
777+
fn test_otlp_bytes_adapter_empty_payload() {
778+
let otlp = OtlpProtoBytes::new_from_bytes(SignalType::Metrics, vec![]);
779+
780+
let adapter = OtlpBytesAdapter::new(otlp).map_err(|(e, _)| e).unwrap();
781+
782+
let slot = to_otlp_slot_id(SignalType::Metrics);
783+
let payload = adapter.payload(slot).unwrap();
784+
let column = payload.batch.column(0);
785+
let binary_array = column.as_any().downcast_ref::<BinaryArray>().unwrap();
786+
assert_eq!(binary_array.value(0), b"");
787+
}
788+
739789
#[test]
740790
fn test_extract_otlp_bytes() {
741791
let original_bytes = b"original OTLP data".to_vec();

0 commit comments

Comments
 (0)