Skip to content

Commit bf6ca72

Browse files
author
kindywu
committed
init
1 parent 82ffb63 commit bf6ca72

2 files changed

Lines changed: 168 additions & 0 deletions

File tree

modules/jooby-kafka/pom.xml

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
5+
6+
<parent>
7+
<groupId>io.jooby</groupId>
8+
<artifactId>modules</artifactId>
9+
<version>2.9.3-SNAPSHOT</version>
10+
</parent>
11+
12+
<modelVersion>4.0.0</modelVersion>
13+
<artifactId>jooby-kafka</artifactId>
14+
15+
<dependencies>
16+
<dependency>
17+
<groupId>com.google.code.findbugs</groupId>
18+
<artifactId>jsr305</artifactId>
19+
<scope>provided</scope>
20+
</dependency>
21+
22+
<dependency>
23+
<groupId>io.jooby</groupId>
24+
<artifactId>jooby</artifactId>
25+
<version>${jooby.version}</version>
26+
</dependency>
27+
28+
<!--add kafka dependencies-->
29+
<dependency>
30+
<groupId>org.apache.kafka</groupId>
31+
<artifactId>kafka-clients</artifactId>
32+
<version>2.6.0</version>
33+
</dependency>
34+
35+
<!-- Test dependencies -->
36+
<dependency>
37+
<groupId>org.junit.jupiter</groupId>
38+
<artifactId>junit-jupiter-engine</artifactId>
39+
<scope>test</scope>
40+
</dependency>
41+
42+
<dependency>
43+
<groupId>org.jacoco</groupId>
44+
<artifactId>org.jacoco.agent</artifactId>
45+
<classifier>runtime</classifier>
46+
<scope>test</scope>
47+
</dependency>
48+
</dependencies>
49+
</project>
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/**
2+
* Jooby https://jooby.io
3+
* Apache License Version 2.0 https://jooby.io/LICENSE.txt
4+
* Copyright 2014 Edgar Espina
5+
*/
6+
package io.jooby.kafka;
7+
8+
import com.typesafe.config.Config;
9+
import io.jooby.Extension;
10+
import io.jooby.Jooby;
11+
import io.jooby.ServiceRegistry;
12+
import org.apache.kafka.clients.consumer.KafkaConsumer;
13+
import org.apache.kafka.clients.producer.KafkaProducer;
14+
15+
import javax.annotation.Nonnull;
16+
import java.util.Properties;
17+
18+
/**
19+
* Redis module: https://jooby.io/modules/redis.
20+
* <p>
21+
* Usage:
22+
*
23+
* <pre>{@code
24+
* {
25+
* install(new RedisModule());
26+
*
27+
* get("/", ctx -> {
28+
* StatefulRedisConnection redis = require(StatefulRedisConnection.class);
29+
* // work with redis
30+
* });
31+
* }
32+
* }</pre>
33+
* <p>
34+
* application.conf:
35+
*
36+
* <pre>
37+
* redis = "redis://localhost:6379"
38+
* </pre>
39+
* <p>
40+
* Module is built on top of <a href="https://lettuce.io/">lettuce</a>. Once installed you are able
41+
* to work with:
42+
*
43+
* <ul>
44+
* <li>io.lettuce.core.RedisClient</li>
45+
* <li>io.lettuce.core.api.StatefulRedisConnection</li>
46+
* <li>io.lettuce.core.pubsub.StatefulRedisPubSubConnection</li>
47+
* </ul>
48+
* <p>
49+
* Alternative you can pass a redis URI:
50+
* <pre>
51+
* install(new RedisModule("redis://localhost:6379"));
52+
* </pre>
53+
*
54+
* @author edgar
55+
* @since 2.8.5
56+
*/
57+
public class KafkaModule implements Extension {
58+
Properties producerProps;
59+
Properties consumerProps;
60+
61+
/**
62+
* Creates a new kafka module. Value must be:
63+
* - Valid redis URI; or
64+
* - Property name
65+
*
66+
* @param producerProps kafka producer properties.
67+
* @param consumerProps kafka consumer properties.
68+
*/
69+
public KafkaModule(@Nonnull Properties producerProps, @Nonnull Properties consumerProps) {
70+
this.producerProps = producerProps;
71+
this.consumerProps = consumerProps;
72+
}
73+
74+
/**
75+
* Create a new redis module. The application configuration file must have a redis property, like:
76+
* <pre>
77+
* redis = "redis://localhost:6379"
78+
* </pre>
79+
*/
80+
public KafkaModule() {
81+
}
82+
83+
@Override
84+
public void install(@Nonnull Jooby application) {
85+
Config config = application.getConfig();
86+
87+
if (this.producerProps == null) {
88+
this.producerProps = properties(config, "kafka.producer");
89+
}
90+
91+
if (this.consumerProps == null) {
92+
this.consumerProps = properties(config, "kafka.consumer");
93+
}
94+
95+
ServiceRegistry registry = application.getServices();
96+
97+
if (this.producerProps != null) {
98+
KafkaProducer producer = new KafkaProducer(producerProps);
99+
application.onStop(producer::close);
100+
registry.putIfAbsent(KafkaProducer.class, producer);
101+
}
102+
103+
if (this.consumerProps != null) {
104+
KafkaConsumer consumer = new KafkaConsumer(consumerProps);
105+
application.onStop(consumer::close);
106+
registry.putIfAbsent(KafkaConsumer.class, consumer);
107+
}
108+
}
109+
110+
private static Properties properties(final Config config, final String key) {
111+
Properties props = new Properties();
112+
if (config.hasPath(key)) {
113+
// dump
114+
config.getConfig(key).entrySet().forEach(
115+
e -> props.setProperty(e.getKey(), e.getValue().unwrapped().toString()));
116+
}
117+
return props;
118+
}
119+
}

0 commit comments

Comments
 (0)