Skip to content

Commit 01f4b04

Browse files
committed
HTM-2016: Create a separate SseEventBus bean for the viewer so viewers don't get aministrative braodcasts
1 parent 8ded6a3 commit 01f4b04

7 files changed

Lines changed: 206 additions & 17 deletions

File tree

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,23 @@ mvn -Pdeveloping,postgresql verify -Dspring-boot.run.profiles=dev,populate-testd
217217
* You can use `mvn -U org.codehaus.mojo:versions-maven-plugin:display-dependency-updates` to search for dependency
218218
updates
219219

220+
#### SSE streams
221+
222+
We have 2 SSE streams available, one in the admin API at: `api/admin/events/{clientId}` and one in the viewer API
223+
at: `api/events/{clientId}`. When using these streams you must make sure that you are using/injecting the correct
224+
`SseEventBus` for each API.
225+
226+
For the admin use the default `eventBus` bean, inject using: `SseEventBus eventBus` (defined using the
227+
`@EnableSseEventBus` annotation in the `TailormapConfig` class).
228+
For the viewer use the `viewerSseEventBus` bean, inject using: `@Qualifier("viewerSseEventBus") SseEventBus eventBus`
229+
(defined in the `TailormapConfig` class).
230+
See:
231+
- [ServerSentEventsAdminController](src/main/java/org/tailormap/api/controller/admin/ServerSentEventsAdminController.java) for the admin configuration
232+
- [ServerSentEventsController](src/main/java/org/tailormap/api/controller/ServerSentEventsController.java) for the viewer configuration
233+
- [TailormapConfig](src/main/java/org/tailormap/api/configuration/TailormapConfig.java) for the bean definitions
234+
235+
If you inject the wrong one you may not receive the events you want, and you risk sending administrative events to the viewer.
236+
220237
## Releasing
221238

222239
### Prerequisites

src/main/java/org/tailormap/api/configuration/TailormapConfig.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,23 @@
55
*/
66
package org.tailormap.api.configuration;
77

8+
import ch.rasc.sse.eventbus.DataObjectConverter;
9+
import ch.rasc.sse.eventbus.DefaultDataObjectConverter;
10+
import ch.rasc.sse.eventbus.DefaultSubscriptionRegistry;
11+
import ch.rasc.sse.eventbus.DistributedEventBus;
12+
import ch.rasc.sse.eventbus.JacksonDataObjectConverter;
13+
import ch.rasc.sse.eventbus.ReplayStore;
14+
import ch.rasc.sse.eventbus.SseEventBus;
15+
import ch.rasc.sse.eventbus.SubscriptionRegistry;
816
import ch.rasc.sse.eventbus.config.EnableSseEventBus;
17+
import ch.rasc.sse.eventbus.config.SseEventBusConfigurer;
18+
import ch.rasc.sse.eventbus.observation.SseEventBusObservationConvention;
19+
import io.micrometer.observation.ObservationRegistry;
20+
import java.util.ArrayList;
21+
import java.util.List;
922
import java.util.Locale;
23+
import org.jspecify.annotations.Nullable;
24+
import org.springframework.beans.factory.annotation.Autowired;
1025
import org.springframework.beans.factory.annotation.Value;
1126
import org.springframework.boot.context.properties.ConfigurationProperties;
1227
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -15,6 +30,7 @@
1530
import org.springframework.scheduling.annotation.EnableScheduling;
1631
import org.springframework.web.servlet.LocaleResolver;
1732
import org.springframework.web.servlet.i18n.AcceptHeaderLocaleResolver;
33+
import tools.jackson.databind.ObjectMapper;
1834

