Skip to content

Commit 09b8e15

Browse files
authored
Storage - Decoder Perf Improvements (#49205)
* refactoring * removing unused code * reverting unecessary changes * reverting unecessary changes * reverting unecessary changes * not pre-allocating entire byte array of segment size * renaming variable * adding back unecessary removals * renaming variable * addressing copilot comments
1 parent 9cfb308 commit 09b8e15

3 files changed

Lines changed: 194 additions & 106 deletions

File tree

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/contentvalidation/StructuredMessageDecoder.java

Lines changed: 54 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
import java.io.ByteArrayOutputStream;
99
import java.nio.ByteBuffer;
1010
import java.nio.ByteOrder;
11+
import java.util.ArrayList;
12+
import java.util.Collections;
13+
import java.util.List;
1114

1215
import static com.azure.storage.common.implementation.contentvalidation.StructuredMessageConstants.CRC64_LENGTH;
1316
import static com.azure.storage.common.implementation.contentvalidation.StructuredMessageConstants.DEFAULT_MESSAGE_VERSION;
@@ -74,9 +77,12 @@ public class StructuredMessageDecoder {
7477
// Holds bytes left over from a previous decodeChunk() call when the current chunk did not contain a full
7578
// header or footer.
7679
private final ByteArrayOutputStream pendingBytes = new ByteArrayOutputStream();
77-
// Holds the payload bytes of the segment that is currently being decoded. These bytes are intentionally NOT
78-
// emitted to the caller until the segment's CRC footer has been validated.
79-
private final ByteArrayOutputStream currentSegmentBuffer = new ByteArrayOutputStream();
80+
// Payload for the current segment, one per inbound wire buffer. Not pre-sized to the full segment length,
81+
// avoiding unnecessary heap spikes.
82+
// These bytes are intentionally NOT emitted until the segment's CRC footer has been validated.
83+
private List<byte[]> currentSegmentPayload;
84+
// Validated payload buffers to emit from the current decodeChunk() invocation.
85+
private final List<ByteBuffer> validatedOutput = new ArrayList<>();
8086

8187
/**
8288
* Constructs a new StructuredMessageDecoder.
@@ -149,7 +155,7 @@ private boolean tryReadMessageHeader(ByteBuffer buffer) {
149155
/**
150156
* Reads the 10-byte header for the next segment (segment number + segment payload length) and resets
151157
* per-segment state so {@link #tryReadSegmentContent(ByteBuffer)} can begin filling
152-
* {@link #currentSegmentBuffer}.
158+
* {@link #currentSegmentPayload}.
153159
*
154160
* <p>Validates that segments arrive in order and that the declared segment size leaves enough room in the
155161
* remaining message for any subsequent segment headers, payloads, footers, and the trailing message footer –
@@ -197,7 +203,7 @@ private boolean tryReadSegmentHeader(ByteBuffer buffer) {
197203
currentSegmentNumber = segmentNum;
198204
currentSegmentContentLength = segmentSize;
199205
currentSegmentContentOffset = 0;
200-
currentSegmentBuffer.reset();
206+
currentSegmentPayload = new ArrayList<>();
201207

202208
if (flags == StructuredMessageFlags.STORAGE_CRC64) {
203209
// Reset only the per-segment running CRC; the message-wide running CRC keeps accumulating across all
@@ -210,7 +216,7 @@ private boolean tryReadSegmentHeader(ByteBuffer buffer) {
210216

211217
/**
212218
* Pulls as many payload bytes as possible (bounded by what is still owed for the current segment) from the
213-
* pending+buffer view into {@link #currentSegmentBuffer}, updating the running per-segment and per-message
219+
* pending+buffer view into {@link #currentSegmentPayload}, updating the running per-segment and per-message
214220
* CRC64 values along the way.
215221
*
216222
* <p>Bytes accumulated here are not yet emitted to the caller. They are released only after
@@ -238,20 +244,20 @@ private int tryReadSegmentContent(ByteBuffer buffer) {
238244
int toRead = (int) Math.min(available, remaining);
239245
ByteBuffer combined = getCombinedBuffer(buffer);
240246

241-
// Materialize the bytes into a fresh array so we can both feed the CRC64 calculator and stash them in the
242-
// per-segment buffer in one pass.
247+
// Materialize only this chunk so retained memory grows with bytes received, not the full declared segment size.
243248
byte[] content = new byte[toRead];
244249
combined.get(content);
245-
currentSegmentBuffer.write(content, 0, toRead);
246250

247251
if (flags == StructuredMessageFlags.STORAGE_CRC64) {
248252
// Update both CRCs incrementally: the segment CRC will be checked at the segment footer, and the
249253
// message CRC accumulates across every segment to be checked at the message footer.
250-
segmentCrc64 = StorageCrc64Calculator.compute(content, segmentCrc64);
251-
messageCrc64 = StorageCrc64Calculator.compute(content, messageCrc64);
254+
segmentCrc64 = StorageCrc64Calculator.compute(content, 0, toRead, segmentCrc64);
255+
messageCrc64 = StorageCrc64Calculator.compute(content, 0, toRead, messageCrc64);
252256
}
253257

254258
consumeBytes(toRead, buffer);
259+
currentSegmentPayload.add(content);
260+
255261
messageOffset += toRead;
256262
currentSegmentContentOffset += toRead;
257263

@@ -301,28 +307,28 @@ private boolean tryReadMessageFooter(ByteBuffer buffer) {
301307
* Decodes as much as possible from the given buffer and returns any fully validated
302308
* payload bytes that are now safe to emit downstream.
303309
*
304-
* <p>The returned buffer will only ever contain bytes from segments whose CRC (when
305-
* enabled) has already been verified. If no segments have been fully validated by
306-
* this invocation the method returns {@code null}. Callers distinguish "more bytes
310+
* <p>The returned buffers will only ever contain bytes from segments whose CRC (when
311+
* enabled) has already been verified. If no segments have been fully validated by
312+
* this invocation the method returns an empty list. Callers distinguish "more bytes
307313
* needed" from "stream complete" via {@link #isComplete()}.</p>
308314
*
309315
* @param buffer The buffer containing encoded data.
310-
* @return Validated payload bytes ready to emit, or {@code null} if none are ready.
316+
* @return Validated payload bytes ready to emit downstream; empty when none are ready yet.
311317
* @throws IllegalArgumentException if the input is malformed or a CRC64 check fails.
312318
*/
313-
public ByteBuffer decodeChunk(ByteBuffer buffer) {
319+
public List<ByteBuffer> decodeChunk(ByteBuffer buffer) {
314320
// Decoder always reads little-endian; force the order on the caller's buffer so all our get() calls match
315321
// the wire format regardless of how the buffer was constructed.
316322
buffer.order(ByteOrder.LITTLE_ENDIAN);
317323

318324
// Output collected during this single invocation. Each segment whose CRC validates in this call is appended
319-
// here and ultimately returned to the policy as one ByteBuffer.
320-
ByteArrayOutputStream validatedOutput = new ByteArrayOutputStream();
325+
// here and ultimately returned to the policy as one or more ByteBuffers.
326+
validatedOutput.clear();
321327

322328
// Step 1: parse the message header on the first chunk that has enough bytes for it. If this chunk doesn't,
323329
// bail out early.
324330
if (!tryReadMessageHeader(buffer)) {
325-
return emptyOrNull(validatedOutput);
331+
return finishDecodeChunk();
326332
}
327333

328334
// Step 2: walk forward through the message until we either hit the end (messageOffset == messageLength) or
@@ -349,20 +355,13 @@ public ByteBuffer decodeChunk(ByteBuffer buffer) {
349355

350356
if (currentSegmentContentOffset == currentSegmentContentLength) {
351357
// Segment payload fully buffered. Validate the CRC footer (if any). When the footer isn't fully
352-
// available yet, break and resume on the next chunk – currentSegmentBuffer keeps its contents so
358+
// available yet, break and resume on the next chunk – currentSegmentPayload keeps its contents so
353359
// we can still emit them on the call where the footer arrives.
354360
if (!tryReadSegmentFooter(buffer)) {
355361
break;
356362
}
357363
// Segment passed validation: it is now safe to release the buffered payload to the caller.
358-
try {
359-
currentSegmentBuffer.writeTo(validatedOutput);
360-
} catch (java.io.IOException e) {
361-
// ByteArrayOutputStream.writeTo(ByteArrayOutputStream) does not actually throw, but the
362-
// signature forces us to handle it.
363-
throw LOGGER.logExceptionAsError(new IllegalStateException(e));
364-
}
365-
currentSegmentBuffer.reset();
364+
releaseValidatedSegmentPayload();
366365
segmentHeaderRead = false;
367366
// Loop continues: either consume the next segment's header or the message footer.
368367
} else if (payloadRead == 0 && getAvailableBytes(buffer) == 0) {
@@ -371,7 +370,33 @@ public ByteBuffer decodeChunk(ByteBuffer buffer) {
371370
}
372371
}
373372

374-
return emptyOrNull(validatedOutput);
373+
return finishDecodeChunk();
374+
}
375+
376+
/**
377+
* Hands off validated segment bytes for emission as {@link ByteBuffer} views over the accumulated payload copies
378+
* without consolidating them into a single array.
379+
*/
380+
private void releaseValidatedSegmentPayload() {
381+
List<byte[]> validatedCopies = currentSegmentPayload;
382+
currentSegmentPayload = null;
383+
for (byte[] validatedCopy : validatedCopies) {
384+
validatedOutput.add(ByteBuffer.wrap(validatedCopy));
385+
}
386+
}
387+
388+
private List<ByteBuffer> finishDecodeChunk() {
389+
if (validatedOutput.isEmpty()) {
390+
return Collections.emptyList();
391+
}
392+
List<ByteBuffer> result;
393+
if (validatedOutput.size() == 1) {
394+
result = Collections.singletonList(validatedOutput.get(0));
395+
} else {
396+
result = new ArrayList<>(validatedOutput);
397+
}
398+
validatedOutput.clear();
399+
return result;
375400
}
376401

377402
/**
@@ -474,18 +499,6 @@ private void appendToPending(ByteBuffer buffer) {
474499
}
475500
}
476501

477-
/**
478-
* Wraps {@code output} as a {@link ByteBuffer}, or returns {@code null} when no bytes were emitted in this
479-
* pass. The {@code null} return distinguishes "no validated bytes ready in this chunk" (still need more input)
480-
* from "stream complete" (which the caller checks via {@link #isComplete()}).
481-
*/
482-
private static ByteBuffer emptyOrNull(ByteArrayOutputStream output) {
483-
if (output.size() == 0) {
484-
return null;
485-
}
486-
return ByteBuffer.wrap(output.toByteArray());
487-
}
488-
489502
/**
490503
* Reports whether the decoder has finished consuming the entire structured message and validated everything it
491504
* was supposed to validate. Used by the pipeline policy to distinguish "stream ended cleanly" from "stream was

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,11 @@ private static boolean isEligibleDownload(HttpResponse response, Long contentLen
145145
* appended so a truncated body raises an error instead of completing silently.
146146
*/
147147
private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, StructuredMessageDecoder decoder) {
148-
return encodedFlux.concatMap(buffer -> decodeBuffer(buffer, decoder))
148+
// limitRate(1) mirrors StorageContentValidationPolicy's upload path: process one wire buffer at a time so
149+
// the decoder can copy only the current chunk into owned storage and release the inbound buffer before the
150+
// next segment payload bytes arrive.
151+
return encodedFlux.limitRate(1)
152+
.concatMap(buffer -> decodeBuffer(buffer, decoder))
149153
.concatWith(Mono.defer(() -> handleStreamCompletion(decoder)));
150154
}
151155

@@ -168,8 +172,7 @@ private Flux<ByteBuffer> decodeBuffer(ByteBuffer buffer, StructuredMessageDecode
168172
}
169173

170174
try {
171-
ByteBuffer validated = decoder.decodeChunk(buffer);
172-
return emitDecodedPayload(validated);
175+
return Flux.fromIterable(decoder.decodeChunk(buffer));
173176
} catch (IllegalArgumentException e) {
174177
return Flux.error(new IOException("Failed to decode structured message: " + e.getMessage(), e));
175178
} catch (Exception e) {
@@ -191,13 +194,4 @@ private Mono<ByteBuffer> handleStreamCompletion(StructuredMessageDecoder decoder
191194
return Mono.empty();
192195
}
193196

194-
/**
195-
* Wraps decoder output in a Flux.
196-
*/
197-
private static Flux<ByteBuffer> emitDecodedPayload(ByteBuffer decodedPayload) {
198-
if (decodedPayload == null || !decodedPayload.hasRemaining()) {
199-
return Flux.empty();
200-
}
201-
return Flux.just(decodedPayload);
202-
}
203197
}

0 commit comments

Comments
 (0)