Skip to content

Commit 2cb1714

Browse files
Cap ServiceBusMessageBatch size at 1 MB to match broker enforcement
The Service Bus broker enforces a 1 MB batch size limit regardless of the max-message-size advertised on the AMQP link. Premium partitioned namespaces advertise 100 MB on the link, causing tryAddMessage() to accept batches the broker will reject. Cap the three batch-sending paths in ServiceBusSenderAsyncClient at 1 MB (MAX_BATCH_SIZE_BYTES): createMessageBatch, sendFluxInternal, and AmqpMessageCollector. The single-message scheduleMessageInternal path is not capped since the 1 MB limit is batch-specific and individual messages on Premium can validly exceed 1 MB. When a user requests a batch size exceeding 1 MB via CreateMessageBatchOptions, throw ServiceBusException. Tracking: azure-service-bus#708 ICM: 51000000793879
1 parent 66f5537 commit 2cb1714

2 files changed

Lines changed: 116 additions & 7 deletions

File tree

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,14 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable {
222222
* The default maximum allowable size, in bytes, for a batch to be sent.
223223
*/
224224
static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024;
225+
// Temporary workaround: Service Bus enforces a maximum batch payload size of 1 MB that is not
226+
// communicated via the AMQP link's max-message-size property. The link reports the per-message
227+
// limit (up to 100 MB for Premium partitioned), but the broker rejects batch sends above 1 MB.
228+
// This cap is applied only to batch-sending code paths (createMessageBatch, sendFlux,
229+
// AmqpMessageCollector). Single-message paths (e.g. scheduleMessage) use the link-reported
230+
// per-message limit, which is valid for individual messages on Premium namespaces.
231+
// Tracked by: https://github.com/Azure/azure-service-bus/issues/708
232+
static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024;
225233
private static final String TRANSACTION_LINK_NAME = "coordinator";
226234
private static final ServiceBusMessage END = new ServiceBusMessage(new byte[0]);
227235
private static final CreateMessageBatchOptions DEFAULT_BATCH_OPTIONS = new CreateMessageBatchOptions();
@@ -463,15 +471,15 @@ public Mono<ServiceBusMessageBatch> createMessageBatch(CreateMessageBatchOptions
463471
final int maxSize = options.getMaximumSizeInBytes();
464472

465473
return getSendLinkWithRetry("create-batch").flatMap(link -> link.getLinkSize().flatMap(size -> {
466-
final int maximumLinkSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES;
474+
final int maximumLinkSize = Math.min(size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES, MAX_BATCH_SIZE_BYTES);
467475
if (maxSize > maximumLinkSize) {
468476
return monoError(logger,
469477
new IllegalArgumentException(String.format(Locale.US,
470-
"CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the link size"
471-
+ " (%s bytes).",
478+
"CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the maximum"
479+
+ " allowed size (%s bytes).",
472480
maxSize, maximumLinkSize)));
473481
}
474-
final int batchSize = maxSize > 0 ? maxSize : maximumLinkSize;
482+
final int batchSize = maxSize > 0 ? Math.min(maxSize, maximumLinkSize) : maximumLinkSize;
475483
return Mono
476484
.just(new ServiceBusMessageBatch(isV2, batchSize, link::getErrorContext, tracer, messageSerializer));
477485
})).onErrorMap(this::mapError);
@@ -885,7 +893,7 @@ private Mono<Void> sendFluxInternal(Flux<ServiceBusMessage> messages,
885893

886894
final Mono<List<ServiceBusMessageBatch>> batchList
887895
= getSendLinkWithRetry("send-batches").flatMap(link -> link.getLinkSize().flatMap(size -> {
888-
final int batchSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES;
896+
final int batchSize = Math.min(size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES, MAX_BATCH_SIZE_BYTES);
889897
final CreateMessageBatchOptions batchOptions
890898
= new CreateMessageBatchOptions().setMaximumSizeInBytes(batchSize);
891899
return messages.collect(
@@ -955,8 +963,9 @@ private static class AmqpMessageCollector
955963
AmqpMessageCollector(boolean isV2, CreateMessageBatchOptions options, Integer maxNumberOfBatches,
956964
ErrorContextProvider contextProvider, ServiceBusTracer tracer, MessageSerializer serializer) {
957965
this.maxNumberOfBatches = maxNumberOfBatches;
958-
this.maxMessageSize
959-
= options.getMaximumSizeInBytes() > 0 ? options.getMaximumSizeInBytes() : MAX_MESSAGE_LENGTH_BYTES;
966+
this.maxMessageSize = Math.min(
967+
options.getMaximumSizeInBytes() > 0 ? options.getMaximumSizeInBytes() : MAX_MESSAGE_LENGTH_BYTES,
968+
MAX_BATCH_SIZE_BYTES);
960969
this.contextProvider = contextProvider;
961970
this.tracer = tracer;
962971
this.serializer = serializer;

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
7676
import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY;
7777
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
78+
import static com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.MAX_BATCH_SIZE_BYTES;
7879
import static com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.MAX_MESSAGE_LENGTH_BYTES;
7980
import static org.junit.jupiter.api.Assertions.assertEquals;
8081
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -302,6 +303,105 @@ void createsMessageBatchWithSize(boolean isV2) {
302303
}).expectComplete().verify(DEFAULT_TIMEOUT);
303304
}
304305

306+
/**
307+
* Verifies that the batch max size is capped at MAX_BATCH_SIZE_BYTES (1 MB) when the link reports a larger size.
308+
* This simulates a Premium partitioned namespace where the link advertises up to 100 MB per-message.
309+
*/
310+
@ParameterizedTest
311+
@MethodSource("selectStack")
312+
void createBatchCappedAtMaxBatchSizeWhenLinkReportsLargerSize(boolean isV2) {
313+
// Arrange
314+
arrangeIfV2(isV2);
315+
int largeLinkSize = 100 * 1024 * 1024; // 100 MB
316+
317+
final AmqpSendLink link = mock(AmqpSendLink.class);
318+
when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize));
319+
320+
when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(),
321+
eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link));
322+
323+
// Act & Assert
324+
StepVerifier.create(sender.createMessageBatch()).assertNext(batch -> {
325+
Assertions.assertEquals(MAX_BATCH_SIZE_BYTES, batch.getMaxSizeInBytes());
326+
}).expectComplete().verify(DEFAULT_TIMEOUT);
327+
}
328+
329+
/**
330+
* Verifies that the batch max size uses the link size when it is smaller than MAX_BATCH_SIZE_BYTES (1 MB).
331+
* This simulates a Standard namespace where the link advertises 256 KB.
332+
*/
333+
@ParameterizedTest
334+
@MethodSource("selectStack")
335+
void createBatchUsesLinkSizeWhenSmallerThanMaxBatchSize(boolean isV2) {
336+
// Arrange
337+
arrangeIfV2(isV2);
338+
int smallLinkSize = 256 * 1024; // 256 KB
339+
340+
final AmqpSendLink link = mock(AmqpSendLink.class);
341+
when(link.getLinkSize()).thenReturn(Mono.just(smallLinkSize));
342+
343+
when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(),
344+
eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link));
345+
346+
// Act & Assert
347+
StepVerifier.create(sender.createMessageBatch()).assertNext(batch -> {
348+
Assertions.assertEquals(smallLinkSize, batch.getMaxSizeInBytes());
349+
}).expectComplete().verify(DEFAULT_TIMEOUT);
350+
}
351+
352+
/**
353+
* Verifies that user-specified maxSize exceeding the effective 1 MB cap throws an error.
354+
*/
355+
@ParameterizedTest
356+
@MethodSource("selectStack")
357+
void createBatchWithOptionsExceedingMaxBatchSizeCapThrowsError(boolean isV2) {
358+
// Arrange
359+
arrangeIfV2(isV2);
360+
int largeLinkSize = 100 * 1024 * 1024; // 100 MB
361+
int requestedBatchSize = 2 * 1024 * 1024; // 2 MB - exceeds 1 MB cap
362+
363+
final AmqpSendLink link = mock(AmqpSendLink.class);
364+
when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize));
365+
366+
when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(),
367+
eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link));
368+
369+
final CreateMessageBatchOptions options
370+
= new CreateMessageBatchOptions().setMaximumSizeInBytes(requestedBatchSize);
371+
372+
// Act & Assert
373+
// The IllegalArgumentException from createMessageBatch is wrapped by mapError into ServiceBusException.
374+
StepVerifier.create(sender.createMessageBatch(options))
375+
.expectError(ServiceBusException.class)
376+
.verify(DEFAULT_TIMEOUT);
377+
}
378+
379+
/**
380+
* Verifies that user-specified maxSize smaller than the 1 MB cap is respected.
381+
*/
382+
@ParameterizedTest
383+
@MethodSource("selectStack")
384+
void createBatchWithOptionsSmallerThanMaxBatchSizeCapIsRespected(boolean isV2) {
385+
// Arrange
386+
arrangeIfV2(isV2);
387+
int largeLinkSize = 100 * 1024 * 1024; // 100 MB
388+
int requestedBatchSize = 500 * 1024; // 500 KB
389+
390+
final AmqpSendLink link = mock(AmqpSendLink.class);
391+
when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize));
392+
393+
when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(),
394+
eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link));
395+
396+
final CreateMessageBatchOptions options
397+
= new CreateMessageBatchOptions().setMaximumSizeInBytes(requestedBatchSize);
398+
399+
// Act & Assert
400+
StepVerifier.create(sender.createMessageBatch(options)).assertNext(batch -> {
401+
Assertions.assertEquals(requestedBatchSize, batch.getMaxSizeInBytes());
402+
}).expectComplete().verify(DEFAULT_TIMEOUT);
403+
}
404+
305405
@ParameterizedTest
306406
@MethodSource("selectStack")
307407
void scheduleMessageSizeTooBig(boolean isV2) {

0 commit comments

Comments
 (0)