Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,23 @@ mvn -Pdeveloping,postgresql verify -Dspring-boot.run.profiles=dev,populate-testd
* You can use `mvn -U org.codehaus.mojo:versions-maven-plugin:display-dependency-updates` to search for dependency
updates

#### SSE streams

We have 2 SSE streams available, one in the admin API at: `api/admin/events/{clientId}` and one in the viewer API
at: `api/events/{clientId}`. When using these streams you must make sure that you are using/injecting the correct
`SseEventBus` for each API.

For the admin use the default `eventBus` bean, inject using: `SseEventBus eventBus` (defined using the
`@EnableSseEventBus` annotation in the `TailormapConfig` class).
Comment thread
mprins marked this conversation as resolved.
For the viewer use the `viewerSseEventBus` bean, inject using: `@Qualifier("viewerSseEventBus") SseEventBus eventBus`
(defined in the `TailormapConfig` class).
See:
- [ServerSentEventsAdminController](src/main/java/org/tailormap/api/controller/admin/ServerSentEventsAdminController.java) for the admin configuration
- [ServerSentEventsController](src/main/java/org/tailormap/api/controller/ServerSentEventsController.java) for the viewer configuration
- [TailormapConfig](src/main/java/org/tailormap/api/configuration/TailormapConfig.java) for the bean definitions

If you inject the wrong one you may not receive the events you want, and you risk sending administrative events to the viewer.

## Releasing

### Prerequisites
Expand Down
58 changes: 58 additions & 0 deletions src/main/java/org/tailormap/api/configuration/TailormapConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,23 @@
*/
package org.tailormap.api.configuration;

import ch.rasc.sse.eventbus.DataObjectConverter;
import ch.rasc.sse.eventbus.DefaultDataObjectConverter;
import ch.rasc.sse.eventbus.DefaultSubscriptionRegistry;
import ch.rasc.sse.eventbus.DistributedEventBus;
import ch.rasc.sse.eventbus.JacksonDataObjectConverter;
import ch.rasc.sse.eventbus.ReplayStore;
import ch.rasc.sse.eventbus.SseEventBus;
import ch.rasc.sse.eventbus.SubscriptionRegistry;
import ch.rasc.sse.eventbus.config.EnableSseEventBus;
import ch.rasc.sse.eventbus.config.SseEventBusConfigurer;
import ch.rasc.sse.eventbus.observation.SseEventBusObservationConvention;
import io.micrometer.observation.ObservationRegistry;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
Expand All @@ -15,6 +30,7 @@
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.servlet.LocaleResolver;
import org.springframework.web.servlet.i18n.AcceptHeaderLocaleResolver;
import tools.jackson.databind.ObjectMapper;

@Configuration
@EnableConfigurationProperties
Expand Down Expand Up @@ -42,4 +58,46 @@
resolver.setDefaultLocale(Locale.of(defaultLanguage));
return resolver;
}

