|
| 1 | +[[_TOC_]] |
| 2 | + |
| 3 | +# Streaming real-time data with Spring Boot |
| 4 | + |
| 5 | +### Goals |
| 6 | + |
| 7 | +This guide will show you how to use the Caplin platform and [Spring Boot](https://spring.io/projects/spring-boot) to rapidly build an application that can deliver on-demand, real time data to a browser or mobile application. |
| 8 | + |
| 9 | +### Pre-requisites |
| 10 | + |
| 11 | +This guide assumes that you are familiar with Spring Boot, else it would be beneficial to follow the [Building an Application with Spring Boot](https://spring.io/guides/gs/spring-boot/) guide before returning. |
| 12 | + |
| 13 | +#### Software requirements |
| 14 | + |
| 15 | +* Java JDK 17 or later |
| 16 | +* Docker, or a similar container runtime that supports compose files. |
| 17 | +* A Java or Kotlin IDE |
| 18 | + |
| 19 | +### Project setup |
| 20 | + |
| 21 | +Now let's create a simple application. |
| 22 | + |
| 23 | +* Navigate to [Spring Initializr](https://start.spring.io/) |
| 24 | + |
| 25 | +* It's recommended to choose _Gradle - Kotlin_ for the Project, and _Kotlin_ for the Language, though you may of course use _Java_. |
| 26 | + |
| 27 | +* Choose the latest Spring Boot release version, at the time of writing this is 3.5.3. |
| 28 | + |
| 29 | +* Generate the project, unzip it, and then import it into your IDE. |
| 30 | + |
| 31 | +* Now we need to add our DataSource Starter dependency, so open up `build.gradle.kts` and add `implementation("com.caplin.integration.datasourcex:spring-boot-starter-datasource:1.0.0")` to the `dependencies` block. |
| 32 | + |
| 33 | +* You will also need to add the Caplin repository to be able to access the Caplin DataSource libraries. To do so, add the following to the `repositories` block. Note the credentials here should be retrieved from your Caplin Account Manager. These are best passed in from the command line, via environment variable or via your global `gradle.properties` file to ensure they are not inadvertently exposed: |
| 34 | + ```kotlin |
| 35 | + maven { |
| 36 | + url = uri("https://repository.caplin.com/repository/caplin-release") |
| 37 | + credentials { |
| 38 | + username = <username> |
| 39 | + password = <password> |
| 40 | + } |
| 41 | + } |
| 42 | + ``` |
| 43 | + |
| 44 | +* Lastly, we'll need to configure the Liberator host that DataSource will connect to, so open `src/main/resources/application.properties` and add the line `caplin.datasource.managed.peer.outgoing=ws://localhost:19000` |
| 45 | + |
| 46 | +### Running the Caplin platform |
| 47 | + |
| 48 | +To launch the Caplin platform you can use the [example Docker Compose file](https://github.com/caplin/DataSource-Extensions/tree/main/examples) from the repository examples. Please refer to the brief readme for instructions. This will launch a container running a preconfigured Liberator and expose two ports; `18080` for inbound front end application connections and `19000` for the inbound WebSocket connection from our new server application. |
| 49 | + |
| 50 | +### Creating a simple browser client |
| 51 | + |
| 52 | +We'll want to be able to request and display some data from our server, so let us create a basic browser client application to do so. Add the following to your project as `./index.html`. This code sets up a connection to the platform with the StreamLink library (In this case, hosted by our Liberator container at `http://localhost:18080/sljs/streamlink.js`) and enables the library's support for handling streaming JSON patches behind the scenes. |
| 53 | + |
| 54 | +> For the sake of clarity, we are omitting most error handling code. |
| 55 | +
|
| 56 | +```html |
| 57 | +<html lang="en"> |
| 58 | +<head> |
| 59 | + <title>Streaming Demo</title> |
| 60 | + <script src="https://cdn.jsdelivr.net/npm/@tailwindcss/browser@4"></script> |
| 61 | + <script src="http://localhost:18080/sljs/streamlink.js"></script> |
| 62 | + <script type="module"> |
| 63 | + import * as jsonpatch from 'https://cdnjs.cloudflare.com/ajax/libs/fast-json-patch/3.1.1/fast-json-patch.min.js'; |
| 64 | +
|
| 65 | + export let streamLink = caplin.streamlink.StreamLinkFactory.create({ |
| 66 | + liberator_urls: "rttp://localhost:18080", |
| 67 | + username: "admin", |
| 68 | + password: "admin", |
| 69 | +
|
| 70 | + json_handler: { |
| 71 | + parse: function (jsonString) { |
| 72 | + return JSON.parse(jsonString); |
| 73 | + }, |
| 74 | + patch: function (existingObject, jsonPatchString) { |
| 75 | + const patch = JSON.parse(jsonPatchString); |
| 76 | + return patch.reduce(jsonpatch.applyReducer, existingObject); |
| 77 | + }, |
| 78 | + format: function (obj) { |
| 79 | + return JSON.stringify(obj, null, "\t"); |
| 80 | + } |
| 81 | + } |
| 82 | + }); |
| 83 | +
|
| 84 | + streamLink.addConnectionListener({ |
| 85 | + onConnectionStatusChange: function(connectionStatusEvent) { |
| 86 | + document.getElementById("connection-status").innerHTML = `<pre>${connectionStatusEvent}</pre>` |
| 87 | + }, |
| 88 | + }); |
| 89 | +
|
| 90 | + window.onbeforeunload = function(event) { |
| 91 | + streamLink.disconnect() |
| 92 | + } |
| 93 | +
|
| 94 | + streamLink.connect(); |
| 95 | +
|
| 96 | + // TODO subscriptions |
| 97 | + </script> |
| 98 | +</head> |
| 99 | +<body class="p-4"> |
| 100 | +<div class="text-xl p-4 m-4 bg-gray-100 rounded-lg" id="connection-status"></div> |
| 101 | +</body> |
| 102 | +</html> |
| 103 | +``` |
| 104 | + |
| 105 | +If you open this in your browser, you should see that we have successfully connected to Liberator. |
| 106 | + |
| 107 | +``` |
| 108 | +ConnectionStatusEventImpl [LiberatorURL=ws://localhost:18080, connectionState=LOGGEDIN] |
| 109 | +``` |
| 110 | + |
| 111 | +### Providing static data |
| 112 | + |
| 113 | +Now let's add some data! In this case our client wants to retrieve the local time and time zone of the server. To handle this we'll create a new `@Controller` class providing an aptly named `/serverTime` endpoint. |
| 114 | + |
| 115 | +```kotlin |
| 116 | +@Controller |
| 117 | +class StreamingController { |
| 118 | + |
| 119 | + data class TimeEvent(val time: LocalTime, val zoneId: ZoneId) |
| 120 | + |
| 121 | + @MessageMapping("/serverTime") |
| 122 | + fun time(): TimeEvent = TimeEvent(LocalTime.now(), ZoneId.systemDefault()) |
| 123 | +} |
| 124 | +``` |
| 125 | + |
| 126 | +Let's launch our application and see what happens - to do so you can either |
| 127 | + |
| 128 | +* Run the main method in `com.example.demo.DemoApplication` through your IDE |
| 129 | +* Run `./gradlew bootRun` from the terminal in your project's directory. |
| 130 | + |
| 131 | +In the resulting logs we should see our application successfully connect to the platform |
| 132 | + |
| 133 | +``` |
| 134 | +Peer 0 (localhost/127.0.0.1:19000): is connected |
| 135 | +``` |
| 136 | + |
| 137 | +and we should see log line indicating our subject has been bound correctly |
| 138 | + |
| 139 | +``` |
| 140 | +Registering [/serverTime] as Static |
| 141 | +``` |
| 142 | + |
| 143 | +If we now edit our `index.html` to subscribe to this, adding the code provided below in place of the existing `//TODO subscriptions` placeholder, and refresh our browser, we should see the server's time and time zone data being displayed. |
| 144 | + |
| 145 | +```javascript |
| 146 | +document.body.innerHTML += `<div class="text-xl p-4 m-4 bg-gray-100 rounded-lg" id="serverTime"></div>` |
| 147 | + |
| 148 | +let timeSubject = "/serverTime" |
| 149 | +streamLink.subscribe(timeSubject, { |
| 150 | + onJsonUpdate: function (subscription, event) { |
| 151 | + document.getElementById("serverTime").innerHTML = `<pre>${timeSubject} - ${JSON.stringify(event.getJson(), null, 2)}</pre>` |
| 152 | + } |
| 153 | +}) |
| 154 | +``` |
| 155 | + |
| 156 | +Note that serialization to JSON is handled for us automatically by way of Spring's [Jackson](https://github.com/FasterXML/jackson) integration. |
| 157 | + |
| 158 | +### Providing streaming data |
| 159 | + |
| 160 | +Now as nice as that is, we'd like more than a single response, so let's modify our endpoint to provide a stream of events, rather than just the initial response. For Kotlin we'll be returning a [Flow](https://kotlinlang.org/docs/flow.html). For Java you can instead make use of Reactor's [Flux](https://projectreactor.io/docs/core/release/reference/#flux). Both are powerful abstractions over a stream of data. |
| 161 | + |
| 162 | +Modify the `StreamingController` class to replace our previous function with the following: |
| 163 | + |
| 164 | +```kotlin |
| 165 | +@MessageMapping("/serverTime") |
| 166 | +fun serverTime(): Flow<TimeEvent> = flow { |
| 167 | + while (true) { |
| 168 | + emit(TimeEvent(LocalTime.now(), ZoneId.systemDefault())) |
| 169 | + delay(100) |
| 170 | + } |
| 171 | +} |
| 172 | +``` |
| 173 | + |
| 174 | +One brief restart of the server application later, and you should see the client updating in real time! |
| 175 | + |
| 176 | +### Request parameters |
| 177 | + |
| 178 | +Now, imagine that the browser client needs to fetch the local time in a specific time zone. To achieve this we can fairly simply add a new endpoint to our controller, this time named `zonedTime` and taking a `@DestinationVariable` that is extracted from the requested subject. |
| 179 | + |
| 180 | +```kotlin |
| 181 | +@MessageMapping("/zonedTime/{zoneId}") |
| 182 | +fun zonedTime(@DestinationVariable zoneId: ZoneId): Flow<TimeEvent> = flow { |
| 183 | + while (true) { |
| 184 | + val now = ZonedDateTime.now(zoneId) |
| 185 | + emit(TimeEvent(now.toLocalTime(), zoneId)) |
| 186 | + delay(100) |
| 187 | + } |
| 188 | +} |
| 189 | +``` |
| 190 | + |
| 191 | +To test this we can add to our client code to specify a time zone on a second request. |
| 192 | + |
| 193 | +> As our parameter contains a `/` character, the zone ID must be URL encoded in order to match our subject defined in the `@MessageMapping`: |
| 194 | +
|
| 195 | +```javascript |
| 196 | +document.body.innerHTML += `<div class="text-xl p-4 m-4 bg-gray-100 rounded-lg" id="zonedTime"></div>` |
| 197 | +let zonedTimeSubject = "/zonedTime/Africa%2FLusaka" |
| 198 | +streamLink.subscribe(zonedTimeSubject, { |
| 199 | + onJsonUpdate: function (subscription, event) { |
| 200 | + document.getElementById("zonedTime").innerHTML = `<pre>${zonedTimeSubject} - ${JSON.stringify(event.getJson(), null, 2)}</pre>` |
| 201 | + } |
| 202 | +}) |
| 203 | +``` |
| 204 | + |
| 205 | +After another quick restart of our server, and a refresh of our browser, we now have two time streams being displayed. |
| 206 | + |
| 207 | +### Request payloads |
| 208 | + |
| 209 | +But what if our stream request becomes a bit more complicated, perhaps containing optional or arrays of parameters? At this point it's more natural to represent our request as a payload object. Let's assume our client now wishes to make a single subscription to the time in various user specified zones, again we can support this with a few minor additions to our server. Create a new endpoint named `/times` in your controller, this time receiving single non-annotated method parameter which will be our payload from the client. |
| 210 | + |
| 211 | +```kotlin |
| 212 | +data class TimesRequest( |
| 213 | + val zones: List<ZoneId>, |
| 214 | +) |
| 215 | + |
| 216 | +@MessageMapping("/times") |
| 217 | +fun times(timesRequest: TimesRequest): Flow<List<TimeEvent>> = flow { |
| 218 | + while (true) { |
| 219 | + val now = Instant.now() |
| 220 | + fun timeAtZone(zoneId: ZoneId) = TimeEvent(now.atZone(zoneId).toLocalTime(), zoneId) |
| 221 | + emit(timesRequest.zones.map(::timeAtZone)) |
| 222 | + delay(100) |
| 223 | + } |
| 224 | +} |
| 225 | +``` |
| 226 | + |
| 227 | +Now for our client we need to do something a bit different for this case - we'll need to establish a channel rather than a plain subscription, and then send our request. This is quite simple: |
| 228 | + |
| 229 | +```javascript |
| 230 | +document.body.innerHTML += `<div class="text-xl p-4 m-4 bg-gray-100 rounded-lg" id="times"></div>` |
| 231 | +let timesSubject = "/times" |
| 232 | +let timesChannel = streamLink.createJsonChannel(timesSubject, { |
| 233 | + onChannelData: function (channel, event) { |
| 234 | + document.getElementById("times").innerHTML = `<pre>${timesSubject} - ${JSON.stringify(event.getJson(), null, "\t")}</pre>`; |
| 235 | + }, |
| 236 | +}, null); |
| 237 | + |
| 238 | +timesChannel.send({ |
| 239 | + zones: ["America/Costa_Rica", "Australia/Sydney", "Africa/Lusaka"] |
| 240 | +}); |
| 241 | +``` |
| 242 | + |
| 243 | +Running this we'll now see all the requested times being displayed and updating in sync. |
| 244 | + |
| 245 | +### Two-way communication |
| 246 | + |
| 247 | +Lastly, say we now want our client to have the ability to add new zones to the stream in an ad-hoc manner. Fortunately, we can do this with a just few tweaks. |
| 248 | + |
| 249 | +On the client we'll add a simple text entry box and button, the clicking of which will send a message through the channel to let the server know to add a new zone. |
| 250 | + |
| 251 | +```javascript |
| 252 | +window.addZone = function () { |
| 253 | + timesChannel.send({ |
| 254 | + zones: [document.getElementById("zone").value] |
| 255 | + }); |
| 256 | +} |
| 257 | +document.body.innerHTML += `<div><input type="text" id="zone" value="Chile/EasterIsland"><button type="button" onclick="addZone()">Add zone</button> </div>` |
| 258 | +``` |
| 259 | + |
| 260 | +And on the server we can update our `/times` endpoint to accept a stream of data from the client by changing our parameter to be either a Flow or Flux accordingly, and then update our responses to include the newly requested zones: |
| 261 | + |
| 262 | +```kotlin |
| 263 | +data class TimesRequest( |
| 264 | + val zones: List<ZoneId>, |
| 265 | +) |
| 266 | + |
| 267 | +@MessageMapping("/times") |
| 268 | +fun times(zoneRequests: Flow<TimesRequest>): Flow<List<TimeEvent>> = zoneRequests |
| 269 | + .runningFold(emptyList<ZoneId>()) { accumulator, zoneRequest -> accumulator + zoneRequest.zones } |
| 270 | + .transformLatest { zones -> |
| 271 | + while (true) { |
| 272 | + val now = Instant.now() |
| 273 | + fun timeAtZone(zoneId: ZoneId) = TimeEvent(now.atZone(zoneId).toLocalTime(), zoneId) |
| 274 | + emit(zones.map(::timeAtZone)) |
| 275 | + delay(100) |
| 276 | + } |
| 277 | + } |
| 278 | +``` |
| 279 | + |
| 280 | +One final restart of our application, and by clicking the button we can now see additional times being added to our stream each time we add a new zone. |
0 commit comments