Skip to content

Commit 40a0fac

Browse files
Cap ServiceBusMessageBatch size at 1 MB to match broker enforcement
The Service Bus broker enforces a 1 MB batch size limit regardless of the max-message-size advertised on the AMQP link. Premium partitioned namespaces advertise 100 MB on the link, causing tryAddMessage() to accept batches the broker will reject. Cap all four batch creation paths in ServiceBusSenderAsyncClient at 1 MB (MAX_BATCH_SIZE_BYTES). When the link reports a smaller size, use the link size. When a user requests a batch size exceeding 1 MB via CreateMessageBatchOptions, throw ServiceBusException. Tracking: azure-service-bus#708 ICM: 51000000793879
1 parent 66f5537 commit 40a0fac

2 files changed

Lines changed: 112 additions & 6 deletions

File tree

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable {
222222
* The default maximum allowable size, in bytes, for a batch to be sent.
223223
*/
224224
static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024;
225+
// Temporary workaround: Service Bus enforces a maximum batch size of 1 MB that is not
226+
// communicated via the AMQP link's max-message-size property. The link reports the per-message
227+
// limit (up to 100 MB for Premium partitioned), but batches are rejected above 1 MB.
228+
// Tracked by: https://github.com/Azure/azure-service-bus/issues/708
229+
static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024;
225230
private static final String TRANSACTION_LINK_NAME = "coordinator";
226231
private static final ServiceBusMessage END = new ServiceBusMessage(new byte[0]);
227232
private static final CreateMessageBatchOptions DEFAULT_BATCH_OPTIONS = new CreateMessageBatchOptions();
@@ -463,15 +468,15 @@ public Mono<ServiceBusMessageBatch> createMessageBatch(CreateMessageBatchOptions
463468
final int maxSize = options.getMaximumSizeInBytes();
464469

465470
return getSendLinkWithRetry("create-batch").flatMap(link -> link.getLinkSize().flatMap(size -> {
466-
final int maximumLinkSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES;
471+
final int maximumLinkSize = Math.min(size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES, MAX_BATCH_SIZE_BYTES);
467472
if (maxSize > maximumLinkSize) {
468473
return monoError(logger,
469474
new IllegalArgumentException(String.format(Locale.US,
470475
"CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the link size"
471476
+ " (%s bytes).",
472477
maxSize, maximumLinkSize)));
473478
}
474-
final int batchSize = maxSize > 0 ? maxSize : maximumLinkSize;
479+
final int batchSize = maxSize > 0 ? Math.min(maxSize, maximumLinkSize) : maximumLinkSize;
475480
return Mono
476481
.just(new ServiceBusMessageBatch(isV2, batchSize, link::getErrorContext, tracer, messageSerializer));
477482
})).onErrorMap(this::mapError);
@@ -812,7 +817,7 @@ private Mono<Long> scheduleMessageInternal(ServiceBusMessage message, OffsetDate
812817

813818
return tracer.traceScheduleMono("ServiceBus.scheduleMessage",
814819
getSendLinkWithRetry("schedule-message").flatMap(link -> link.getLinkSize().flatMap(size -> {
815-
final int maxSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES;
820+
final int maxSize = Math.min(size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES, MAX_BATCH_SIZE_BYTES);
816821
return connectionProcessor.flatMap(connection -> connection.getManagementNode(entityName, entityType))
817822
.flatMap(
818823
managementNode -> managementNode
@@ -885,7 +890,7 @@ private Mono<Void> sendFluxInternal(Flux<ServiceBusMessage> messages,
885890

886891
final Mono<List<ServiceBusMessageBatch>> batchList
887892
= getSendLinkWithRetry("send-batches").flatMap(link -> link.getLinkSize().flatMap(size -> {
888-
final int batchSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES;
893+
final int batchSize = Math.min(size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES, MAX_BATCH_SIZE_BYTES);
889894
final CreateMessageBatchOptions batchOptions
890895
= new CreateMessageBatchOptions().setMaximumSizeInBytes(batchSize);
891896
return messages.collect(
@@ -955,8 +960,9 @@ private static class AmqpMessageCollector
955960
AmqpMessageCollector(boolean isV2, CreateMessageBatchOptions options, Integer maxNumberOfBatches,
956961
ErrorContextProvider contextProvider, ServiceBusTracer tracer, MessageSerializer serializer) {
957962
this.maxNumberOfBatches = maxNumberOfBatches;
958-
this.maxMessageSize
959-
= options.getMaximumSizeInBytes() > 0 ? options.getMaximumSizeInBytes() : MAX_MESSAGE_LENGTH_BYTES;
963+
this.maxMessageSize = Math.min(
964+
options.getMaximumSizeInBytes() > 0 ? options.getMaximumSizeInBytes() : MAX_MESSAGE_LENGTH_BYTES,
965+
MAX_BATCH_SIZE_BYTES);
960966
this.contextProvider = contextProvider;
961967
this.tracer = tracer;
962968
this.serializer = serializer;

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
7676
import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY;
7777
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
78+
import static com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.MAX_BATCH_SIZE_BYTES;
7879
import static com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.MAX_MESSAGE_LENGTH_BYTES;
7980
import static org.junit.jupiter.api.Assertions.assertEquals;
8081
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -302,6 +303,105 @@ void createsMessageBatchWithSize(boolean isV2) {
302303
}).expectComplete().verify(DEFAULT_TIMEOUT);
303304
}
304305

306+
/**
307+
* Verifies that the batch max size is capped at MAX_BATCH_SIZE_BYTES (1 MB) when the link reports a larger size.
308+
* This simulates a Premium partitioned namespace where the link advertises up to 100 MB per-message.
309+
*/
310+
@ParameterizedTest
311+
@MethodSource("selectStack")
312+
void createBatchCappedAtMaxBatchSizeWhenLinkReportsLargerSize(boolean isV2) {
313+
// Arrange
314+
arrangeIfV2(isV2);
315+
int largeLinkSize = 100 * 1024 * 1024; // 100 MB
316+
317+
final AmqpSendLink link = mock(AmqpSendLink.class);
318+
when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize));
319+
320+
when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(),
321+
eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link));
322+
323+
// Act & Assert
324+
StepVerifier.create(sender.createMessageBatch()).assertNext(batch -> {
325+
Assertions.assertEquals(MAX_BATCH_SIZE_BYTES, batch.getMaxSizeInBytes());
326+
}).expectComplete().verify(DEFAULT_TIMEOUT);
327+
}
328+
329+
/**
330+
* Verifies that the batch max size uses the link size when it is smaller than MAX_BATCH_SIZE_BYTES (1 MB).
331+
* This simulates a Standard namespace where the link advertises 256 KB.
332+
*/
333+
@ParameterizedTest
334+
@MethodSource("selectStack")
335+
void createBatchUsesLinkSizeWhenSmallerThanMaxBatchSize(boolean isV2) {
336+
// Arrange
337+
arrangeIfV2(isV2);
338+
int smallLinkSize = 256 * 1024; // 256 KB
339+
340+
final AmqpSendLink link = mock(AmqpSendLink.class);
341+
when(link.getLinkSize()).thenReturn(Mono.just(smallLinkSize));
342+
343+
when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(),
344+
eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link));
345+
346+
// Act & Assert
347+
StepVerifier.create(sender.createMessageBatch()).assertNext(batch -> {
348+
Assertions.assertEquals(smallLinkSize, batch.getMaxSizeInBytes());
349+
}).expectComplete().verify(DEFAULT_TIMEOUT);
350+
}
351+
352+
/**
353+
* Verifies that user-specified maxSize exceeding the effective 1 MB cap throws an error.
354+
*/
355+
@ParameterizedTest
356+
@MethodSource("selectStack")
357+
void createBatchWithOptionsExceedingMaxBatchSizeCapThrowsError(boolean isV2) {
358+
// Arrange
359+
arrangeIfV2(isV2);
360+
int largeLinkSize = 100 * 1024 * 1024; // 100 MB
361+
int requestedBatchSize = 2 * 1024 * 1024; // 2 MB - exceeds 1 MB cap
362+
363+
final AmqpSendLink link = mock(AmqpSendLink.class);
364+
when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize));
365+
366+
when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(),
367+
eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link));
368+
369+
final CreateMessageBatchOptions options
370+
= new CreateMessageBatchOptions().setMaximumSizeInBytes(requestedBatchSize);
371+
372+
// Act & Assert
373+
// The IllegalArgumentException from createMessageBatch is wrapped by mapError into ServiceBusException.
374+
StepVerifier.create(sender.createMessageBatch(options))
375+
.expectError(ServiceBusException.class)
376+
.verify(DEFAULT_TIMEOUT);
377+
}
378+
379+
/**
380+
* Verifies that user-specified maxSize smaller than the 1 MB cap is respected.
381+
*/
382+
@ParameterizedTest
383+
@MethodSource("selectStack")
384+
void createBatchWithOptionsSmallerThanMaxBatchSizeCapIsRespected(boolean isV2) {
385+
// Arrange
386+
arrangeIfV2(isV2);
387+
int largeLinkSize = 100 * 1024 * 1024; // 100 MB
388+
int requestedBatchSize = 500 * 1024; // 500 KB
389+
390+
final AmqpSendLink link = mock(AmqpSendLink.class);
391+
when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize));
392+
393+
when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(),
394+
eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link));
395+
396+
final CreateMessageBatchOptions options
397+
= new CreateMessageBatchOptions().setMaximumSizeInBytes(requestedBatchSize);
398+
399+
// Act & Assert
400+
StepVerifier.create(sender.createMessageBatch(options)).assertNext(batch -> {
401+
Assertions.assertEquals(requestedBatchSize, batch.getMaxSizeInBytes());
402+
}).expectComplete().verify(DEFAULT_TIMEOUT);
403+
}
404+
305405
@ParameterizedTest
306406
@MethodSource("selectStack")
307407
void scheduleMessageSizeTooBig(boolean isV2) {

0 commit comments

Comments
 (0)