/**
* Define a new viewer SseEventBus bean for viewer-specific SSE traffic.
*
* @return the viewerSseEventBus instance
*/
@Bean("viewerSseEventBus")
public SseEventBus viewerSseEventBus(
@Autowired(required = false) @Nullable SseEventBusConfigurer configurer,
@Autowired(required = false) @Nullable ObjectMapper objectMapper,
@Autowired(required = false) @Nullable List<DataObjectConverter> dataObjectConverters,
@Autowired(required = false) @Nullable SubscriptionRegistry subscriptionRegistry,
@Autowired(required = false) @Nullable ReplayStore replayStore,
@Autowired(required = false) @Nullable ObservationRegistry observationRegistry,
@Autowired(required = false) @Nullable SseEventBusObservationConvention observationConvention,
@Autowired(required = false) @Nullable DistributedEventBus distributedEventBus) {

// Apply same defaults as DefaultSseEventBusConfiguration
SseEventBusConfigurer config = configurer != null
? configurer

Check warning on line 80 in src/main/java/org/tailormap/api/configuration/TailormapConfig.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/tailormap/api/configuration/TailormapConfig.java#L80

Added line #L80 was not covered by tests
: new SseEventBusConfigurer() {
/* defaults */
};

SubscriptionRegistry registry =
subscriptionRegistry != null ? subscriptionRegistry : new DefaultSubscriptionRegistry();

ReplayStore store = replayStore != null ? replayStore : config.replayStore();

List<DataObjectConverter> converters =
dataObjectConverters != null ? new ArrayList<>(dataObjectConverters) : new ArrayList<>();
if (converters.isEmpty()) {
if (objectMapper != null) {
converters.add(new JacksonDataObjectConverter(objectMapper));
} else {
converters.add(new DefaultDataObjectConverter());

Check warning on line 96 in src/main/java/org/tailormap/api/configuration/TailormapConfig.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/tailormap/api/configuration/TailormapConfig.java#L96

Added line #L96 was not covered by tests
}
}

return new SseEventBus(
config, registry, converters, store, observationRegistry, observationConvention, distributedEventBus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@
import ch.rasc.sse.eventbus.SseEvent;
import ch.rasc.sse.eventbus.SseEventBus;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpStatus;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.tailormap.api.util.UUIDv7;
import org.tailormap.api.viewer.model.ServerSentEventResponse;
import tools.jackson.core.JacksonException;
import tools.jackson.databind.SerializationFeature;
Expand All @@ -33,7 +36,7 @@ public class ServerSentEventsController {

private final JsonMapper jsonMapper;

public ServerSentEventsController(SseEventBus eventBus, JsonMapper jsonMapper) {
public ServerSentEventsController(@Qualifier("viewerSseEventBus") SseEventBus eventBus, JsonMapper jsonMapper) {
this.eventBus = eventBus;
// force unindented/single line output for SSE messages, because we may have set
// spring.jackson.serialization.indent_output=true for debugging/development/test
Expand All @@ -60,7 +63,9 @@ public SseEmitter sse(@PathVariable String clientId) {

@Scheduled(fixedRate = 60_000)
public void keepAlive() throws JacksonException {
this.eventBus.handleEvent(SseEvent.ofData(jsonMapper.writeValueAsString(
new ServerSentEventResponse().eventType(ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE))));
this.eventBus.handleEvent(SseEvent.ofData(jsonMapper.writeValueAsString(new ServerSentEventResponse()
.eventType(ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE)
.id(UUIDv7.randomV7())
.details(Collections.emptyMap()))));
Comment thread
mprins marked this conversation as resolved.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.tailormap.api.admin.model.ServerSentEvent;
import tools.jackson.core.JacksonException;
import tools.jackson.databind.SerializationFeature;
import tools.jackson.databind.json.JsonMapper;

@RestController
Expand All @@ -34,7 +35,16 @@ public class ServerSentEventsAdminController {

public ServerSentEventsAdminController(SseEventBus eventBus, JsonMapper jsonMapper) {
this.eventBus = eventBus;
this.jsonMapper = jsonMapper;
// force unindented/single line output for SSE messages, because we may have set
// spring.jackson.serialization.indent_output=true for debugging/development/test
if (jsonMapper.isEnabled(SerializationFeature.INDENT_OUTPUT)) {
this.jsonMapper = jsonMapper
.rebuild()
.configure(SerializationFeature.INDENT_OUTPUT, false)
.build();
} else {
this.jsonMapper = jsonMapper;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
Expand Down Expand Up @@ -89,7 +90,9 @@ public class CreateLayerExtractService {
private boolean exactWfsCounts;

public CreateLayerExtractService(
SseEventBus eventBus, JsonMapper jsonMapper, FeatureSourceFactoryHelper featureSourceFactoryHelper) {
@Qualifier("viewerSseEventBus") SseEventBus eventBus,
JsonMapper jsonMapper,
FeatureSourceFactoryHelper featureSourceFactoryHelper) {
this.eventBus = eventBus;
this.featureSourceFactoryHelper = featureSourceFactoryHelper;
// force unindented/single line output for SSE messages, because we may have set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.request;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.tailormap.api.TestRequestProcessor.setServletPath;

import java.lang.invoke.MethodHandles;
Expand All @@ -26,15 +28,18 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.webmvc.test.autoconfigure.AutoConfigureMockMvc;
import org.springframework.http.MediaType;
import org.springframework.security.test.context.support.WithMockUser;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;
import org.tailormap.api.annotation.PostgresIntegrationTest;
import org.tailormap.api.viewer.model.ServerSentEventResponse;
import org.tailormap.api.persistence.Group;

@PostgresIntegrationTest
@AutoConfigureMockMvc
@Execution(ExecutionMode.CONCURRENT)
class ServerSentEventsControllerIntegrationTest {
class ServerSentEventsControllerIntegrationTest extends SseParsingUtils {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Unique id avoids interference with parallel/other tests.
Expand All @@ -46,6 +51,12 @@ class ServerSentEventsControllerIntegrationTest {
@Value("${tailormap-api.base-path}")
private String apiBasePath;

@Value("${tailormap-api.admin.base-path}")
private String adminBasePath;

@Autowired
private WebApplicationContext context;

private MvcResult sseResult;

@BeforeEach
Expand All @@ -55,6 +66,7 @@ void start_sse_stream() throws Exception {
.accept(MediaType.TEXT_EVENT_STREAM)
.with(setServletPath(sseUrl))
.acceptCharset(StandardCharsets.UTF_8))
.andExpect(status().isOk())
.andExpect(request().asyncStarted())
.andReturn();
}
Expand All @@ -71,18 +83,67 @@ void should_send_keep_alive_messages_for_two_minutes() {
.logging(logPrinter -> logger.debug("Checking for keep-alive messages in SSE stream... {}", logPrinter))
.untilAsserted(() -> {
final String stream = sseResult.getResponse().getContentAsString();
assertThat(count_keep_alive_messages(stream), greaterThanOrEqualTo(2));
assertThat(count_all_keep_alive_messages(stream), greaterThanOrEqualTo(2));
});
}

private int count_keep_alive_messages(String stream) {
int count = 0;
int index = 0;
final String marker = "\"eventType\":\"" + ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE + "\"";
while ((index = stream.indexOf(marker, index)) != -1) {
count++;
index += marker.length();
}
return count;
/** Check that at least 2 keep-alive messages arrive in 130 seconds. */
@Test
@WithMockUser(
username = "admin",
authorities = {Group.ADMIN})
void admin_and_viewer_should_use_separate_sse_streams() throws Exception {
// start admin sse stream
MockMvc adminMockMvc = MockMvcBuilders.webAppContextSetup(context).build();
final String adminSseUrl = adminBasePath + "/events/" + sseClientId;
MvcResult adminSseResult = adminMockMvc
.perform(get(adminSseUrl)
.accept(MediaType.TEXT_EVENT_STREAM)
.with(setServletPath(adminSseUrl))
.acceptCharset(StandardCharsets.UTF_8))
.andExpect(status().isOk())
Comment thread
mprins marked this conversation as resolved.
.andExpect(request().asyncStarted())
.andReturn();

Awaitility.await("Waiting at least 2 minutes for any keep-alive messages")
.pollDelay(45, SECONDS)
.pollInterval(15, SECONDS)
.atLeast(1, MINUTES)
.atMost(130, SECONDS)
.logging(
logPrinter -> logger.debug("Checking for keep-alive messages in SSE streams... {}", logPrinter))
.untilAsserted(() -> {
// check admin stream
final String adminStream = adminSseResult.getResponse().getContentAsString();
logger.debug("admin stream: {}", adminStream);
assertThat(
"There should be at least 2 keep-alive messages for the admin",
count_all_keep_alive_messages(adminStream),
greaterThanOrEqualTo(2));
assertEquals(
0,
count_viewer_keep_alive_messages(adminStream),
"There should be no keep-alive messages for the viewer in the admin");
assertEquals(
count_all_keep_alive_messages(adminStream),
count_admin_keep_alive_messages(adminStream),
"We should only get admin keep-alive messages in the admin SSE stream");

// and viewer stream
final String stream = sseResult.getResponse().getContentAsString();
logger.debug("viewer stream: {}", stream);
assertThat(
"There should be at least 2 keep-alive messages for the viewer",
count_all_keep_alive_messages(stream),
greaterThanOrEqualTo(2));
assertEquals(
count_all_keep_alive_messages(stream),
count_viewer_keep_alive_messages(stream),
"Admin keep-alive messages should not be sent to viewer SSE stream");
assertEquals(
0,
count_admin_keep_alive_messages(stream),
"There should be no keep-alive messages for the admin in the viewer SSE stream");
});
}
}
35 changes: 35 additions & 0 deletions src/test/java/org/tailormap/api/controller/SseParsingUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,39 @@ int count_completed_messages(String s) {
}
return count;
}

int count_all_keep_alive_messages(String stream) {
int count = 0;
int index = 0;
final String marker = "\"eventType\":\"" + ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE + "\"";
while ((index = stream.indexOf(marker, index)) != -1) {
count++;
index += marker.length();
}
return count;
}

int count_viewer_keep_alive_messages(String stream) {
int count = 0;
int index = 0;
final String marker =
"\"details\":{},\"eventType\":\"" + ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE + "\",\"id\"";
while ((index = stream.indexOf(marker, index)) != -1) {
count++;
Comment thread
mprins marked this conversation as resolved.
index += marker.length();
}
return count;
}

int count_admin_keep_alive_messages(String stream) {
int count = 0;
int index = 0;
final String marker =
"\"details\":null,\"eventType\":\"" + ServerSentEventResponse.EventTypeEnum.KEEP_ALIVE + "\"";
while ((index = stream.indexOf(marker, index)) != -1) {
count++;
index += marker.length();
}
return count;
}
}
Loading