1935
@Configuration
2036
@EnableConfigurationProperties
@@ -42,4 +58,46 @@ public LocaleResolver localeResolver() {
4258
resolver.setDefaultLocale(Locale.of(defaultLanguage));
4359
return resolver;
4460
}
61+
62+
/**
63+
* Define a new viewer SseEventBus bean for viewer-specific SSE traffic.
64+
*
65+
* @return the viewerSseEventBus instance
66+
*/
67+
@Bean("viewerSseEventBus")
68+
public SseEventBus viewerSseEventBus(
69+
@Autowired(required = false) @Nullable SseEventBusConfigurer configurer,
70+
@Autowired(required = false) @Nullable ObjectMapper objectMapper,
71+
@Autowired(required = false) @Nullable List<DataObjectConverter> dataObjectConverters,
72+
@Autowired(required = false) @Nullable SubscriptionRegistry subscriptionRegistry,
73+
@Autowired(required = false) @Nullable ReplayStore replayStore,
74+
@Autowired(required = false) @Nullable ObservationRegistry observationRegistry,
75+
@Autowired(required = false) @Nullable SseEventBusObservationConvention observationConvention,
76+
@Autowired(required = false) @Nullable DistributedEventBus distributedEventBus) {
77+
78+
// Apply same defaults as DefaultSseEventBusConfiguration
79+
SseEventBusConfigurer config = configurer != null
80+
? configurer
81+
: new SseEventBusConfigurer() {
82+
/* defaults */
83+
};
84+
85+
SubscriptionRegistry registry =
86+
subscriptionRegistry != null ? subscriptionRegistry : new DefaultSubscriptionRegistry();
87+
88+
ReplayStore store = replayStore != null ? replayStore : config.replayStore();
89+
90+
List<DataObjectConverter> converters =
91+
dataObjectConverters != null ? new ArrayList<>(dataObjectConverters) : new ArrayList<>();
92+
if (converters.isEmpty()) {
93+
if (objectMapper != null) {
94+
converters.add(new JacksonDataObjectConverter(objectMapper));
95+
} else {
96+
converters.add(new DefaultDataObjectConverter());
97+
}
98+
}
99+
100+
return new SseEventBus(
101+
config, registry, converters, store, observationRegistry, observationConvention, distributedEventBus);
102+
}
45103
}

