Skip to content

Commit e3c9100

Browse files
mccullsdevflow.devflow-routing-intake
andauthored
Marshals proto messages into a single prepending buffer, instead of many small byte-arrays (#11296)
Marshals proto messages into a single prepending buffer, instead of many small byte-arrays and move decision whether to export a span into OtlpTraceCollector (avoids re-allocations) Address merge conflicts Co-authored-by: devflow.devflow-routing-intake <devflow.devflow-routing-intake@kubernetes.us1.ddbuild.io>
1 parent 7e99f00 commit e3c9100

23 files changed

Lines changed: 1430 additions & 1037 deletions

communication/src/main/java/datadog/communication/serialization/GrowableBuffer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package datadog.communication.serialization;
22

3+
import static datadog.trace.util.BitUtils.nextPowerOfTwo;
4+
35
import java.nio.ByteBuffer;
46

57
/**
@@ -12,8 +14,8 @@ public final class GrowableBuffer implements StreamingBuffer {
1214
private ByteBuffer buffer;
1315
private int messageCount;
1416

15-
public GrowableBuffer(int initialCapacity) {
16-
this.initialCapacity = initialCapacity;
17+
public GrowableBuffer(int requiredCapacity) {
18+
this.initialCapacity = nextPowerOfTwo(requiredCapacity);
1719
this.buffer = ByteBuffer.allocate(initialCapacity);
1820
}
1921

@@ -122,7 +124,7 @@ public void put(ByteBuffer buffer) {
122124

123125
private void checkCapacity(int required) {
124126
if (buffer.remaining() < required) {
125-
// round up to next multiple of required
127+
// round up to next multiple of initialCapacity that can accommodate required
126128
int newSize = (buffer.capacity() + required + initialCapacity - 1) & -initialCapacity;
127129
ByteBuffer newBuffer = ByteBuffer.allocate(newSize);
128130
buffer.flip();

communication/src/test/java/datadog/communication/serialization/GrowableBufferTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ public void byteBufferTriggersResize() {
2727
@Test
2828
public void testBufferCapacity() {
2929
GrowableBuffer gb = new GrowableBuffer(5);
30-
assertEquals(5, gb.capacity());
30+
assertEquals(8, gb.capacity());
3131
ByteBuffer buffer = ByteBuffer.allocate(20);
3232
for (int i = 0; i < 5; ++i) {
3333
buffer.putInt(i);
3434
}
3535
buffer.flip();
3636
gb.put(buffer);
37-
assertEquals(25, gb.capacity());
37+
assertEquals(32, gb.capacity());
3838
}
3939

4040
@Test

dd-trace-core/src/main/java/datadog/trace/common/writer/OtlpPayloadDispatcher.java

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package datadog.trace.common.writer;
22

33
import datadog.trace.core.CoreSpan;
4-
import datadog.trace.core.DDSpanContext;
54
import datadog.trace.core.otlp.common.OtlpPayload;
65
import datadog.trace.core.otlp.common.OtlpSender;
76
import datadog.trace.core.otlp.trace.OtlpTraceCollector;
87
import datadog.trace.core.otlp.trace.OtlpTraceProtoCollector;
9-
import java.util.ArrayList;
108
import java.util.Collection;
119
import java.util.Collections;
1210
import java.util.List;
@@ -26,18 +24,7 @@ final class OtlpPayloadDispatcher implements PayloadDispatcher {
2624

2725
@Override
2826
public void addTrace(List<? extends CoreSpan<?>> trace) {
29-
List<CoreSpan<?>> sampled = null;
30-
for (CoreSpan<?> span : trace) {
31-
if (shouldExport(span)) {
32-
if (sampled == null) {
33-
sampled = new ArrayList<>(trace.size());
34-
}
35-
sampled.add(span);
36-
}
37-
}
38-
if (sampled != null) {
39-
collector.addTrace(sampled);
40-
}
27+
collector.addTrace(trace);
4128
}
4229

4330
@Override
@@ -57,13 +44,4 @@ public void onDroppedTrace(int spanCount) {
5744
public Collection<RemoteApi> getApis() {
5845
return Collections.emptyList();
5946
}
60-
61-
private static boolean shouldExport(CoreSpan<?> span) {
62-
// trace-level sampling priority
63-
if (span.samplingPriority() > 0) {
64-
return true;
65-
}
66-
// span-level sampling priority
67-
return span.getTag(DDSpanContext.SPAN_SAMPLING_MECHANISM_TAG) != null;
68-
}
6947
}

dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpCommonProto.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import static java.nio.charset.StandardCharsets.UTF_8;
1212

1313
import datadog.communication.serialization.GenerationalUtf8Cache;
14-
import datadog.communication.serialization.GrowableBuffer;
1514
import datadog.communication.serialization.SimpleUtf8Cache;
1615
import datadog.communication.serialization.StreamingBuffer;
1716
import datadog.trace.api.Config;
@@ -125,26 +124,6 @@ public static void writeTag(StreamingBuffer buf, int fieldNum, int wireType) {
125124
writeVarInt(buf, fieldNum << 3 | wireType);
126125
}
127126

128-
public static byte[] recordMessage(GrowableBuffer buf, int fieldNum) {
129-
return recordMessage(buf, fieldNum, 0);
130-
}
131-
132-
public static byte[] recordMessage(GrowableBuffer buf, int fieldNum, int remainingBytes) {
133-
try {
134-
ByteBuffer data = buf.flip();
135-
int dataSize = data.remaining();
136-
int expectedSize = dataSize + remainingBytes;
137-
ByteBuffer message =
138-
ByteBuffer.allocate(sizeTag(fieldNum) + sizeVarInt(expectedSize) + dataSize);
139-
writeTag(message, fieldNum, LEN_WIRE_TYPE);
140-
writeVarInt(message, expectedSize);
141-
message.put(data);
142-
return message.array();
143-
} finally {
144-
buf.reset();
145-
}
146-
}
147-
148127
public static void writeInstrumentationScope(
149128
StreamingBuffer buf, OtelInstrumentationScope scope) {
150129
byte[] nameUtf8 = scope.getName().getUtf8Bytes();

dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpGrpcRequestBody.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public void writeTo(@Nonnull BufferedSink sink) throws IOException {
4242
if (gzip) {
4343
try (Buffer gzipBody = new Buffer()) {
4444
try (BufferedSink gzipSink = Okio.buffer(new GzipSink(gzipBody))) {
45-
payload.drain(gzipSink::write);
45+
gzipSink.write(payload.getContent());
4646
}
4747
sink.writeByte(COMPRESSED_FLAG);
4848
long gzipLength = gzipBody.size();
@@ -52,7 +52,7 @@ public void writeTo(@Nonnull BufferedSink sink) throws IOException {
5252
} else {
5353
sink.writeByte(UNCOMPRESSED_FLAG);
5454
sink.writeInt(payload.getContentLength());
55-
payload.drain(sink::write);
55+
sink.write(payload.getContent());
5656
}
5757
}
5858
}

dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpHttpRequestBody.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ public MediaType contentType() {
3535
public void writeTo(@Nonnull BufferedSink sink) throws IOException {
3636
if (gzip) {
3737
try (BufferedSink gzipSink = Okio.buffer(new GzipSink(sink))) {
38-
payload.drain(gzipSink::write);
38+
gzipSink.write(payload.getContent());
3939
}
4040
} else {
41-
payload.drain(sink::write);
41+
sink.write(payload.getContent());
4242
}
4343
}
4444
}

dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpPayload.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,22 @@
11
package datadog.trace.core.otlp.common;
22

3-
import java.io.IOException;
4-
import java.util.ArrayDeque;
5-
import java.util.Deque;
3+
import java.nio.ByteBuffer;
64

7-
/** OTLP payload consisting of a sequence of chunked byte-arrays. */
85
public final class OtlpPayload {
9-
public static final OtlpPayload EMPTY = new OtlpPayload(new ArrayDeque<>(), 0, "");
6+
public static final OtlpPayload EMPTY = new OtlpPayload(ByteBuffer.allocate(0), "");
107

11-
private final Deque<byte[]> chunks;
8+
private final ByteBuffer content;
129
private final int contentLength;
1310
private final String contentType;
1411

15-
public OtlpPayload(Deque<byte[]> chunks, int contentLength, String contentType) {
16-
this.chunks = chunks;
17-
this.contentLength = contentLength;
12+
public OtlpPayload(ByteBuffer content, String contentType) {
13+
this.content = content;
14+
this.contentLength = content.remaining();
1815
this.contentType = contentType;
1916
}
2017

21-
/** Drains the chunked payload to the given sink. */
22-
public void drain(OtlpSink sink) throws IOException {
23-
byte[] chunk;
24-
while ((chunk = chunks.pollFirst()) != null) {
25-
sink.write(chunk);
26-
}
18+
public ByteBuffer getContent() {
19+
return content.asReadOnlyBuffer();
2720
}
2821

2922
public int getContentLength() {
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package datadog.trace.core.otlp.common;
2+
3+
import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE;
4+
import static datadog.trace.core.otlp.common.OtlpCommonProto.sizeVarInt;
5+
import static datadog.trace.core.otlp.common.OtlpCommonProto.writeVarInt;
6+
import static datadog.trace.util.BitUtils.nextPowerOfTwo;
7+
8+
import datadog.communication.serialization.GrowableBuffer;
9+
import java.nio.ByteBuffer;
10+
11+
/**
12+
* Growable buffer optimized for prepending protobuf messages. This buffer doesn't have a bounded
13+
* length, and grows linearly. It should only be used to serialize bounded data structures.
14+
*
15+
* <p>Messages appear in the final payload in reverse insertion order.
16+
*
17+
* @see GrowableBuffer
18+
*/
19+
public final class OtlpProtoBuffer {
20+
private static final String PROTOBUF_CONTENT_TYPE = "application/x-protobuf";
21+
22+
private final int initialCapacity;
23+
private ByteBuffer buffer;
24+
private int remaining;
25+
26+
public OtlpProtoBuffer(int requiredCapacity) {
27+
this.initialCapacity = nextPowerOfTwo(requiredCapacity);
28+
this.buffer = ByteBuffer.allocate(initialCapacity);
29+
this.remaining = initialCapacity;
30+
}
31+
32+
/**
33+
* Records a self-contained protobuf message.
34+
*
35+
* @param buf buffer containing the message body
36+
* @param fieldNum field number of the message
37+
* @return overall size of the message in bytes
38+
*/
39+
public int recordMessage(GrowableBuffer buf, int fieldNum) {
40+
return recordMessage(buf, fieldNum, 0);
41+
}
42+
43+
/**
44+
* Records a protobuf message that has zero or more nested elements already recorded.
45+
*
46+
* @param buf buffer containing the message header
47+
* @param fieldNum field number of the message
48+
* @param bytesSoFar nested element bytes recorded so far
49+
* @return overall size of the message in bytes
50+
*/
51+
public int recordMessage(GrowableBuffer buf, int fieldNum, int bytesSoFar) {
52+
try {
53+
ByteBuffer message = buf.flip();
54+
// calculate space needed to encode message, its total length, and the tag
55+
int messageSize = message.remaining();
56+
int length = messageSize + bytesSoFar;
57+
int tag = fieldNum << 3 | LEN_WIRE_TYPE;
58+
int numBytes = sizeVarInt(tag) + sizeVarInt(length) + messageSize;
59+
// grow the buffer to fit the incoming content
60+
checkCapacity(numBytes);
61+
remaining -= numBytes;
62+
// reposition so we can write the encoded message
63+
buffer.position(remaining);
64+
// write the usual prelude
65+
writeVarInt(buffer, tag);
66+
writeVarInt(buffer, length);
67+
// write the primary message
68+
buffer.put(message);
69+
// no need to reset position here; it's always reset before any write/read
70+
return numBytes + bytesSoFar;
71+
} finally {
72+
buf.reset();
73+
}
74+
}
75+
76+
/**
77+
* Records a previously cached protobuf message.
78+
*
79+
* @param bytes cached bytes containing the message header
80+
* @return overall size of the message in bytes
81+
*/
82+
public int recordMessage(byte[] bytes) {
83+
// grow the buffer to fit the incoming content
84+
int numBytes = bytes.length;
85+
checkCapacity(numBytes);
86+
remaining -= numBytes;
87+
// reposition so we can write the cached message
88+
buffer.position(remaining);
89+
buffer.put(bytes);
90+
// no need to reset position here; it's always reset before any write/read
91+
return numBytes;
92+
}
93+
94+
/** Flips the buffer, returning the protobuf encoded content for reading. */
95+
public ByteBuffer flip() {
96+
buffer.position(remaining);
97+
return buffer;
98+
}
99+
100+
/**
101+
* Returns an {@link OtlpPayload} containing the protobuf encoded content.
102+
*
103+
* <p>This payload is only valid for the calling thread until the next collection.
104+
*/
105+
public OtlpPayload toPayload() {
106+
return new OtlpPayload(flip(), PROTOBUF_CONTENT_TYPE);
107+
}
108+
109+
/**
110+
* Resets the buffer in anticipation of the next collection cycle.
111+
*
112+
* <p>This does not affect the active payload, which remains valid until the next collection.
113+
*/
114+
public void reset() {
115+
if (buffer.capacity() > initialCapacity) {
116+
buffer = ByteBuffer.allocate(initialCapacity);
117+
}
118+
remaining = buffer.capacity();
119+
}
120+
121+
/** Grows the buffer to ensure the required number of bytes can be prepended. */
122+
private void checkCapacity(int required) {
123+
if (remaining < required) {
124+
ByteBuffer oldBuffer = flip();
125+
int oldSize = oldBuffer.remaining();
126+
// round up to next multiple of initialCapacity that can accommodate required
127+
int newSize = (oldSize + required + initialCapacity - 1) & -initialCapacity;
128+
ByteBuffer newBuffer = ByteBuffer.allocate(newSize);
129+
// copy over old content so it stays at the far end
130+
remaining = newSize - oldSize;
131+
newBuffer.position(remaining);
132+
newBuffer.put(oldBuffer);
133+
buffer = newBuffer;
134+
}
135+
}
136+
}

dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import static datadog.communication.ddagent.TracerVersion.TRACER_VERSION;
44
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.STRING_ATTRIBUTE;
55
import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE;
6-
import static datadog.trace.core.otlp.common.OtlpCommonProto.recordMessage;
76
import static datadog.trace.core.otlp.common.OtlpCommonProto.writeAttribute;
87
import static datadog.trace.core.otlp.common.OtlpCommonProto.writeTag;
98

@@ -62,7 +61,12 @@ static byte[] buildResourceMessage(Config config) {
6261
}
6362
});
6463

65-
return recordMessage(buf, 1);
64+
OtlpProtoBuffer protobuf = new OtlpProtoBuffer(buf.capacity());
65+
int numBytes = protobuf.recordMessage(buf, 1);
66+
byte[] resourceMessage = new byte[numBytes];
67+
protobuf.flip().get(resourceMessage);
68+
69+
return resourceMessage;
6670
}
6771

6872
private static void writeResourceAttribute(StreamingBuffer buf, String key, String value) {

dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpSink.java

Lines changed: 0 additions & 9 deletions
This file was deleted.

0 commit comments

Comments
 (0)