You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: README.md
+30-30Lines changed: 30 additions & 30 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -49,10 +49,10 @@ We want to implement this flow:
49
49
50
50

51
51
52
-
* User will POST string payloads to `/text` endpoint
53
-
* A **KafkaProducer** will send these payloads to topic `pub.texts` as `{ "text" : string }`
54
-
* A **KafkaStreams** transformation will consume from topic `pub.texts` and produce events to topic `pub.lengths` as `{ "length" : number }`
55
-
* A **KafkaConsumer** will consume events from topic `pub.lengths` and log them to the console
52
+
* User will POST string payloads to **/text** endpoint
53
+
* A **KafkaProducer** will send these payloads to topic **pub.texts** as `{ "text" : string }`
54
+
* A **KafkaStreams** transformation will consume from topic **pub.texts** and produce events to topic **pub.lengths** as `{ "length" : number }`
55
+
* A **KafkaConsumer** will consume events from topic **pub.lengths** and log them to the console
56
56
57
57
So we will use two Spring Cloud Stream binders:
58
58
***Kafka**
@@ -61,8 +61,8 @@ So we will use two Spring Cloud Stream binders:
61
61
## Create the project
62
62
63
63
We use this [spring initializr configuration](https://start.spring.io/#!type=gradle-project&language=kotlin&packaging=jar&groupId=com.rogervinas&artifactId=springcloudstreammultibinder&name=springcloudstreammultibinder&description=Spring%20Cloud%20Streams%20%26%20Kafka%20Streams%20Binder&packageName=com.rogervinas.springcloudstreammultibinder&dependencies=cloud-stream) and we add:
* A **Kafka Streams binder** connected to `localhost:9094`
138
-
* A **Kafka binder** connected to `localhost:9094`
137
+
* A **Kafka Streams binder** connected to **localhost:9094**
138
+
* A **Kafka binder** connected to **localhost:9094**
139
139
* Following the Spring Cloud Stream *functional programming model conventions* we should create:
140
-
* A bean named `textProducer` that should implement:
141
-
* In **Java**: `Supplier<Flux<TextEvent>>` interface
142
-
* In **Kotlin**: `() -> Flux<TextEvent>` lambda
143
-
* A bean named `textLengthProcessor` that should implement:
144
-
* In **Java**: `Function<KStream<String, TextEvent>, KStream<String, LengthEvent>>` interface
140
+
* A bean named **textProducer** that should implement:
141
+
* In **Java**: **Supplier<Flux<TextEvent>>** interface
142
+
* In **Kotlin**: **() -> Flux<TextEvent>** lambda
143
+
* A bean named **textLengthProcessor** that should implement:
144
+
* In **Java**: **Function<KStream<String, TextEvent>, KStream<String, LengthEvent>>** interface
145
145
* In **Kotlin**: the same, there is no support for lambdas yet 😅
146
-
* A bean named `lengthConsumer` that should implement:
147
-
* In **Java**: `Consumer<LengthEvent>` interface
148
-
* In **Kotlin**: `(LengthEvent>) -> Unit` lambda
146
+
* A bean named **lengthConsumer** that should implement:
147
+
* In **Java**: **Consumer<LengthEvent>** interface
148
+
* In **Kotlin**: **(LengthEvent>) -> Unit** lambda
149
149
150
-
💡 We use different values for the Kafka Streams `applicationId` and the Kafka Consumers `group` to avoid undesired behaviors.
150
+
💡 We use different values for the Kafka Streams **applicationId** and the Kafka Consumers **group** to avoid undesired behaviors.
151
151
152
-
💡 We are using Spring Cloud Stream's default serialization/deserialization of Kotlin data classes to Json. In order for this to work we need to add `com.fasterxml.jackson.module:jackson-module-kotlin` dependency.
152
+
💡 We are using Spring Cloud Stream's default serialization/deserialization of Kotlin data classes to Json. In order for this to work we need to add **com.fasterxml.jackson.module:jackson-module-kotlin** dependency.
153
153
154
154
💡 You can find all the available configuration properties documented in:
@@ -261,7 +261,7 @@ class TextLengthProcessor : Function<KStream<String, TextEvent>, KStream<String,
261
261
}
262
262
```
263
263
264
-
And we can test it using `kafka-streams-test-utils`:
264
+
And we can test it using **kafka-streams-test-utils**:
265
265
266
266
```kotlin
267
267
privateconstvalTOPIC_IN="topic.in"
@@ -321,7 +321,7 @@ internal class TextLengthProcessorTest {
321
321
322
322
## LengthConsumer
323
323
324
-
We implement `LengthStreamConsumer` as expected by Spring Cloud Stream conventions like this:
324
+
We implement **LengthStreamConsumer** as expected by Spring Cloud Stream conventions like this:
325
325
326
326
```kotlin
327
327
data classLengthEvent(vallength:Int)
@@ -334,7 +334,7 @@ class LengthStreamConsumer(private val processor: LengthProcessor) : (LengthEven
334
334
}
335
335
```
336
336
337
-
We decouple the final implementation using the interface `LengthProcessor`:
337
+
We decouple the final implementation using the interface **LengthProcessor**:
338
338
339
339
```kotlin
340
340
interfaceLengthProcessor {
@@ -420,13 +420,13 @@ class MyApplicationConfiguration {
420
420
```
421
421
422
422
Please note that:
423
-
* The three Spring Cloud functions defined in [application.yml](src/main/resources/application.yml) will be bound **by name** to the beans `textProducer`, `textLengthProcessor` and `lengthConsumer`.
424
-
* For the Kafka binder ones, `textProducer` and `lengthConsumer`, we have to **define them explicitly as Kotlin lambdas** (required by `KotlinLambdaToFunctionAutoConfiguration`).
425
-
* If we were using **Java** we should use `java.util.function` types: `Supplier` and `Consumer`.
426
-
* For the Kafka Stream binder one, `textLengthProcessor`, we have to **define it explicitly as a `java.util.function.Function`**, there is no support for **Kotlin** lambdas yet (check `KafkaStreamsFunctionBeanPostProcessor`).
427
-
* Beans `textFluxProducer` and `textProducer` return the same instance ...
428
-
* We need `textFluxProducer` to inject it whenever a `TextProducer` interface is needed (the `TextController` for example).
429
-
* We need `textProducer` to bind it to the `textProducer` Spring Cloud function required by the Kafka binder.
423
+
* The three Spring Cloud functions defined in [application.yml](src/main/resources/application.yml) will be bound **by name** to the beans **textProducer**, **textLengthProcessor** and **lengthConsumer**.
424
+
* For the Kafka binder ones, **textProducer** and **lengthConsumer**, we have to **define them explicitly as Kotlin lambdas** (required by **KotlinLambdaToFunctionAutoConfiguration**).
425
+
* If we were using **Java** we should use **java.util.function** types: **Supplier** and **Consumer**.
426
+
* For the Kafka Stream binder one, **textLengthProcessor**, we have to **define it explicitly as a java.util.function.Function**, there is no support for **Kotlin** lambdas yet (check **KafkaStreamsFunctionBeanPostProcessor**).
427
+
* Beans **textFluxProducer** and **textProducer** return the same instance ...
428
+
* We need **textFluxProducer** to inject it whenever a **TextProducer** interface is needed (the **TextController** for example).
429
+
* We need **textProducer** to bind it to the **textProducer** Spring Cloud function required by the Kafka binder.
430
430
431
431
And that is it, now [MyApplicationIntegrationTest](src/test/kotlin/com/rogervinas/multibinder/MyApplicationIntegrationTest.kt) should work! 🤞
0 commit comments