Skip to content

Commit 03fe291

Browse files
author
Local Merge
committed
resolving merge conflicts
1 parent d7254df commit 03fe291

1 file changed

Lines changed: 181 additions & 181 deletions

File tree

Lines changed: 181 additions & 181 deletions
Original file line numberDiff line numberDiff line change
@@ -1,184 +1,184 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4-
5-
package com.azure.storage.common.policy;
6-
7-
import com.azure.core.http.HttpClient;
8-
import com.azure.core.http.HttpHeaderName;
9-
import com.azure.core.http.HttpHeaders;
10-
import com.azure.core.http.HttpMethod;
11-
import com.azure.core.http.HttpPipeline;
12-
import com.azure.core.http.HttpPipelineBuilder;
13-
import com.azure.core.http.HttpRequest;
14-
import com.azure.core.http.HttpResponse;
15-
import com.azure.core.test.http.MockHttpResponse;
16-
import com.azure.core.util.Context;
17-
import com.azure.core.util.FluxUtil;
18-
import com.azure.storage.common.implementation.Constants;
19-
import com.azure.storage.common.implementation.contentvalidation.StructuredMessageConstants;
20-
import com.azure.storage.common.implementation.contentvalidation.StructuredMessageEncoder;
21-
import com.azure.storage.common.implementation.contentvalidation.StructuredMessageFlags;
22-
import org.junit.jupiter.api.Test;
23-
import org.junit.jupiter.params.ParameterizedTest;
24-
import org.junit.jupiter.params.provider.Arguments;
25-
import org.junit.jupiter.params.provider.MethodSource;
26-
import reactor.core.publisher.Flux;
27-
import reactor.core.publisher.Mono;
28-
29-
import java.io.IOException;
30-
import java.nio.ByteBuffer;
31-
import java.util.Objects;
32-
import java.util.concurrent.ThreadLocalRandom;
33-
import java.util.concurrent.atomic.AtomicReference;
34-
import java.util.stream.Stream;
35-
36-
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
37-
import static org.junit.jupiter.api.Assertions.assertEquals;
38-
import static org.junit.jupiter.api.Assertions.assertNotNull;
39-
import static org.junit.jupiter.api.Assertions.assertTrue;
40-
41-
/**
42-
* Tests {@link StorageContentValidationDecoderPolicy} together with {@link StructuredMessageEncoder} /
43-
* wire-format payloads so the reactive decode path matches what the blob download pipeline uses.
44-
*/
45-
public class StorageContentValidationDecoderPolicyTests {
46-
47-
/**
48-
* End-to-end through the policy: encoded body uses multi-megabyte segment payload lengths (not the default
49-
* 4 MiB framing only); decoded flux must match the original bytes.
50-
*/
51-
@ParameterizedTest
52-
@MethodSource("segmentPayloadSizeAndTotalPayloadSizeSupplier")
53-
public void decodesDynamicallySizedSegmentStructuredMessageThroughPipeline(int segmentPayloadSize,
54-
int totalPayloadSize) throws IOException {
55-
byte[] originalData = new byte[totalPayloadSize];
56-
ThreadLocalRandom.current().nextBytes(originalData);
57-
58-
byte[] encodedBytes
59-
= encodeStructuredMessageWireBytes(originalData, segmentPayloadSize, StructuredMessageFlags.STORAGE_CRC64);
60-
61-
AtomicReference<HttpRequest> requestAfterPolicies = new AtomicReference<>();
62-
HttpClient httpClient = request -> {
63-
requestAfterPolicies.set(request);
64-
HttpHeaders headers = structuredDownloadResponseHeaders(encodedBytes.length, totalPayloadSize);
65-
return Mono.just(new MockHttpResponse(request, 200, headers, encodedBytes));
66-
};
67-
68-
HttpPipeline pipeline = new HttpPipelineBuilder().policies((context, next) -> {
69-
context.setData(StructuredMessageConstants.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true);
70-
return next.process();
71-
}, new StorageContentValidationDecoderPolicy()).httpClient(httpClient).build();
72-
73-
HttpRequest request = new HttpRequest(HttpMethod.GET, "https://example.blob.core.windows.net/c/b");
74-
try (HttpResponse response = pipeline.send(request, Context.NONE).block()) {
75-
assertNotNull(response);
76-
assertTrue(response instanceof DecodedResponse);
77-
byte[] decoded = Objects.requireNonNull(response.getBodyAsByteArray().block());
78-
assertArrayEquals(originalData, decoded);
79-
}
80-
81-
HttpRequest sent = requestAfterPolicies.get();
82-
assertNotNull(sent);
83-
assertEquals(StructuredMessageConstants.STRUCTURED_BODY_TYPE_VALUE,
84-
sent.getHeaders().getValue(Constants.HeaderConstants.STRUCTURED_BODY_TYPE_HEADER_NAME));
85-
}
86-
87-
@Test
88-
public void contentLengthIsOverriddenToDecodedSizeWhenDecodingApplied() throws IOException {
89-
byte[] payload = new byte[64];
90-
ThreadLocalRandom.current().nextBytes(payload);
91-
92-
byte[] encoded = encodeStructuredMessageWireBytes(payload, 64, StructuredMessageFlags.STORAGE_CRC64);
93-
long encodedLen = encoded.length;
94-
long decodedLen = payload.length;
95-
96-
HttpClient httpClient = request -> Mono.just(
97-
new MockHttpResponse(request, 200, structuredDownloadResponseHeaders(encodedLen, decodedLen), encoded));
98-
99-
HttpPipeline pipeline = new HttpPipelineBuilder().policies((context, next) -> {
100-
context.setData(StructuredMessageConstants.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true);
101-
return next.process();
102-
}, new StorageContentValidationDecoderPolicy()).httpClient(httpClient).build();
103-
104-
HttpRequest request = new HttpRequest(HttpMethod.GET, "https://example.blob.core.windows.net/c/b");
105-
HttpResponse response = pipeline.send(request, Context.NONE).block();
106-
107-
assertNotNull(response);
108-
assertEquals(String.valueOf(decodedLen), response.getHeaders().getValue(HttpHeaderName.CONTENT_LENGTH));
109-
}
110-
111-
@Test
112-
public void contentLengthMatchesActualDecodedBodySize() throws IOException {
113-
byte[] payload = new byte[128];
114-
ThreadLocalRandom.current().nextBytes(payload);
115-
116-
byte[] encoded = encodeStructuredMessageWireBytes(payload, 64, StructuredMessageFlags.STORAGE_CRC64);
117-
long encodedLen = encoded.length;
118-
long decodedLen = payload.length;
119-
120-
HttpClient httpClient = request -> Mono.just(
121-
new MockHttpResponse(request, 200, structuredDownloadResponseHeaders(encodedLen, decodedLen), encoded));
122-
123-
HttpPipeline pipeline = new HttpPipelineBuilder().policies((context, next) -> {
124-
context.setData(StructuredMessageConstants.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true);
125-
return next.process();
126-
}, new StorageContentValidationDecoderPolicy()).httpClient(httpClient).build();
127-
128-
HttpRequest request = new HttpRequest(HttpMethod.GET, "https://example.blob.core.windows.net/c/b");
129-
try (HttpResponse response = pipeline.send(request, Context.NONE).block()) {
130-
assertNotNull(response);
131-
assertEquals(String.valueOf(decodedLen), response.getHeaders().getValue(HttpHeaderName.CONTENT_LENGTH));
132-
133-
byte[] body = Objects.requireNonNull(FluxUtil.collectBytesInByteBufferStream(response.getBody()).block());
134-
assertEquals(decodedLen, body.length);
135-
assertArrayEquals(payload, body);
136-
}
137-
}
138-
139-
@Test
140-
public void contentLengthIsUnchangedWhenDecodingNotApplied() throws IOException {
141-
byte[] payload = new byte[64];
142-
ThreadLocalRandom.current().nextBytes(payload);
143-
144-
byte[] encoded = encodeStructuredMessageWireBytes(payload, 64, StructuredMessageFlags.STORAGE_CRC64);
145-
long encodedLen = encoded.length;
146-
147-
HttpHeaders responseHeaders = new HttpHeaders().set(HttpHeaderName.CONTENT_LENGTH, String.valueOf(encodedLen));
148-
HttpClient httpClient = request -> Mono.just(new MockHttpResponse(request, 200, responseHeaders, encoded));
149-
150-
HttpPipeline pipeline = new HttpPipelineBuilder()
151-
.policies(new StorageContentValidationDecoderPolicy())
152-
.httpClient(httpClient)
153-
.build();
154-
155-
HttpRequest request = new HttpRequest(HttpMethod.GET, "https://example.blob.core.windows.net/c/b");
156-
HttpResponse response = pipeline.send(request, Context.NONE).block();
157-
158-
assertNotNull(response);
159-
assertEquals(String.valueOf(encodedLen), response.getHeaders().getValue(HttpHeaderName.CONTENT_LENGTH));
160-
}
161-
162-
private static Stream<Arguments> segmentPayloadSizeAndTotalPayloadSizeSupplier() {
163-
return Stream.of(Arguments.of(10 * 1024 * 1024, 10 * 1024 * 1024 + 1), // larger than 4 MiB
164-
Arguments.of(3 * 1024 * 1024, 3 * 1024 * 1024 + 1), // smaller than 4 MiB, but not KB
165-
Arguments.of(5 * 1024 * 1024 + 1, 15 * 1024 * 1024));
166-
}
167-
168-
private static HttpHeaders structuredDownloadResponseHeaders(long contentLength, long structuredContentLength) {
169-
HttpHeaders headers = new HttpHeaders();
170-
headers.set(HttpHeaderName.CONTENT_LENGTH, String.valueOf(contentLength));
171-
headers.set(Constants.HeaderConstants.STRUCTURED_BODY_TYPE_HEADER_NAME,
172-
StructuredMessageConstants.STRUCTURED_BODY_TYPE_VALUE);
173-
headers.set(Constants.HeaderConstants.STRUCTURED_CONTENT_LENGTH_HEADER_NAME,
174-
String.valueOf(structuredContentLength));
175-
return headers;
176-
}
177-
178-
private static byte[] encodeStructuredMessageWireBytes(byte[] originalData, int segmentLength,
179-
StructuredMessageFlags flags) throws IOException {
180-
StructuredMessageEncoder encoder = new StructuredMessageEncoder(originalData.length, segmentLength, flags);
181-
Flux<ByteBuffer> flux = encoder.encode(ByteBuffer.wrap(originalData));
182-
return Objects.requireNonNull(FluxUtil.collectBytesInByteBufferStream(flux).block());
183-
}
184-
}
4+
package com.azure.storage.common.policy;
5+
6+
import com.azure.core.http.HttpClient;
7+
import com.azure.core.http.HttpHeaderName;
8+
import com.azure.core.http.HttpHeaders;
9+
import com.azure.core.http.HttpMethod;
10+
import com.azure.core.http.HttpPipeline;
11+
import com.azure.core.http.HttpPipelineBuilder;
12+
import com.azure.core.http.HttpRequest;
13+
import com.azure.core.http.HttpResponse;
14+
import com.azure.core.test.http.MockHttpResponse;
15+
import com.azure.core.util.Context;
16+
import com.azure.core.util.FluxUtil;
17+
import com.azure.storage.common.implementation.Constants;
18+
import com.azure.storage.common.implementation.contentvalidation.StructuredMessageConstants;
19+
import com.azure.storage.common.implementation.contentvalidation.StructuredMessageEncoder;
20+
import com.azure.storage.common.implementation.contentvalidation.StructuredMessageFlags;
21+
import org.junit.jupiter.api.Test;
22+
import org.junit.jupiter.params.ParameterizedTest;
23+
import org.junit.jupiter.params.provider.Arguments;
24+
import org.junit.jupiter.params.provider.MethodSource;
25+
26+
import reactor.core.publisher.Flux;
27+
import reactor.core.publisher.Mono;
28+
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.util.Objects;
32+
import java.util.concurrent.ThreadLocalRandom;
33+
import java.util.concurrent.atomic.AtomicReference;
34+
import java.util.stream.Stream;
35+
36+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
37+
import static org.junit.jupiter.api.Assertions.assertEquals;
38+
import static org.junit.jupiter.api.Assertions.assertNotNull;
39+
import static org.junit.jupiter.api.Assertions.assertTrue;
40+
41+
/**
42+
* Tests {@link StorageContentValidationDecoderPolicy} together with {@link StructuredMessageEncoder} /
43+
* wire-format payloads so the reactive decode path matches what the blob download pipeline uses.
44+
*/
45+
public class StorageContentValidationDecoderPolicyTests {
46+
47+
/**
48+
* End-to-end through the policy: encoded body uses multi-megabyte segment payload lengths (not the default
49+
* 4 MiB framing only); decoded flux must match the original bytes.
50+
*/
51+
@ParameterizedTest
52+
@MethodSource("segmentPayloadSizeAndTotalPayloadSizeSupplier")
53+
public void decodesDynamicallySizedSegmentStructuredMessageThroughPipeline(int segmentPayloadSize,
54+
int totalPayloadSize) throws IOException {
55+
byte[] originalData = new byte[totalPayloadSize];
56+
ThreadLocalRandom.current().nextBytes(originalData);
57+
58+
byte[] encodedBytes
59+
= encodeStructuredMessageWireBytes(originalData, segmentPayloadSize, StructuredMessageFlags.STORAGE_CRC64);
60+
61+
AtomicReference<HttpRequest> requestAfterPolicies = new AtomicReference<>();
62+
HttpClient httpClient = request -> {
63+
requestAfterPolicies.set(request);
64+
HttpHeaders headers = structuredDownloadResponseHeaders(encodedBytes.length, totalPayloadSize);
65+
return Mono.just(new MockHttpResponse(request, 200, headers, encodedBytes));
66+
};
67+
68+
HttpPipeline pipeline = new HttpPipelineBuilder().policies((context, next) -> {
69+
context.setData(StructuredMessageConstants.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true);
70+
return next.process();
71+
}, new StorageContentValidationDecoderPolicy()).httpClient(httpClient).build();
72+
73+
HttpRequest request = new HttpRequest(HttpMethod.GET, "https://example.blob.core.windows.net/c/b");
74+
try (HttpResponse response = pipeline.send(request, Context.NONE).block()) {
75+
assertNotNull(response);
76+
assertTrue(response instanceof DecodedResponse);
77+
byte[] decoded = Objects.requireNonNull(response.getBodyAsByteArray().block());
78+
assertArrayEquals(originalData, decoded);
79+
}
80+
81+
HttpRequest sent = requestAfterPolicies.get();
82+
assertNotNull(sent);
83+
assertEquals(StructuredMessageConstants.STRUCTURED_BODY_TYPE_VALUE,
84+
sent.getHeaders().getValue(Constants.HeaderConstants.STRUCTURED_BODY_TYPE_HEADER_NAME));
85+
}
86+
87+
@Test
88+
public void contentLengthIsOverriddenToDecodedSizeWhenDecodingApplied() throws IOException {
89+
byte[] payload = new byte[64];
90+
ThreadLocalRandom.current().nextBytes(payload);
91+
92+
byte[] encoded = encodeStructuredMessageWireBytes(payload, 64, StructuredMessageFlags.STORAGE_CRC64);
93+
long encodedLen = encoded.length;
94+
long decodedLen = payload.length;
95+
96+
HttpClient httpClient = request -> Mono.just(
97+
new MockHttpResponse(request, 200, structuredDownloadResponseHeaders(encodedLen, decodedLen), encoded));
98+
99+
HttpPipeline pipeline = new HttpPipelineBuilder().policies((context, next) -> {
100+
context.setData(StructuredMessageConstants.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true);
101+
return next.process();
102+
}, new StorageContentValidationDecoderPolicy()).httpClient(httpClient).build();
103+
104+
HttpRequest request = new HttpRequest(HttpMethod.GET, "https://example.blob.core.windows.net/c/b");
105+
HttpResponse response = pipeline.send(request, Context.NONE).block();
106+
107+
assertNotNull(response);
108+
assertEquals(String.valueOf(decodedLen), response.getHeaders().getValue(HttpHeaderName.CONTENT_LENGTH));
109+
}
110+
111+
@Test
112+
public void contentLengthMatchesActualDecodedBodySize() throws IOException {
113+
byte[] payload = new byte[128];
114+
ThreadLocalRandom.current().nextBytes(payload);
115+
116+
byte[] encoded = encodeStructuredMessageWireBytes(payload, 64, StructuredMessageFlags.STORAGE_CRC64);
117+
long encodedLen = encoded.length;
118+
long decodedLen = payload.length;
119+
120+
HttpClient httpClient = request -> Mono.just(
121+
new MockHttpResponse(request, 200, structuredDownloadResponseHeaders(encodedLen, decodedLen), encoded));
122+
123+
HttpPipeline pipeline = new HttpPipelineBuilder().policies((context, next) -> {
124+
context.setData(StructuredMessageConstants.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true);
125+
return next.process();
126+
}, new StorageContentValidationDecoderPolicy()).httpClient(httpClient).build();
127+
128+
HttpRequest request = new HttpRequest(HttpMethod.GET, "https://example.blob.core.windows.net/c/b");
129+
try (HttpResponse response = pipeline.send(request, Context.NONE).block()) {
130+
assertNotNull(response);
131+
assertEquals(String.valueOf(decodedLen), response.getHeaders().getValue(HttpHeaderName.CONTENT_LENGTH));
132+
133+
byte[] body = Objects.requireNonNull(FluxUtil.collectBytesInByteBufferStream(response.getBody()).block());
134+
assertEquals(decodedLen, body.length);
135+
assertArrayEquals(payload, body);
136+
}
137+
}
138+
139+
@Test
140+
public void contentLengthIsUnchangedWhenDecodingNotApplied() throws IOException {
141+
byte[] payload = new byte[64];
142+
ThreadLocalRandom.current().nextBytes(payload);
143+
144+
byte[] encoded = encodeStructuredMessageWireBytes(payload, 64, StructuredMessageFlags.STORAGE_CRC64);
145+
long encodedLen = encoded.length;
146+
147+
HttpHeaders responseHeaders = new HttpHeaders().set(HttpHeaderName.CONTENT_LENGTH, String.valueOf(encodedLen));
148+
HttpClient httpClient = request -> Mono.just(new MockHttpResponse(request, 200, responseHeaders, encoded));
149+
150+
HttpPipeline pipeline = new HttpPipelineBuilder()
151+
.policies(new StorageContentValidationDecoderPolicy())
152+
.httpClient(httpClient)
153+
.build();
154+
155+
HttpRequest request = new HttpRequest(HttpMethod.GET, "https://example.blob.core.windows.net/c/b");
156+
HttpResponse response = pipeline.send(request, Context.NONE).block();
157+
158+
assertNotNull(response);
159+
assertEquals(String.valueOf(encodedLen), response.getHeaders().getValue(HttpHeaderName.CONTENT_LENGTH));
160+
}
161+
162+
private static Stream<Arguments> segmentPayloadSizeAndTotalPayloadSizeSupplier() {
163+
return Stream.of(Arguments.of(10 * 1024 * 1024, 10 * 1024 * 1024 + 1), // larger than 4 MiB
164+
Arguments.of(3 * 1024 * 1024, 3 * 1024 * 1024 + 1), // smaller than 4 MiB, but not KB
165+
Arguments.of(5 * 1024 * 1024 + 1, 15 * 1024 * 1024));
166+
}
167+
168+
private static HttpHeaders structuredDownloadResponseHeaders(long contentLength, long structuredContentLength) {
169+
HttpHeaders headers = new HttpHeaders();
170+
headers.set(HttpHeaderName.CONTENT_LENGTH, String.valueOf(contentLength));
171+
headers.set(Constants.HeaderConstants.STRUCTURED_BODY_TYPE_HEADER_NAME,
172+
StructuredMessageConstants.STRUCTURED_BODY_TYPE_VALUE);
173+
headers.set(Constants.HeaderConstants.STRUCTURED_CONTENT_LENGTH_HEADER_NAME,
174+
String.valueOf(structuredContentLength));
175+
return headers;
176+
}
177+
178+
private static byte[] encodeStructuredMessageWireBytes(byte[] originalData, int segmentLength,
179+
StructuredMessageFlags flags) throws IOException {
180+
StructuredMessageEncoder encoder = new StructuredMessageEncoder(originalData.length, segmentLength, flags);
181+
Flux<ByteBuffer> flux = encoder.encode(ByteBuffer.wrap(originalData));
182+
return Objects.requireNonNull(FluxUtil.collectBytesInByteBufferStream(flux).block());
183+
}
184+
}

0 commit comments

Comments
 (0)