Skip to content

Commit b9f87f3

Browse files
committed
server: Reload pending query plan requests on server startup
1 parent f98996f commit b9f87f3

3 files changed

Lines changed: 125 additions & 2 deletions

File tree

server/src/main/java/org/ebean/monitor/config/OnStart.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,32 @@
77
import jakarta.inject.Singleton;
88
import org.ebean.monitor.Application;
99
import org.ebean.monitor.cleanup.CleanupPartitions;
10+
import org.ebean.monitor.domain.DCaptureRequest;
1011
import org.ebean.monitor.domain.DJob;
12+
import org.ebean.monitor.domain.query.QDCaptureRequest;
1113
import org.ebean.monitor.ingest.PlanShapeBackfill;
14+
import org.ebean.monitor.web.MessageService;
1215
import org.slf4j.Logger;
1316
import org.slf4j.LoggerFactory;
1417

18+
import java.time.Duration;
19+
import java.time.Instant;
1520
import java.util.concurrent.TimeUnit;
1621

1722
@Singleton
1823
public class OnStart {
1924

2025
private static final Logger log = LoggerFactory.getLogger(OnStart.class);
26+
private static final long PENDING_STALE_MINUTES = 15L;
2127

2228
private final CleanupPartitions cleanupPartitions = new CleanupPartitions();
2329

2430
private final PlanShapeBackfill planShapeBackfill;
31+
private final MessageService messageService;
2532

26-
OnStart(PlanShapeBackfill planShapeBackfill) {
33+
OnStart(PlanShapeBackfill planShapeBackfill, MessageService messageService) {
2734
this.planShapeBackfill = planShapeBackfill;
35+
this.messageService = messageService;
2836
}
2937

3038
@PostConstruct
@@ -41,6 +49,32 @@ public void start() {
4149
private void initData() {
4250
GlobalMetrics.init();
4351
DJob.find.initRollup();
52+
rehydratePendingCaptures();
53+
}
54+
55+
/**
56+
* Re-push any uncollected capture requests from the DB into the in-memory queue.
57+
*/
58+
private void rehydratePendingCaptures() {
59+
final Instant from = Instant.now().minus(Duration.ofMinutes(PENDING_STALE_MINUTES));
60+
final var pending = new QDCaptureRequest()
61+
.collectedAt.isNull()
62+
.requestedAt.gt(from)
63+
.findList();
64+
int count = pushPendingCaptures(pending);
65+
if (count > 0) {
66+
log.info("rehydrated {} pending capture request(s) from DB into message queue", count);
67+
}
68+
}
69+
70+
int pushPendingCaptures(Iterable<DCaptureRequest> pending) {
71+
int count = 0;
72+
for (var r : pending) {
73+
final String env = r.env() != null ? r.env().getName() : MessageService.ANY_ENV;
74+
messageService.pushMessage(r.app().getName(), env, "qp:" + r.hash());
75+
count++;
76+
}
77+
return count;
4478
}
4579

4680
/**

server/src/main/java/org/ebean/monitor/ingest/PlanShapeBackfill.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class PlanShapeBackfill {
2727

2828
private final Database database;
2929

30-
PlanShapeBackfill(Database database) {
30+
public PlanShapeBackfill(Database database) {
3131
this.database = database;
3232
}
3333

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package org.ebean.monitor.config;
2+
3+
import org.ebean.monitor.api.MetricRequest;
4+
import org.ebean.monitor.domain.DApp;
5+
import org.ebean.monitor.domain.DCaptureRequest;
6+
import org.ebean.monitor.domain.DEnv;
7+
import org.ebean.monitor.ingest.PlanShapeBackfill;
8+
import org.ebean.monitor.web.MessageService;
9+
import org.junit.jupiter.api.Test;
10+
11+
import java.time.Instant;
12+
import java.util.List;
13+
14+
import static org.assertj.core.api.Assertions.assertThat;
15+
16+
/**
17+
* Unit tests for {@link OnStart#pushPendingCaptures}: verifies that pending
18+
* capture requests are re-pushed into {@link MessageService} correctly on
19+
* startup, covering both env-specific and any-env cases.
20+
*/
21+
class OnStartRehydrateTest {
22+
23+
/** No-op backfill stub — PlanShapeBackfill.run() is never called by pushPendingCaptures. */
24+
private static final class NoopBackfill extends PlanShapeBackfill {
25+
NoopBackfill() { super(null); }
26+
@Override public int run() { return 0; }
27+
}
28+
29+
private final MessageService messageService = new MessageService();
30+
private final OnStart onStart = new OnStart(new NoopBackfill(), messageService);
31+
32+
private static MetricRequest poll(String app, String env) {
33+
return MetricRequest.builder().appName(app).environment(env).build();
34+
}
35+
36+
private static DCaptureRequest request(String appName, String envName, String hash) {
37+
DApp app = new DApp(appName);
38+
DEnv env = envName != null ? new DEnv(envName) : null;
39+
return new DCaptureRequest(app, hash)
40+
.setEnv(env)
41+
.setRequestedAt(Instant.now());
42+
}
43+
44+
@Test
45+
void envSpecific_rehydratedAndDeliveredToCorrectEnv() {
46+
var r = request("myapp", "test", "abc123");
47+
48+
int count = onStart.pushPendingCaptures(List.of(r));
49+
50+
assertThat(count).isEqualTo(1);
51+
assertThat(messageService.pendingResponse()).isTrue();
52+
// delivered to the matching env
53+
assertThat(messageService.responseBody(poll("myapp", "test"))).isEqualTo("v1|qp:abc123");
54+
// queue is now drained
55+
assertThat(messageService.pendingResponse()).isFalse();
56+
}
57+
58+
@Test
59+
void anyEnv_nullEnvOnRequest_usesAnyEnvSentinel() {
60+
var r = request("myapp", null, "def456");
61+
62+
onStart.pushPendingCaptures(List.of(r));
63+
64+
// any-env bucket: delivered regardless of which env polls
65+
assertThat(messageService.responseBody(poll("myapp", "prod"))).isEqualTo("v1|qp:def456");
66+
}
67+
68+
@Test
69+
void multipleRequests_allRehydrated() {
70+
var r1 = request("app1", "dev", "hash1");
71+
var r2 = request("app1", "dev", "hash2");
72+
var r3 = request("app2", null, "hash3");
73+
74+
int count = onStart.pushPendingCaptures(List.of(r1, r2, r3));
75+
76+
assertThat(count).isEqualTo(3);
77+
String body = messageService.responseBody(poll("app1", "dev"));
78+
assertThat(body).contains("qp:hash1").contains("qp:hash2");
79+
assertThat(messageService.responseBody(poll("app2", "test"))).isEqualTo("v1|qp:hash3");
80+
}
81+
82+
@Test
83+
void emptyList_returnsZero_noMessagesQueued() {
84+
int count = onStart.pushPendingCaptures(List.of());
85+
86+
assertThat(count).isEqualTo(0);
87+
assertThat(messageService.pendingResponse()).isFalse();
88+
}
89+
}

0 commit comments

Comments
 (0)