src/main/java/org/tailormap/api/controller/ServerSentEventsController.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,18 @@
1010
import ch.rasc.sse.eventbus.SseEvent;
1111
import ch.rasc.sse.eventbus.SseEventBus;
1212
import java.lang.invoke.MethodHandles;
13+
import java.util.Collections;
1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
16+
import org.springframework.beans.factory.annotation.Qualifier;
1517
import org.springframework.http.HttpStatus;
1618
import org.springframework.scheduling.annotation.Scheduled;
1719
import org.springframework.web.bind.annotation.GetMapping;
1820
import org.springframework.web.bind.annotation.PathVariable;
1921
import org.springframework.web.bind.annotation.RestController;
2022
import org.springframework.web.server.ResponseStatusException;
2123
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
24+
import org.tailormap.api.util.UUIDv7;
2225
import org.tailormap.api.viewer.model.ServerSentEventResponse;
2326
import tools.jackson.core.JacksonException;
2427
import tools.jackson.databind.SerializationFeature;
@@ -33,7 +36,7 @@ public class ServerSentEventsController {
3336

3437
private final JsonMapper jsonMapper;
3538

36-
public ServerSentEventsController(SseEventBus eventBus, JsonMapper jsonMapper) {
39+
public ServerSentEventsController(@Qualifier("viewerSseEventBus") SseEventBus eventBus, JsonMapper jsonMapper) {
3740
this.eventBus = eventBus;
3841
// force unindented/single line output for SSE messages, because we may have set
3942
// spring.jackson.serialization.indent_output=true for debugging/development/test
@@ -60,7 +63,9 @@ public SseEmitter sse(@PathVariable String clientId) {
6063

6164
@Scheduled(fixedRate = 60_000)
6265
public void keepAlive() throws JacksonException {
63-
this.eventBus.handleEvent(SseEvent.ofData(jsonMapper.writeValueAsString(
64-
new ServerSentEventResponse().eventType(ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE))));
66+
this.eventBus.handleEvent(SseEvent.ofData(jsonMapper.writeValueAsString(new ServerSentEventResponse()
67+
.eventType(ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE)
68+
.id(UUIDv7.randomV7())
69+
.details(Collections.EMPTY_MAP))));
6570
}
6671
}

src/main/java/org/tailormap/api/controller/admin/ServerSentEventsAdminController.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
2222
import org.tailormap.api.admin.model.ServerSentEvent;
2323
import tools.jackson.core.JacksonException;
24+
import tools.jackson.databind.SerializationFeature;
2425
import tools.jackson.databind.json.JsonMapper;
2526

2627
@RestController
@@ -34,7 +35,16 @@ public class ServerSentEventsAdminController {
3435

3536
public ServerSentEventsAdminController(SseEventBus eventBus, JsonMapper jsonMapper) {
3637
this.eventBus = eventBus;
37-
this.jsonMapper = jsonMapper;
38+
// force unindented/single line output for SSE messages, because we may have set
39+
// spring.jackson.serialization.indent_output=true for debugging/development/test
40+
if (jsonMapper.isEnabled(SerializationFeature.INDENT_OUTPUT)) {
41+
this.jsonMapper = jsonMapper
42+
.rebuild()
43+
.configure(SerializationFeature.INDENT_OUTPUT, false)
44+
.build();
45+
} else {
46+
this.jsonMapper = jsonMapper;
47+
}
3848
}
3949

4050
/**

src/main/java/org/tailormap/api/service/CreateLayerExtractService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.jspecify.annotations.Nullable;
4949
import org.slf4j.Logger;
5050
import org.slf4j.LoggerFactory;
51+
import org.springframework.beans.factory.annotation.Qualifier;
5152
import org.springframework.beans.factory.annotation.Value;
5253
import org.springframework.scheduling.annotation.Async;
5354
import org.springframework.scheduling.annotation.Scheduled;
@@ -89,7 +90,9 @@ public class CreateLayerExtractService {
8990
private boolean exactWfsCounts;
9091

9192
public CreateLayerExtractService(
92-
SseEventBus eventBus, JsonMapper jsonMapper, FeatureSourceFactoryHelper featureSourceFactoryHelper) {
93+
@Qualifier("viewerSseEventBus") SseEventBus eventBus,
94+
JsonMapper jsonMapper,
95+
FeatureSourceFactoryHelper featureSourceFactoryHelper) {
9396
this.eventBus = eventBus;
9497
this.featureSourceFactoryHelper = featureSourceFactoryHelper;
9598
// force unindented/single line output for SSE messages, because we may have set

src/test/java/org/tailormap/api/controller/ServerSentEventsControllerIntegrationTest.java

Lines changed: 73 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
import static java.util.concurrent.TimeUnit.SECONDS;
1010
import static org.hamcrest.MatcherAssert.assertThat;
1111
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
12+
import static org.junit.jupiter.api.Assertions.assertEquals;
1213
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
1314
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.request;
15+
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
1416
import static org.tailormap.api.TestRequestProcessor.setServletPath;
1517

1618
import java.lang.invoke.MethodHandles;
@@ -26,15 +28,18 @@
2628
import org.springframework.beans.factory.annotation.Value;
2729
import org.springframework.boot.webmvc.test.autoconfigure.AutoConfigureMockMvc;
2830
import org.springframework.http.MediaType;
31+
import org.springframework.security.test.context.support.WithMockUser;
2932
import org.springframework.test.web.servlet.MockMvc;
3033
import org.springframework.test.web.servlet.MvcResult;
34+
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
35+
import org.springframework.web.context.WebApplicationContext;
3136
import org.tailormap.api.annotation.PostgresIntegrationTest;
32-
import org.tailormap.api.viewer.model.ServerSentEventResponse;
37+
import org.tailormap.api.persistence.Group;
3338

3439
@PostgresIntegrationTest
3540
@AutoConfigureMockMvc
3641
@Execution(ExecutionMode.CONCURRENT)
37-
class ServerSentEventsControllerIntegrationTest {
42+
class ServerSentEventsControllerIntegrationTest extends SseParsingUtils {
3843
private static final Logger logger =
3944
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
4045
// Unique id avoids interference with parallel/other tests.
@@ -46,6 +51,12 @@ class ServerSentEventsControllerIntegrationTest {
4651
@Value("${tailormap-api.base-path}")
4752
private String apiBasePath;
4853

54+
@Value("${tailormap-api.admin.base-path}")
55+
private String adminBasePath;
56+
57+
@Autowired
58+
private WebApplicationContext context;
59+
4960
private MvcResult sseResult;
5061

5162
@BeforeEach
@@ -55,6 +66,7 @@ void start_sse_stream() throws Exception {
5566
.accept(MediaType.TEXT_EVENT_STREAM)
5667
.with(setServletPath(sseUrl))
5768
.acceptCharset(StandardCharsets.UTF_8))
69+
.andExpect(status().isOk())
5870
.andExpect(request().asyncStarted())
5971
.andReturn();
6072
}
@@ -71,18 +83,67 @@ void should_send_keep_alive_messages_for_two_minutes() {
7183
.logging(logPrinter -> logger.debug("Checking for keep-alive messages in SSE stream... {}", logPrinter))
7284
.untilAsserted(() -> {
7385
final String stream = sseResult.getResponse().getContentAsString();
74-
assertThat(count_keep_alive_messages(stream), greaterThanOrEqualTo(2));
86+
assertThat(count_all_keep_alive_messages(stream), greaterThanOrEqualTo(2));
7587
});
7688
}
7789

78-
private int count_keep_alive_messages(String stream) {
79-
int count = 0;
80-
int index = 0;
81-
final String marker = "\"eventType\":\"" + ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE + "\"";
82-
while ((index = stream.indexOf(marker, index)) != -1) {
83-
count++;
84-
index += marker.length();
85-
}
86-
return count;
90+
/** Check that at least 2 keep-alive messages arrive in 130 seconds. */
91+
@Test
92+
@WithMockUser(
93+
username = "admin",
94+
authorities = {Group.ADMIN})
95+
void admin_and_viewer_should_use_separate_sse_streams() throws Exception {
96+
// start admin sse stream
97+
MockMvc adminMockMvc = MockMvcBuilders.webAppContextSetup(context).build();
98+
final String adminSseUrl = adminBasePath + "/events/" + sseClientId;
99+
MvcResult adminSseResult = adminMockMvc
100+
.perform(get(adminSseUrl)
101+
.accept(MediaType.TEXT_EVENT_STREAM)
102+
.with(setServletPath(adminSseUrl))
103+
.acceptCharset(StandardCharsets.UTF_8))
104+
.andExpect(status().isOk())
105+
.andExpect(request().asyncStarted())
106+
.andReturn();
107+
108+
Awaitility.await("Waiting at least 2 minutes for any keep-alive messages")
109+
.pollDelay(45, SECONDS)
110+
.pollInterval(15, SECONDS)
111+
.atLeast(1, MINUTES)
112+
.atMost(130, SECONDS)
113+
.logging(
114+
logPrinter -> logger.debug("Checking for keep-alive messages in SSE streams... {}", logPrinter))
115+
.untilAsserted(() -> {
116+
// check admin stream
117+
final String adminStream = adminSseResult.getResponse().getContentAsString();
118+
logger.debug("admin stream: {}", adminStream);
119+
assertThat(
120+
"There should be at least 2 keep-alive messages for the admin",
121+
count_all_keep_alive_messages(adminStream),
122+
greaterThanOrEqualTo(2));
123+
assertEquals(
124+
0,
125+
count_viewer_keep_alive_messages(adminStream),
126+
"There should be no keep-alive messages for the viewer in the admin");
127+
assertEquals(
128+
count_all_keep_alive_messages(adminStream),
129+
count_admin_keep_alive_messages(adminStream),
130+
"We should only get admin keep-alive messages in the admin SSE stream");
131+
132+
// and viewer stream
133+
final String stream = sseResult.getResponse().getContentAsString();
134+
logger.debug("viewer stream: {}", stream);
135+
assertThat(
136+
"There should be at least 2 keep-alive messages for the viewer",
137+
count_all_keep_alive_messages(stream),
138+
greaterThanOrEqualTo(2));
139+
assertEquals(
140+
count_all_keep_alive_messages(stream),
141+
count_viewer_keep_alive_messages(stream),
142+
"Admin keep-alive messages should not be sent to viewer SSE stream");
143+
assertEquals(
144+
0,
145+
count_admin_keep_alive_messages(stream),
146+
"There should be no keep-alive messages for the admin in the viewer SSE stream");
147+
});
87148
}
88149
}

src/test/java/org/tailormap/api/controller/SseParsingUtils.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,39 @@ int count_completed_messages(String s) {
4646
}
4747
return count;
4848
}
49+
50+
int count_all_keep_alive_messages(String stream) {
51+
int count = 0;
52+
int index = 0;
53+
final String marker = "\"eventType\":\"" + ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE + "\"";
54+
while ((index = stream.indexOf(marker, index)) != -1) {
55+
count++;
56+
index += marker.length();
57+
}
58+
return count;
59+
}
60+
61+
int count_viewer_keep_alive_messages(String stream) {
62+
int count = 0;
63+
int index = 0;
64+
final String marker =
65+
"\"details\":{},\"eventType\":\"" + ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE + "\",\"id\"";
66+
while ((index = stream.indexOf(marker, index)) != -1) {
67+
count++;
68+
index += marker.length();
69+
}
70+
return count;
71+
}
72+
73+
int count_admin_keep_alive_messages(String stream) {
74+
int count = 0;
75+
int index = 0;
76+
final String marker =
77+
"\"details\":null,\"eventType\":\"" + ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE + "\"";
78+
while ((index = stream.indexOf(marker, index)) != -1) {
79+
count++;
80+
index += marker.length();
81+
}
82+
return count;
83+
}
4984
}

0 commit comments

Comments
 (0)