Skip to content

Commit c02617a

Browse files
authored
Merge pull request #2089 from kindywu/2.x
add kafka module to jooby
2 parents b18e213 + 39a4679 commit c02617a

3 files changed

Lines changed: 206 additions & 0 deletions

File tree

modules/jooby-kafka/pom.xml

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
5+
6+
<parent>
7+
<groupId>io.jooby</groupId>
8+
<artifactId>modules</artifactId>
9+
<version>2.9.3-SNAPSHOT</version>
10+
</parent>
11+
12+
<modelVersion>4.0.0</modelVersion>
13+
<artifactId>jooby-kafka</artifactId>
14+
15+
<dependencies>
16+
<dependency>
17+
<groupId>com.google.code.findbugs</groupId>
18+
<artifactId>jsr305</artifactId>
19+
<scope>provided</scope>
20+
</dependency>
21+
22+
<dependency>
23+
<groupId>io.jooby</groupId>
24+
<artifactId>jooby</artifactId>
25+
<version>${jooby.version}</version>
26+
</dependency>
27+
28+
<!--add kafka dependencies-->
29+
<dependency>
30+
<groupId>org.apache.kafka</groupId>
31+
<artifactId>kafka-clients</artifactId>
32+
<version>2.6.0</version>
33+
</dependency>
34+
35+
<!-- Test dependencies -->
36+
<dependency>
37+
<groupId>org.junit.jupiter</groupId>
38+
<artifactId>junit-jupiter-engine</artifactId>
39+
<scope>test</scope>
40+
</dependency>
41+
42+
<dependency>
43+
<groupId>org.jacoco</groupId>
44+
<artifactId>org.jacoco.agent</artifactId>
45+
<classifier>runtime</classifier>
46+
<scope>test</scope>
47+
</dependency>
48+
</dependencies>
49+
</project>
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/**
2+
* Jooby https://jooby.io
3+
* Apache License Version 2.0 https://jooby.io/LICENSE.txt
4+
* Copyright 2014 Edgar Espina
5+
*/
6+
package io.jooby.kafka;
7+
8+
import com.typesafe.config.Config;
9+
import io.jooby.Extension;
10+
import io.jooby.Jooby;
11+
import io.jooby.ServiceRegistry;
12+
import org.apache.kafka.clients.consumer.KafkaConsumer;
13+
import org.apache.kafka.clients.producer.KafkaProducer;
14+
15+
import javax.annotation.Nonnull;
16+
import java.util.Properties;
17+
18+
/**
19+
* Redis module: https://jooby.io/modules/redis.
20+
* <p>
21+
* Usage:
22+
*
23+
* <pre>{@code
24+
* {
25+
* install(new RedisModule());
26+
*
27+
* get("/", ctx -> {
28+
* StatefulRedisConnection redis = require(StatefulRedisConnection.class);
29+
* // work with redis
30+
* });
31+
* }
32+
* }</pre>
33+
* <p>
34+
* application.conf:
35+
*
36+
* <pre>
37+
* redis = "redis://localhost:6379"
38+
* </pre>
39+
* <p>
40+
* Module is built on top of <a href="https://lettuce.io/">lettuce</a>. Once installed you are able
41+
* to work with:
42+
*
43+
* <ul>
44+
* <li>io.lettuce.core.RedisClient</li>
45+
* <li>io.lettuce.core.api.StatefulRedisConnection</li>
46+
* <li>io.lettuce.core.pubsub.StatefulRedisPubSubConnection</li>
47+
* </ul>
48+
* <p>
49+
* Alternative you can pass a redis URI:
50+
* <pre>
51+
* install(new RedisModule("redis://localhost:6379"));
52+
* </pre>
53+
*
54+
* @author edgar
55+
* @since 2.8.5
56+
*/
57+
public class KafkaModule implements Extension {
58+
String producerKey;
59+
String consumerKey;
60+
Properties producerProps;
61+
Properties consumerProps;
62+
63+
/**
64+
* Creates a new kafka producer module using the <code>kafka.producer</code> property key.
65+
* This key must be present in the application configuration file, like:
66+
*
67+
* <pre>{@code
68+
* kafka.producer.bootstrap.servers = "localhost:9092"
69+
* kafka.producer.acks = "all"
70+
* kafka.producer.retries = 0
71+
* kafka.producer.key.serializer = "org.apache.kafka.common.serialization.StringSerializer"
72+
* kafka.producer.value.serializer = "org.apache.kafka.common.serialization.StringSerializer"
73+
* }</pre>
74+
*
75+
* Creates a new kafka consumer module using the <code>kafka.consumer</code> property key.
76+
* This key must be present in the application configuration file, like:
77+
*
78+
* <pre>{@code
79+
* kafka.consumer.bootstrap.servers = "localhost:9092"
80+
* kafka.consumer.group.id = "group A"
81+
* kafka.consumer.key.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
82+
* kafka.consumer.value.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
83+
* }</pre>
84+
*/
85+
public KafkaModule() {
86+
this("kafka.producer", "kafka.consumer");
87+
}
88+
89+
/**
90+
* Creates a new kafka producer module. The producer parameter can be one of:
91+
*
92+
* - A property key defined in your application configuration file, like <code>producerKey</code>.
93+
*
94+
* @param producerKey Database key
95+
*
96+
* Creates a new kafka consumer module. The consumer parameter can be one of:
97+
*
98+
* - A property key defined in your application configuration file, like <code>consumerKey</code>.
99+
*
100+
* @param consumerKey Database key
101+
*/
102+
public KafkaModule(@Nonnull String producerKey, @Nonnull String consumerKey) {
103+
this.producerKey = producerKey;
104+
this.consumerKey = consumerKey;
105+
}
106+
107+
/**
108+
* Creates a new kafka module.
109+
*
110+
* @param producerProps kafka producer properties.
111+
* @param consumerProps kafka consumer properties.
112+
*/
113+
public KafkaModule(@Nonnull Properties producerProps, @Nonnull Properties consumerProps) {
114+
this("kafka.producer", "kafka.consumer");
115+
this.producerProps = producerProps;
116+
this.consumerProps = consumerProps;
117+
}
118+
119+
120+
@Override
121+
public void install(@Nonnull Jooby application) {
122+
Config config = application.getConfig();
123+
124+
if (this.producerProps == null) {
125+
this.producerProps = properties(config, this.producerKey);
126+
}
127+
128+
if (this.consumerProps == null) {
129+
this.consumerProps = properties(config, this.consumerKey);
130+
}
131+
132+
ServiceRegistry registry = application.getServices();
133+
134+
if (this.producerProps != null) {
135+
KafkaProducer producer = new KafkaProducer(producerProps);
136+
application.onStop(producer::close);
137+
registry.putIfAbsent(KafkaProducer.class, producer);
138+
}
139+
140+
if (this.consumerProps != null) {
141+
KafkaConsumer consumer = new KafkaConsumer(consumerProps);
142+
application.onStop(consumer::close);
143+
registry.putIfAbsent(KafkaConsumer.class, consumer);
144+
}
145+
}
146+
147+
private static Properties properties(final Config config, final String key) {
148+
Properties props = new Properties();
149+
if (config.hasPath(key)) {
150+
// dump
151+
config.getConfig(key).entrySet().forEach(
152+
e -> props.setProperty(e.getKey(), e.getValue().unwrapped().toString()));
153+
}
154+
return props;
155+
}
156+
}

modules/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
<module>jooby-flyway</module>
3434
<module>jooby-ebean</module>
3535
<module>jooby-redis</module>
36+
<module>jooby-kafka</module>
3637
<module>jooby-caffeine</module>
3738

3839
<module>jooby-jackson</module>

0 commit comments

Comments
 (0)