Skip to content

Commit a3ae705

Browse files
authored
[fix][test] Fix flaky MessagePublishBufferThrottleTest.testBlockByPublishRateLimiting (#25365)
1 parent 543a75d commit a3ae705

1 file changed

Lines changed: 39 additions & 21 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@
2020

2121
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
2222
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
23-
import static org.testng.Assert.fail;
24-
import java.util.concurrent.CompletableFuture;
23+
import io.opentelemetry.sdk.metrics.data.MetricData;
24+
import java.util.Collection;
2525
import java.util.concurrent.TimeUnit;
26-
import java.util.concurrent.TimeoutException;
2726
import lombok.Cleanup;
2827
import org.apache.pulsar.broker.BrokerTestUtil;
2928
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
@@ -134,33 +133,32 @@ public void testBlockByPublishRateLimiting() throws Exception {
134133

135134
pulsarTestContext.getMockBookKeeper().addEntryDelay(5, TimeUnit.SECONDS);
136135

137-
// Block by publish buffer.
136+
// Block by publish buffer: 10 x 1MB messages with a 1MB buffer limit.
138137
byte[] payload = new byte[1024 * 1024];
139138
for (int i = 0; i < 10; i++) {
140139
producer.sendAsync(payload);
141140
}
142141

143-
Awaitility.await().untilAsserted(() -> assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 1));
142+
// Wait for at least one pause event to be recorded.
143+
Awaitility.await().untilAsserted(
144+
() -> assertRateLimitCounterAtLeast(ConnectionRateLimitOperationName.PAUSED, 1));
144145

145-
CompletableFuture<Void> flushFuture = producer.flushAsync();
146+
// Verify that no resume has happened yet while messages are still blocked.
147+
Awaitility.await().untilAsserted(
148+
() -> assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0));
146149

147-
// Block by publish rate.
148-
// After 1 second, the message buffer throttling will be lifted, but the rate limiting will still be in place.
149-
assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 1);
150-
assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
151-
152-
try {
153-
flushFuture.get(2, TimeUnit.SECONDS);
154-
fail("Should have timed out");
155-
} catch (TimeoutException e) {
156-
// Ok
157-
}
158-
159-
flushFuture.join();
150+
// Flush and wait for all messages to complete.
151+
producer.flush();
160152

153+
// After all messages are sent, the number of pauses and resumes should match:
154+
// every pause must eventually be followed by a resume.
161155
Awaitility.await().untilAsserted(() -> {
162-
assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 10);
163-
assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 10);
156+
var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
157+
long pausedCount = getMetricLongSumValue(metrics, ConnectionRateLimitOperationName.PAUSED);
158+
long resumedCount = getMetricLongSumValue(metrics, ConnectionRateLimitOperationName.RESUMED);
159+
Assert.assertTrue(pausedCount > 0, "Expected at least one pause event");
160+
Assert.assertEquals(pausedCount, resumedCount,
161+
"Paused and resumed counts should match after all messages are sent");
164162
});
165163
}
166164

@@ -206,4 +204,24 @@ private void assertRateLimitCounter(ConnectionRateLimitOperationName connectionR
206204
connectionRateLimitState.attributes, expectedCount);
207205
}
208206
}
207+
208+
private void assertRateLimitCounterAtLeast(ConnectionRateLimitOperationName connectionRateLimitState,
209+
int minExpectedCount) {
210+
var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
211+
assertMetricLongSumValue(metrics, BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME,
212+
connectionRateLimitState.attributes,
213+
actual -> assertThat(actual).isGreaterThanOrEqualTo(minExpectedCount));
214+
}
215+
216+
private long getMetricLongSumValue(Collection<MetricData> metrics,
217+
ConnectionRateLimitOperationName connectionRateLimitState) {
218+
var attributesMap = connectionRateLimitState.attributes.asMap();
219+
return metrics.stream()
220+
.filter(m -> m.getName().equals(BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME))
221+
.flatMap(m -> m.getLongSumData().getPoints().stream())
222+
.filter(point -> point.getAttributes().asMap().equals(attributesMap))
223+
.mapToLong(io.opentelemetry.sdk.metrics.data.LongPointData::getValue)
224+
.findFirst()
225+
.orElse(0L);
226+
}
209227
}

0 commit comments

Comments
 (0)