Skip to content

Commit 7f74565

Browse files
j7nw4rJohnathan WalkerCopilot
authored
Fix Spring sub-level event-hub-name override and EventContext checkpoint offsetString propagation (#49076)
* Add regression test and fix for missing offsetString in EventContext Added EventContextTest.updateCheckpointAsyncSetsOffsetString() which verifies that offsetString is populated on the Checkpoint passed to the store. This test fails without the fix (offsetString is null) and passes with it. Fix: added setOffsetString(eventData.getOffsetString()) in EventContext.updateCheckpointAsync(), matching the pattern already used in EventBatchContext. Fixes #46752 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Add BlobCheckpointStore offset fallback and CHANGELOG entries BlobCheckpointStore.updateCheckpoint() now falls back to the deprecated Checkpoint.getOffset() (Long) when getOffsetString() is null/empty, so callers that build Checkpoint instances using the legacy long offset still have the offset persisted to blob metadata. The validation guard was broadened to accept offsetString-only checkpoints. Added regression tests covering: offset fallback, offsetString preference, both populated, offsetString-only, and sequenceNumber-only. Also added a CHANGELOG entry to azure-messaging-eventhubs for the EventContext.updateCheckpointAsync() offsetString fix from the prior commit. Refs #46752 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Fix sub-level event-hub-name override for Spring Event Hubs starter When spring.cloud.azure.eventhubs.consumer.event-hub-name or producer.event-hub-name is configured, the autoconfiguration now selects the dedicated client builder path instead of reusing the parent EventHubClientBuilder, ensuring the override actually takes effect. Fixes #43593 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Move CHANGELOG entry to sdk/spring/CHANGELOG.md per review feedback Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Address Copilot review comments: clarify JavaDoc and test comment - BlobCheckpointStore.updateCheckpoint JavaDoc now documents that offsetString is a supported value and is preferred over the deprecated offset when both are populated. - BlobCheckpointStoreTests.testUpdateCheckpointOffsetStringOnlyIsValid comment clarifies this is a behavior change (previously offsetString-only checkpoints were rejected by the validation guard). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * docs(eventhubs-checkpointstore-blob): correct updateCheckpoint @return tag The updateCheckpoint override returns Mono<Void>; the @return tag still claimed it produced "The new ETag on successful update", which has never matched the contract on this method or the parent CheckpointStore interface. Match the interface wording instead. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --------- Co-authored-by: Johnathan Walker <johwalker@microsoft.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 91a3cc7 commit 7f74565

11 files changed

Lines changed: 278 additions & 32 deletions

File tree

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@
88

99
### Bugs Fixed
1010

11+
- Fixed `BlobCheckpointStore.updateCheckpoint` so that it falls back to the deprecated
12+
`Checkpoint.getOffset()` (Long) value when `Checkpoint.getOffsetString()` is not populated. This restores writing of
13+
the `offset` blob metadata for callers that still build `Checkpoint` instances using the legacy long offset, fixing
14+
a regression introduced in 1.21.0 that broke partition switch-over and consumer-group lag monitoring.
15+
([#46752](https://github.com/Azure/azure-sdk-for-java/issues/46752))
16+
1117
### Other Changes
1218

1319
## 1.21.6 (2026-05-05)

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -238,14 +238,22 @@ private Mono<PartitionOwnership> updateOwnershipETag(Response<?> response, Parti
238238
/**
239239
* Updates the checkpoint in Storage Blobs for a partition.
240240
*
241-
* @param checkpoint Checkpoint information containing sequence number and offset to be stored for this partition.
242-
* @return The new ETag on successful update.
241+
* <p>At least one of {@code sequenceNumber}, {@code offsetString}, or the deprecated {@code offset} must be
242+
* populated on the supplied {@link Checkpoint}. When both {@code offsetString} and the deprecated {@code offset}
243+
* are populated, {@code offsetString} is preferred and persisted as the offset metadata value.</p>
244+
*
245+
* @param checkpoint Checkpoint information containing the sequence number and/or offset (as {@code offsetString}
246+
* or the deprecated {@code offset}) to be stored for this partition.
247+
* @return A {@link Mono} that completes when the checkpoint metadata has been persisted.
243248
*/
244249
@Override
245250
public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
246-
if (checkpoint == null || (checkpoint.getSequenceNumber() == null && checkpoint.getOffset() == null)) {
251+
if (checkpoint == null
252+
|| (checkpoint.getSequenceNumber() == null
253+
&& checkpoint.getOffset() == null
254+
&& CoreUtils.isNullOrEmpty(checkpoint.getOffsetString()))) {
247255
throw LOGGER.logExceptionAsWarning(Exceptions.propagate(new IllegalStateException(
248-
"Both sequence number and offset cannot be null when updating a checkpoint")));
256+
"At least one of sequence number, offset, or offsetString must be provided when updating a checkpoint")));
249257
}
250258

251259
String partitionId = checkpoint.getPartitionId();
@@ -259,8 +267,14 @@ public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
259267
String sequenceNumber
260268
= checkpoint.getSequenceNumber() == null ? null : String.valueOf(checkpoint.getSequenceNumber());
261269

270+
// Prefer offsetString when populated; otherwise fall back to the deprecated offset (Long) so callers that
271+
// still build Checkpoint instances using setOffset(Long) continue to persist the offset metadata.
272+
String offset = CoreUtils.isNullOrEmpty(checkpoint.getOffsetString())
273+
? Objects.toString(checkpoint.getOffset(), null)
274+
: checkpoint.getOffsetString();
275+
262276
metadata.put(SEQUENCE_NUMBER, sequenceNumber);
263-
metadata.put(OFFSET, checkpoint.getOffsetString());
277+
metadata.put(OFFSET, offset);
264278
BlobAsyncClient blobAsyncClient = blobClients.get(blobName);
265279

266280
return blobAsyncClient.exists().flatMap(exists -> {

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreTests.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.junit.jupiter.api.Test;
2525
import org.junit.jupiter.api.condition.DisabledOnJre;
2626
import org.junit.jupiter.api.condition.JRE;
27+
import org.mockito.ArgumentCaptor;
2728
import org.mockito.ArgumentMatchers;
2829
import org.mockito.Mock;
2930
import org.mockito.Mockito;
@@ -46,6 +47,7 @@
4647
import static com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore.OWNER_ID;
4748
import static com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore.SEQUENCE_NUMBER;
4849
import static org.junit.jupiter.api.Assertions.assertEquals;
50+
import static org.junit.jupiter.api.Assertions.assertNull;
4951
import static org.junit.jupiter.api.Assertions.assertThrows;
5052
import static org.mockito.ArgumentMatchers.any;
5153
import static org.mockito.ArgumentMatchers.anyMap;
@@ -261,6 +263,109 @@ public void testUpdateCheckpointInvalid() {
261263
assertThrows(IllegalStateException.class, () -> blobCheckpointStore.updateCheckpoint(new Checkpoint()));
262264
}
263265

266+
/**
267+
* Tests that {@link BlobCheckpointStore#updateCheckpoint(Checkpoint)} falls back to the deprecated
268+
* {@link Checkpoint#getOffset()} value when {@link Checkpoint#getOffsetString()} is not provided. Reproduces the
269+
* regression reported in https://github.com/Azure/azure-sdk-for-java/issues/46752.
270+
*/
271+
@Test
272+
public void testUpdateCheckpointFallsBackToOffsetWhenOffsetStringMissing() {
273+
Map<String, String> captured = captureUpdateCheckpointMetadata(new Checkpoint().setFullyQualifiedNamespace("ns")
274+
.setEventHubName("eh")
275+
.setConsumerGroup("cg")
276+
.setPartitionId("0")
277+
.setSequenceNumber(2L)
278+
.setOffset(100L));
279+
280+
assertEquals("2", captured.get(SEQUENCE_NUMBER));
281+
assertEquals("100", captured.get(OFFSET));
282+
}
283+
284+
/**
285+
* Tests that {@link BlobCheckpointStore#updateCheckpoint(Checkpoint)} writes the {@code offsetString} value into
286+
* blob metadata when only {@link Checkpoint#setOffsetString(String)} has been populated.
287+
*/
288+
@Test
289+
public void testUpdateCheckpointUsesOffsetStringWhenProvided() {
290+
Map<String, String> captured = captureUpdateCheckpointMetadata(new Checkpoint().setFullyQualifiedNamespace("ns")
291+
.setEventHubName("eh")
292+
.setConsumerGroup("cg")
293+
.setPartitionId("0")
294+
.setSequenceNumber(2L)
295+
.setOffsetString("offset-string-value"));
296+
297+
assertEquals("2", captured.get(SEQUENCE_NUMBER));
298+
assertEquals("offset-string-value", captured.get(OFFSET));
299+
}
300+
301+
/**
302+
* Tests that when both {@code offset} and {@code offsetString} are populated, {@code offsetString} is preferred.
303+
*/
304+
@Test
305+
public void testUpdateCheckpointPrefersOffsetStringOverOffset() {
306+
Map<String, String> captured = captureUpdateCheckpointMetadata(new Checkpoint().setFullyQualifiedNamespace("ns")
307+
.setEventHubName("eh")
308+
.setConsumerGroup("cg")
309+
.setPartitionId("0")
310+
.setSequenceNumber(2L)
311+
.setOffset(100L)
312+
.setOffsetString("offset-string-value"));
313+
314+
assertEquals("offset-string-value", captured.get(OFFSET));
315+
}
316+
317+
/**
318+
* Tests that an {@code offsetString}-only checkpoint is accepted by the validation guard. This is a behavior
319+
* change from the prior implementation, which only inspected {@code sequenceNumber} and the deprecated
320+
* {@code offset} (Long) and would have rejected a checkpoint that supplied only {@code offsetString}.
321+
*/
322+
@Test
323+
public void testUpdateCheckpointOffsetStringOnlyIsValid() {
324+
Map<String, String> captured = captureUpdateCheckpointMetadata(new Checkpoint().setFullyQualifiedNamespace("ns")
325+
.setEventHubName("eh")
326+
.setConsumerGroup("cg")
327+
.setPartitionId("0")
328+
.setOffsetString("offset-string-value"));
329+
330+
assertNull(captured.get(SEQUENCE_NUMBER));
331+
assertEquals("offset-string-value", captured.get(OFFSET));
332+
}
333+
334+
/**
335+
* Tests that a checkpoint with only {@code sequenceNumber} populated still succeeds and writes a {@code null}
336+
* offset metadata value, preserving prior behavior.
337+
*/
338+
@Test
339+
public void testUpdateCheckpointSequenceNumberOnly() {
340+
Map<String, String> captured = captureUpdateCheckpointMetadata(new Checkpoint().setFullyQualifiedNamespace("ns")
341+
.setEventHubName("eh")
342+
.setConsumerGroup("cg")
343+
.setPartitionId("0")
344+
.setSequenceNumber(2L));
345+
346+
assertEquals("2", captured.get(SEQUENCE_NUMBER));
347+
assertNull(captured.get(OFFSET));
348+
}
349+
350+
@SuppressWarnings("unchecked")
351+
private Map<String, String> captureUpdateCheckpointMetadata(Checkpoint checkpoint) {
352+
final String legacyPrefix = getLegacyPrefix(checkpoint.getFullyQualifiedNamespace(),
353+
checkpoint.getEventHubName(), checkpoint.getConsumerGroup());
354+
final String blobName = legacyPrefix + CHECKPOINT_PATH + checkpoint.getPartitionId();
355+
356+
when(blobContainerAsyncClient.getBlobAsyncClient(blobName)).thenReturn(blobAsyncClient);
357+
when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient);
358+
when(blobAsyncClient.exists()).thenReturn(Mono.just(true));
359+
when(blobAsyncClient.setMetadata(ArgumentMatchers.<Map<String, String>>any())).thenReturn(Mono.empty());
360+
361+
BlobCheckpointStore store = new BlobCheckpointStore(blobContainerAsyncClient);
362+
StepVerifier.create(store.updateCheckpoint(checkpoint)).verifyComplete();
363+
364+
ArgumentCaptor<Map<String, String>> captor = ArgumentCaptor.forClass(Map.class);
365+
Mockito.verify(blobAsyncClient).setMetadata(captor.capture());
366+
return captor.getValue();
367+
}
368+
264369
/**
265370
* Tests that will update checkpoint if one does not exist.
266371
*/

sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@
88

99
### Bugs Fixed
1010

11+
- Fixed `EventContext.updateCheckpointAsync()` so that the `offsetString` from the received `EventData` is propagated
12+
to the `Checkpoint` passed to the `CheckpointStore`. Previously only the deprecated `offset` (Long) was set, which
13+
caused checkpoint stores that read `offsetString` (such as `BlobCheckpointStore`) to persist a `null` offset, breaking
14+
partition switch-over and consumer-group lag monitoring.
15+
([#46752](https://github.com/Azure/azure-sdk-for-java/issues/46752))
16+
1117
### Other Changes
1218

1319
## 5.21.4 (2026-05-05)

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/EventContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public Mono<Void> updateCheckpointAsync() {
9494
.setConsumerGroup(partitionContext.getConsumerGroup())
9595
.setPartitionId(partitionContext.getPartitionId())
9696
.setSequenceNumber(eventData.getSequenceNumber())
97+
.setOffsetString(eventData.getOffsetString())
9798
.setOffset(eventData.getOffset());
9899
return this.checkpointStore.updateCheckpoint(checkpoint);
99100
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.messaging.eventhubs.models;
5+
6+
import com.azure.messaging.eventhubs.CheckpointStore;
7+
import com.azure.messaging.eventhubs.EventData;
8+
import org.junit.jupiter.api.BeforeEach;
9+
import org.junit.jupiter.api.Test;
10+
import org.mockito.ArgumentCaptor;
11+
import org.mockito.Mock;
12+
import org.mockito.MockitoAnnotations;
13+
import reactor.core.publisher.Mono;
14+
import reactor.test.StepVerifier;
15+
16+
import static org.junit.jupiter.api.Assertions.assertEquals;
17+
import static org.junit.jupiter.api.Assertions.assertNotNull;
18+
import static org.mockito.ArgumentMatchers.any;
19+
import static org.mockito.Mockito.verify;
20+
import static org.mockito.Mockito.when;
21+
22+
/**
23+
* Tests for {@link EventContext}.
24+
*/
25+
class EventContextTest {
26+
private final PartitionContext partitionContext
27+
= new PartitionContext("TEST_NAMESPACE", "TEST_EVENT_HUB", "TEST_DEFAULT_GROUP", "TEST_PARTITION_ID");
28+
29+
@Mock
30+
private CheckpointStore checkpointStore;
31+
@Mock
32+
private EventData eventData;
33+
34+
@BeforeEach
35+
void beforeEach() {
36+
MockitoAnnotations.initMocks(this);
37+
}
38+
39+
/**
40+
* Verifies that updateCheckpointAsync sets offsetString on the checkpoint.
41+
* Regression test for https://github.com/Azure/azure-sdk-for-java/issues/46752
42+
* where only setOffset(Long) was called, causing BlobCheckpointStore to store
43+
* null offset because it reads getOffsetString().
44+
*/
45+
@Test
46+
void updateCheckpointAsyncSetsOffsetString() {
47+
// Arrange
48+
final Long sequenceNumber = 10L;
49+
final Long offset = 15L;
50+
final String offsetString = "15";
51+
52+
when(eventData.getSequenceNumber()).thenReturn(sequenceNumber);
53+
when(eventData.getOffset()).thenReturn(offset);
54+
when(eventData.getOffsetString()).thenReturn(offsetString);
55+
when(checkpointStore.updateCheckpoint(any(Checkpoint.class))).thenReturn(Mono.empty());
56+
57+
final EventContext context = new EventContext(partitionContext, eventData, checkpointStore, null);
58+
59+
// Act
60+
StepVerifier.create(context.updateCheckpointAsync()).verifyComplete();
61+
62+
// Assert - offsetString must be set on the checkpoint passed to the store
63+
ArgumentCaptor<Checkpoint> captor = ArgumentCaptor.forClass(Checkpoint.class);
64+
verify(checkpointStore).updateCheckpoint(captor.capture());
65+
66+
Checkpoint captured = captor.getValue();
67+
assertEquals(partitionContext.getFullyQualifiedNamespace(), captured.getFullyQualifiedNamespace());
68+
assertEquals(partitionContext.getEventHubName(), captured.getEventHubName());
69+
assertEquals(partitionContext.getConsumerGroup(), captured.getConsumerGroup());
70+
assertEquals(partitionContext.getPartitionId(), captured.getPartitionId());
71+
assertEquals(sequenceNumber, captured.getSequenceNumber());
72+
assertEquals(offset, captured.getOffset());
73+
assertNotNull(captured.getOffsetString(), "offsetString must not be null - BlobCheckpointStore depends on it");
74+
assertEquals(offsetString, captured.getOffsetString());
75+
}
76+
}

sdk/spring/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,13 @@ This section includes changes in `spring-cloud-azure-appconfiguration-config` mo
106106
This section includes changes in `azure-spring-data-cosmos` module.
107107
Please refer to [azure-spring-data-cosmos/CHANGELOG.md](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/spring/azure-spring-data-cosmos/CHANGELOG.md#720-2026-04-17) for more details.
108108

109+
### Spring Cloud Azure Autoconfigure
110+
This section includes changes in `spring-cloud-azure-autoconfigure` module.
111+
112+
#### Bugs Fixed
113+
114+
- Fixed a bug where the sub-level `event-hub-name` property under `spring.cloud.azure.eventhubs.consumer` or `spring.cloud.azure.eventhubs.producer` was ignored when the base-level `spring.cloud.azure.eventhubs.event-hub-name` was also configured, causing the produced clients to connect to the base event hub instead of the overridden one. [#43593](https://github.com/Azure/azure-sdk-for-java/issues/43593)
115+
109116
## 6.2.0 (2026-03-25)
110117
- This release is compatible with Spring Boot 3.5.0-3.5.8. (Note: 3.5.x (x>8) should be supported, but they aren't tested with this release.)
111118
- This release is compatible with Spring Cloud 2025.0.0. (Note: 2025.0.x (x>0) should be supported, but they aren't tested with this release.)

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/AzureEventHubsConsumerClientConfiguration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
@ConditionalOnProperty(prefix = "spring.cloud.azure.eventhubs.consumer", name = "consumer-group")
4343
class AzureEventHubsConsumerClientConfiguration {
4444

45-
@ConditionalOnMissingProperty(prefix = "spring.cloud.azure.eventhubs.consumer", name = { "connection-string", "namespace" })
45+
@ConditionalOnMissingProperty(prefix = "spring.cloud.azure.eventhubs.consumer", name = { "connection-string", "namespace", "event-hub-name" })
4646
@ConditionalOnBean(EventHubClientBuilder.class)
4747
@Configuration(proxyBeanMethods = false)
4848
static class SharedConsumerConnectionConfiguration {
@@ -69,7 +69,7 @@ EventHubConsumerClient eventHubConsumerClient(EventHubClientBuilder builder) {
6969
}
7070
}
7171

72-
@ConditionalOnAnyProperty(prefix = "spring.cloud.azure.eventhubs.consumer", name = { "connection-string", "namespace" })
72+
@ConditionalOnAnyProperty(prefix = "spring.cloud.azure.eventhubs.consumer", name = { "connection-string", "namespace", "event-hub-name" })
7373
@Configuration(proxyBeanMethods = false)
7474
static class DedicatedConsumerConnectionConfiguration {
7575

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/AzureEventHubsProducerClientConfiguration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
@ConditionalOnAnyProperty(prefix = "spring.cloud.azure.eventhubs", name = { "event-hub-name", "producer.event-hub-name" })
3535
class AzureEventHubsProducerClientConfiguration {
3636

37-
@ConditionalOnMissingProperty(prefix = "spring.cloud.azure.eventhubs.producer", name = { "connection-string", "namespace" })
37+
@ConditionalOnMissingProperty(prefix = "spring.cloud.azure.eventhubs.producer", name = { "connection-string", "namespace", "event-hub-name" })
3838
@ConditionalOnBean(EventHubClientBuilder.class)
3939
@Configuration(proxyBeanMethods = false)
4040
static class SharedProducerConnectionConfiguration {
@@ -51,7 +51,7 @@ EventHubProducerClient eventHubProducerClient(EventHubClientBuilder builder) {
5151
}
5252
}
5353

54-
@ConditionalOnAnyProperty(prefix = "spring.cloud.azure.eventhubs.producer", name = { "connection-string", "namespace" })
54+
@ConditionalOnAnyProperty(prefix = "spring.cloud.azure.eventhubs.producer", name = { "connection-string", "namespace", "event-hub-name" })
5555
@Configuration(proxyBeanMethods = false)
5656
static class DedicatedProducerConnectionConfiguration {
5757

0 commit comments

Comments
 (0)