Skip to content

Commit c6be44c

Browse files
authored
[fix][broker] Fixes Inconsistent ServiceUnitStateData View (ExtensibleLoadManagerImpl only) (apache#24186)
1 parent 3bdc661 commit c6be44c

7 files changed

Lines changed: 91 additions & 19 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,19 +75,4 @@ public ServiceUnitStateData(ServiceUnitState state, String dstBroker, boolean fo
7575
public static ServiceUnitState state(ServiceUnitStateData data) {
7676
return data == null ? ServiceUnitState.Init : data.state();
7777
}
78-
79-
@Override
80-
public boolean equals(Object o) {
81-
if (this == o) {
82-
return true;
83-
}
84-
85-
if (o == null || getClass() != o.getClass()) {
86-
return false;
87-
}
88-
89-
ServiceUnitStateData that = (ServiceUnitStateData) o;
90-
91-
return versionId == that.versionId;
92-
}
9378
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@
2121
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigning;
2222
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
2323
import static org.testng.Assert.assertEquals;
24+
import static org.testng.Assert.assertNotEquals;
2425
import static org.testng.Assert.assertNull;
2526
import static org.assertj.core.api.Assertions.assertThat;
2627
import com.fasterxml.jackson.core.JsonProcessingException;
2728
import com.fasterxml.jackson.databind.ObjectMapper;
29+
import java.util.Map;
30+
import java.util.Optional;
2831
import org.apache.pulsar.common.util.ObjectMapperFactory;
2932
import org.testng.annotations.Test;
3033

@@ -76,4 +79,25 @@ public void jsonWriteAndReadTest() throws JsonProcessingException {
7679
ServiceUnitStateData dst = mapper.readValue(json, ServiceUnitStateData.class);
7780
assertEquals(dst, src);
7881
}
82+
83+
@Test
84+
public void equalsTest() {
85+
// record ServiceUnitStateData(
86+
// ServiceUnitState state, String dstBroker, String sourceBroker,
87+
// Map<String, Optional<String>> splitServiceUnitToDestBroker, boolean force,
88+
// long timestamp, long versionId) {
89+
var d1 = new ServiceUnitStateData(Assigning, "A", "B", Map.of("A", Optional.of("B")), true, 0, 1);
90+
var d2 = new ServiceUnitStateData(Assigning, "A", "B", Map.of("A", Optional.of("B")), true, 0, 1);
91+
assertEquals(d1, d2);
92+
var d3 = new ServiceUnitStateData(Assigning, "C", "B", 1);
93+
var d4 = new ServiceUnitStateData(Assigning, "A", "B", Map.of("A", Optional.of("C")), true, 0, 1);
94+
assertNotEquals(d1, d3);
95+
assertNotEquals(d1, d4);
96+
97+
var d5 = new ServiceUnitStateData(Assigning, "A", "B", Map.of("A", Optional.of("B")), true, 0, 2);
98+
assertNotEquals(d1, d5);
99+
100+
var d6 = new ServiceUnitStateData(Assigning, "C", "B", Map.of("A", Optional.of("B")), true, 0, 1);
101+
assertNotEquals(d1, d6);
102+
}
79103
}

pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,13 @@ public class Backoff {
4343
TimeUnit unitMandatoryStop, Clock clock) {
4444
this.initial = unitInitial.toMillis(initial);
4545
this.max = unitMax.toMillis(max);
46+
if (initial == 0 && max == 0 && mandatoryStop == 0) {
47+
this.mandatoryStopMade = true;
48+
}
4649
this.next = this.initial;
4750
this.mandatoryStop = unitMandatoryStop.toMillis(mandatoryStop);
4851
this.clock = clock;
52+
this.firstBackoffTimeInMillis = 0;
4953
}
5054

5155
public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
@@ -91,7 +95,11 @@ public void reduceToHalf() {
9195

9296
public void reset() {
9397
this.next = this.initial;
94-
this.mandatoryStopMade = false;
98+
if (initial == 0 && max == 0 && mandatoryStop == 0) {
99+
this.mandatoryStopMade = true;
100+
} else {
101+
this.mandatoryStopMade = false;
102+
}
95103
}
96104

97105
public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts,

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ public class MetadataCacheConfig<T> {
3939
.setMax(3, TimeUnit.SECONDS)
4040
.setMandatoryStop(30, TimeUnit.SECONDS);
4141

42+
public static final BackoffBuilder NO_RETRY_BACKOFF_BUILDER =
43+
new BackoffBuilder().setInitialTime(0, TimeUnit.MILLISECONDS)
44+
.setMax(0, TimeUnit.SECONDS)
45+
.setMandatoryStop(0, TimeUnit.SECONDS);
46+
4247
/**
4348
* Specifies that active entries are eligible for automatic refresh once a fixed duration has
4449
* elapsed after the entry's creation, or the most recent replacement of its value.

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,12 @@ private void execute(Supplier<CompletableFuture<T>> op, String key, CompletableF
339339
objCache.synchronous().invalidate(key);
340340
long elapsed = System.currentTimeMillis() - backoff.getFirstBackoffTimeInMillis();
341341
if (backoff.isMandatoryStopMade()) {
342-
result.completeExceptionally(new TimeoutException(
343-
String.format("Timeout to update key %s. Elapsed time: %d ms", key, elapsed)));
342+
if (backoff.getFirstBackoffTimeInMillis() == 0) {
343+
result.completeExceptionally(ex.getCause());
344+
} else {
345+
result.completeExceptionally(new TimeoutException(
346+
String.format("Timeout to update key %s. Elapsed time: %d ms", key, elapsed)));
347+
}
344348
return null;
345349
}
346350
final var next = backoff.next();

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public MetadataStoreTableViewImpl(@NonNull Class<T> clazz,
111111
MetadataCacheConfig.<T>builder()
112112
.expireAfterWriteMillis(-1)
113113
.refreshAfterWriteMillis(CACHE_REFRESH_FREQUENCY_IN_MILLIS)
114+
.retryBackoff(MetadataCacheConfig.NO_RETRY_BACKOFF_BUILDER)
114115
.asyncReloadConsumer(this::consumeAsyncReload)
115116
.build());
116117
store.registerListener(this::handleNotification);
@@ -306,7 +307,15 @@ public CompletableFuture<Void> put(String key, T value) {
306307
throw new ConflictException(
307308
String.format("Failed to update from old:%s to value:%s", old, value));
308309
}
309-
}).thenCompose(__ -> doHandleNotification(path)); // immediately notify local tableview
310+
}).thenCompose(__ -> doHandleNotification(path)) // immediately notify local tableview
311+
.exceptionally(e -> {
312+
if (e.getCause() instanceof MetadataStoreException.BadVersionException) {
313+
throw FutureUtil.wrapToCompletionException(new ConflictException(
314+
String.format("Failed to update to value:%s", value)));
315+
}
316+
317+
throw FutureUtil.wrapToCompletionException(e.getCause());
318+
});
310319
}
311320

312321
public CompletableFuture<Void> delete(String key) {

pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,4 +702,41 @@ public void testDefaultMetadataCacheConfig() {
702702
assertEquals(backoff.getMax(), 3000);
703703
assertEquals(backoff.getMandatoryStop(), 30_000);
704704
}
705+
706+
@Test
707+
public void testNoBackoffMetadataCacheConfig() {
708+
final var config = MetadataCacheConfig.builder().retryBackoff(
709+
MetadataCacheConfig.NO_RETRY_BACKOFF_BUILDER).build();
710+
711+
final var backoff = config.getRetryBackoff().create();
712+
713+
assertEquals(backoff.getInitial(), 0);
714+
assertEquals(backoff.getMax(),0);
715+
assertEquals(backoff.getMandatoryStop(), 0);
716+
assertTrue(backoff.isMandatoryStopMade());
717+
assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
718+
assertEquals(backoff.next(), 0);
719+
assertEquals(backoff.next(), 0);
720+
assertEquals(backoff.next(), 0);
721+
assertTrue(backoff.isMandatoryStopMade());
722+
assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
723+
724+
backoff.reduceToHalf();
725+
assertTrue(backoff.isMandatoryStopMade());
726+
assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
727+
assertEquals(backoff.next(), 0);
728+
assertEquals(backoff.next(), 0);
729+
assertEquals(backoff.next(), 0);
730+
assertTrue(backoff.isMandatoryStopMade());
731+
assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
732+
733+
backoff.reset();
734+
assertTrue(backoff.isMandatoryStopMade());
735+
assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
736+
assertEquals(backoff.next(), 0);
737+
assertEquals(backoff.next(), 0);
738+
assertEquals(backoff.next(), 0);
739+
assertTrue(backoff.isMandatoryStopMade());
740+
assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
741+
}
705742
}

0 commit comments

Comments
 (0)