Skip to content

Commit e88adab

Browse files
committed
HTM-2016: Create a separate SseEventBus bean for the viewer so viewers don't get aministrative braodcasts
1 parent 8ded6a3 commit e88adab

6 files changed

Lines changed: 171 additions & 7 deletions

File tree

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,23 @@ mvn -Pdeveloping,postgresql verify -Dspring-boot.run.profiles=dev,populate-testd
217217
* You can use `mvn -U org.codehaus.mojo:versions-maven-plugin:display-dependency-updates` to search for dependency
218218
updates
219219

220+
#### SSE streams
221+
222+
We have 2 SSE streams available, one in the admin API at: `api/admin/events/{clientId}` and one in the viewer API
223+
at: `api/events/{clientId}`. When using these streams you must make sure that you are using/injecting the correct
224+
`SseEventBus` for each API.
225+
226+
For the admin use the default `eventBus` bean, inject using: `SseEventBus eventBus` (defined using the `@EnableSseEventBus` annotation in the `TailormapConfig` class).
227+
for the viewer use the `viewerSseEventBus` bean, inject using: `@Qualifier("viewerSseEventBus") SseEventBus eventBus` (defined in the `TailormapConfig` class).
228+
See:
229+
- [ServerSentEventsAdminController](src/main/java/org/tailormap/api/controller/admin/ServerSentEventsAdminController.java) for the admin configuration
230+
- [ServerSentEventsController](src/main/java/org/tailormap/api/controller/ServerSentEventsController.java) for the viewer configuration
231+
- [TailormapConfig](src/main/java/org/tailormap/api/configuration/TailormapConfig.java) for the bean definitions
232+
233+
If you inject the wrong one you may not receive the events ypu want and ypu risk sending administrative events to the viewer.
234+
235+
```
236+
220237
## Releasing
221238
222239
### Prerequisites

src/main/java/org/tailormap/api/configuration/TailormapConfig.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,23 @@
55
*/
66
package org.tailormap.api.configuration;
77

8+
import ch.rasc.sse.eventbus.DataObjectConverter;
9+
import ch.rasc.sse.eventbus.DefaultDataObjectConverter;
10+
import ch.rasc.sse.eventbus.DefaultSubscriptionRegistry;
11+
import ch.rasc.sse.eventbus.DistributedEventBus;
12+
import ch.rasc.sse.eventbus.JacksonDataObjectConverter;
13+
import ch.rasc.sse.eventbus.ReplayStore;
14+
import ch.rasc.sse.eventbus.SseEventBus;
15+
import ch.rasc.sse.eventbus.SubscriptionRegistry;
816
import ch.rasc.sse.eventbus.config.EnableSseEventBus;
17+
import ch.rasc.sse.eventbus.config.SseEventBusConfigurer;
18+
import ch.rasc.sse.eventbus.observation.SseEventBusObservationConvention;
19+
import io.micrometer.observation.ObservationRegistry;
20+
import java.util.ArrayList;
21+
import java.util.List;
922
import java.util.Locale;
23+
import org.jspecify.annotations.Nullable;
24+
import org.springframework.beans.factory.annotation.Autowired;
1025
import org.springframework.beans.factory.annotation.Value;
1126
import org.springframework.boot.context.properties.ConfigurationProperties;
1227
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -15,6 +30,7 @@
1530
import org.springframework.scheduling.annotation.EnableScheduling;
1631
import org.springframework.web.servlet.LocaleResolver;
1732
import org.springframework.web.servlet.i18n.AcceptHeaderLocaleResolver;
33+
import tools.jackson.databind.ObjectMapper;
1834

1935
@Configuration
2036
@EnableConfigurationProperties
@@ -42,4 +58,43 @@ public LocaleResolver localeResolver() {
4258
resolver.setDefaultLocale(Locale.of(defaultLanguage));
4359
return resolver;
4460
}
61+
62+
/**
63+
* Define a ew viewer SseEventBus bean for viewer-specific SSE traffic.
64+
*
65+
* @return the viewerSseEventBus instance
66+
*/
67+
@Bean("viewerSseEventBus")
68+
public SseEventBus viewerSseEventBus(
69+
@Autowired(required = false) @Nullable SseEventBusConfigurer configurer,
70+
@Autowired(required = false) @Nullable ObjectMapper objectMapper,
71+
@Autowired(required = false) @Nullable List<DataObjectConverter> dataObjectConverters,
72+
@Autowired(required = false) @Nullable SubscriptionRegistry subscriptionRegistry,
73+
@Autowired(required = false) @Nullable ReplayStore replayStore,
74+
@Autowired(required = false) @Nullable ObservationRegistry observationRegistry,
75+
@Autowired(required = false) @Nullable SseEventBusObservationConvention observationConvention,
76+
@Autowired(required = false) @Nullable DistributedEventBus distributedEventBus) {
77+
78+
// Apply same defaults as DefaultSseEventBusConfiguration
79+
SseEventBusConfigurer config = configurer != null
80+
? configurer
81+
: new SseEventBusConfigurer() {
82+
/* defaults */
83+
};
84+
85+
SubscriptionRegistry registry =
86+
subscriptionRegistry != null ? subscriptionRegistry : new DefaultSubscriptionRegistry();
87+
88+
ReplayStore store = replayStore != null ? replayStore : config.replayStore();
89+
90+
List<DataObjectConverter> converters = dataObjectConverters != null ? dataObjectConverters : new ArrayList<>();
91+
if (objectMapper != null) {
92+
converters.add(new JacksonDataObjectConverter(objectMapper));
93+
} else {
94+
converters.add(new DefaultDataObjectConverter());
95+
}
96+
97+
return new SseEventBus(
98+
config, registry, converters, store, observationRegistry, observationConvention, distributedEventBus);
99+
}
45100
}

src/main/java/org/tailormap/api/controller/ServerSentEventsController.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
import java.lang.invoke.MethodHandles;
1313
import org.slf4j.Logger;
1414
import org.slf4j.LoggerFactory;
15+
import org.springframework.beans.factory.annotation.Qualifier;
1516
import org.springframework.http.HttpStatus;
1617
import org.springframework.scheduling.annotation.Scheduled;
1718
import org.springframework.web.bind.annotation.GetMapping;
1819
import org.springframework.web.bind.annotation.PathVariable;
1920
import org.springframework.web.bind.annotation.RestController;
2021
import org.springframework.web.server.ResponseStatusException;
2122
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
23+
import org.tailormap.api.util.UUIDv7;
2224
import org.tailormap.api.viewer.model.ServerSentEventResponse;
2325
import tools.jackson.core.JacksonException;
2426
import tools.jackson.databind.SerializationFeature;
@@ -33,7 +35,7 @@ public class ServerSentEventsController {
3335

3436
private final JsonMapper jsonMapper;
3537

36-
public ServerSentEventsController(SseEventBus eventBus, JsonMapper jsonMapper) {
38+
public ServerSentEventsController(@Qualifier("viewerSseEventBus") SseEventBus eventBus, JsonMapper jsonMapper) {
3739
this.eventBus = eventBus;
3840
// force unindented/single line output for SSE messages, because we may have set
3941
// spring.jackson.serialization.indent_output=true for debugging/development/test
@@ -60,7 +62,9 @@ public SseEmitter sse(@PathVariable String clientId) {
6062

6163
@Scheduled(fixedRate = 60_000)
6264
public void keepAlive() throws JacksonException {
63-
this.eventBus.handleEvent(SseEvent.ofData(jsonMapper.writeValueAsString(
64-
new ServerSentEventResponse().eventType(ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE))));
65+
this.eventBus.handleEvent(SseEvent.ofData(jsonMapper.writeValueAsString(new ServerSentEventResponse()
66+
.eventType(ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE)
67+
.id(UUIDv7.randomV7())
68+
.details("keep viewer alive"))));
6569
}
6670
}

src/main/java/org/tailormap/api/controller/admin/ServerSentEventsAdminController.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
2222
import org.tailormap.api.admin.model.ServerSentEvent;
2323
import tools.jackson.core.JacksonException;
24+
import tools.jackson.databind.SerializationFeature;
2425
import tools.jackson.databind.json.JsonMapper;
2526

2627
@RestController
@@ -34,7 +35,16 @@ public class ServerSentEventsAdminController {
3435

3536
public ServerSentEventsAdminController(SseEventBus eventBus, JsonMapper jsonMapper) {
3637
this.eventBus = eventBus;
37-
this.jsonMapper = jsonMapper;
38+
// force unindented/single line output for SSE messages, because we may have set
39+
// spring.jackson.serialization.indent_output=true for debugging/development/test
40+
if (jsonMapper.isEnabled(SerializationFeature.INDENT_OUTPUT)) {
41+
this.jsonMapper = jsonMapper
42+
.rebuild()
43+
.configure(SerializationFeature.INDENT_OUTPUT, false)
44+
.build();
45+
} else {
46+
this.jsonMapper = jsonMapper;
47+
}
3848
}
3949

4050
/**

src/main/java/org/tailormap/api/service/CreateLayerExtractService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.jspecify.annotations.Nullable;
4949
import org.slf4j.Logger;
5050
import org.slf4j.LoggerFactory;
51+
import org.springframework.beans.factory.annotation.Qualifier;
5152
import org.springframework.beans.factory.annotation.Value;
5253
import org.springframework.scheduling.annotation.Async;
5354
import org.springframework.scheduling.annotation.Scheduled;
@@ -89,7 +90,9 @@ public class CreateLayerExtractService {
8990
private boolean exactWfsCounts;
9091

9192
public CreateLayerExtractService(
92-
SseEventBus eventBus, JsonMapper jsonMapper, FeatureSourceFactoryHelper featureSourceFactoryHelper) {
93+
@Qualifier("viewerSseEventBus") SseEventBus eventBus,
94+
JsonMapper jsonMapper,
95+
FeatureSourceFactoryHelper featureSourceFactoryHelper) {
9396
this.eventBus = eventBus;
9497
this.featureSourceFactoryHelper = featureSourceFactoryHelper;
9598
// force unindented/single line output for SSE messages, because we may have set

src/test/java/org/tailormap/api/controller/ServerSentEventsControllerIntegrationTest.java

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,12 @@
99
import static java.util.concurrent.TimeUnit.SECONDS;
1010
import static org.hamcrest.MatcherAssert.assertThat;
1111
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
12+
import static org.junit.jupiter.api.Assertions.assertEquals;
13+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
1214
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
15+
import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
1316
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.request;
17+
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
1418
import static org.tailormap.api.TestRequestProcessor.setServletPath;
1519

1620
import java.lang.invoke.MethodHandles;
@@ -26,10 +30,13 @@
2630
import org.springframework.beans.factory.annotation.Value;
2731
import org.springframework.boot.webmvc.test.autoconfigure.AutoConfigureMockMvc;
2832
import org.springframework.http.MediaType;
33+
import org.springframework.security.test.context.support.WithMockUser;
2934
import org.springframework.test.web.servlet.MockMvc;
3035
import org.springframework.test.web.servlet.MvcResult;
36+
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
37+
import org.springframework.web.context.WebApplicationContext;
3138
import org.tailormap.api.annotation.PostgresIntegrationTest;
32-
import org.tailormap.api.viewer.model.ServerSentEventResponse;
39+
import org.tailormap.api.persistence.Group;
3340

3441
@PostgresIntegrationTest
3542
@AutoConfigureMockMvc
@@ -46,6 +53,12 @@ class ServerSentEventsControllerIntegrationTest {
4653
@Value("${tailormap-api.base-path}")
4754
private String apiBasePath;
4855

56+
@Value("${tailormap-api.admin.base-path}")
57+
private String adminBasePath;
58+
59+
@Autowired
60+
private WebApplicationContext context;
61+
4962
private MvcResult sseResult;
5063

5164
@BeforeEach
@@ -75,10 +88,72 @@ void should_send_keep_alive_messages_for_two_minutes() {
7588
});
7689
}
7790

91+
/** Check that at least 2 keep-alive messages arrive in 130 seconds. */
92+
@Test
93+
@WithMockUser(
94+
username = "admin",
95+
authorities = {Group.ADMIN})
96+
void admin_and_viewer_should_use_separate_sse_streams() throws Exception {
97+
// start admin sse stream
98+
MockMvc adminMockMvc = MockMvcBuilders.webAppContextSetup(context).build();
99+
final String adminSseUrl = adminBasePath + "/events/" + sseClientId;
100+
MvcResult adminSseResult = adminMockMvc
101+
.perform(get(adminSseUrl)
102+
.accept(MediaType.TEXT_EVENT_STREAM)
103+
.with(setServletPath(adminSseUrl))
104+
.acceptCharset(StandardCharsets.UTF_8))
105+
.andDo(print())
106+
.andExpect(status().isOk())
107+
.andExpect(request().asyncStarted())
108+
.andReturn();
109+
110+
Awaitility.await("waiting at least 2 minutes for keep-alive messages")
111+
.pollDelay(45, SECONDS)
112+
.pollInterval(15, SECONDS)
113+
.atLeast(1, MINUTES)
114+
.atMost(130, SECONDS)
115+
.logging(
116+
logPrinter -> logger.debug("Checking for keep-alive messages in SSE streams... {}", logPrinter))
117+
.untilAsserted(() -> {
118+
final String adminStream = adminSseResult.getResponse().getContentAsString();
119+
logger.debug("admin stream: {}", adminStream);
120+
assertThat(
121+
"There should be at least 2 keep-alive massages for the admin",
122+
count_keep_alive_messages(adminStream),
123+
greaterThanOrEqualTo(2));
124+
// and
125+
final String stream = sseResult.getResponse().getContentAsString();
126+
logger.debug("viewer stream: {}", stream);
127+
assertThat(
128+
"There should be at least 2 keep-alive massages for the viewer",
129+
count_keep_alive_messages(stream),
130+
greaterThanOrEqualTo(2));
131+
assertEquals(
132+
0,
133+
count_viewer_keep_alive_messages(stream),
134+
"There should no keep-alive massages for the admin in the viewer");
135+
assertNotEquals(
136+
count_keep_alive_messages(stream),
137+
count_viewer_keep_alive_messages(stream),
138+
"Admin keep-alive messages should not be sent to viewer SSE stream");
139+
});
140+
}
141+
78142
private int count_keep_alive_messages(String stream) {
79143
int count = 0;
80144
int index = 0;
81-
final String marker = "\"eventType\":\"" + ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE + "\"";
145+
final String marker = "\"eventType\":\"keep-alive\"";
146+
while ((index = stream.indexOf(marker, index)) != -1) {
147+
count++;
148+
index += marker.length();
149+
}
150+
return count;
151+
}
152+
153+
private int count_viewer_keep_alive_messages(String stream) {
154+
int count = 0;
155+
int index = 0;
156+
final String marker = "\"details\":\"keep viewer alive\",\"eventType\":\"keep-alive\"";
82157
while ((index = stream.indexOf(marker, index)) != -1) {
83158
count++;
84159
index += marker.length();

0 commit comments

Comments
 (0)