Skip to content

Commit f1b83f5

Browse files
authored
[ISSUE-48] Support for Spring Cloud Stream (#318)
* Support for Spring Cloud Stream (#48)
1 parent d181d74 commit f1b83f5

28 files changed

Lines changed: 1748 additions & 1 deletion

File tree

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Copyright 2019 The JoyQueue Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
18+
-->
19+
<project xmlns="http://maven.apache.org/POM/4.0.0"
20+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
21+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
22+
<parent>
23+
<artifactId>joyqueue-client-samples</artifactId>
24+
<groupId>org.joyqueue</groupId>
25+
<version>4.2.8-SNAPSHOT</version>
26+
</parent>
27+
<modelVersion>4.0.0</modelVersion>
28+
29+
<artifactId>joyqueue-client-samples-springcloud-stream</artifactId>
30+
<name>JoyQueue-Client-Samples-SpringCloud-Stream</name>
31+
<description>Client samples using SpringCloud Stream</description>
32+
33+
<dependencies>
34+
<dependency>
35+
<groupId>org.joyqueue</groupId>
36+
<artifactId>openmessaging-spring-cloud-stream-binder</artifactId>
37+
<version>${project.version}</version>
38+
</dependency>
39+
</dependencies>
40+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Copyright 2019 The JoyQueue Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.joyqueue.client.samples.springcloud.stream;
17+
18+
import org.springframework.cloud.stream.annotation.Input;
19+
import org.springframework.cloud.stream.annotation.Output;
20+
import org.springframework.messaging.Message;
21+
import org.springframework.messaging.MessageChannel;
22+
import org.springframework.messaging.SubscribableChannel;
23+
import org.springframework.stereotype.Component;
24+
25+
/**
26+
* Custom Processor
27+
*/
28+
@Component
29+
public interface CustomProcessor {
30+
31+
String INPUT_ORDER = "inputOrder";
32+
33+
String OUTPUT_ORDER = "outputOrder";
34+
35+
/**
36+
* 主题订阅通道
37+
* <p>
38+
* 订阅消息通道{@link SubscribableChannel}为消息通道{@link MessageChannel}子类,该通道的所有消息被{@link org.springframework.messaging.MessageHandler}消息处理器所订阅
39+
* </p>
40+
* @return {@link SubscribableChannel}
41+
*/
42+
@Input(INPUT_ORDER)
43+
SubscribableChannel inputOrder();
44+
45+
/**
46+
* 主题消息发布通道
47+
*
48+
* <p>
49+
* 消息通道{@link MessageChannel}用于接收消息,调用{@link MessageChannel#send(Message)}方法可以将消息发送至该消息通道中
50+
* </p>
51+
* @return {@link MessageChannel}
52+
*/
53+
@Output(OUTPUT_ORDER)
54+
MessageChannel outputOrder();
55+
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Copyright 2019 The JoyQueue Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.joyqueue.client.samples.springcloud.stream;
17+
18+
import org.springframework.boot.SpringApplication;
19+
import org.springframework.boot.autoconfigure.SpringBootApplication;
20+
import org.springframework.cloud.stream.annotation.EnableBinding;
21+
import org.springframework.context.ConfigurableApplicationContext;
22+
import org.springframework.messaging.Message;
23+
import org.springframework.messaging.support.GenericMessage;
24+
25+
@EnableBinding(CustomProcessor.class)
26+
@SpringBootApplication(scanBasePackages = {"org.joyqueue.client.samples.springcloud.stream"})
27+
public class StreamBootstrap {
28+
29+
public static void main(String[] args) throws InterruptedException {
30+
System.setProperty("spring.profiles.active", "stream");
31+
ConfigurableApplicationContext run = SpringApplication.run(StreamBootstrap.class, args);
32+
CustomProcessor processor = run.getBean(CustomProcessor.class);
33+
34+
for (int i = 0; i < 100; i++) {
35+
Message<String> message = new GenericMessage<>("Hello - " + i);
36+
//Message<String> received = (Message<String>) messageCollector.forChannel(processor.output()).poll();
37+
//Assert.assertThat(received.getPayload(), equalTo("hello world"));
38+
processor.outputOrder().send(message);
39+
40+
Thread.sleep(1000);
41+
}
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/**
2+
* Copyright 2019 The JoyQueue Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.joyqueue.client.samples.springcloud.stream;
17+
18+
import org.springframework.cloud.stream.annotation.StreamListener;
19+
import org.springframework.messaging.Message;
20+
import org.springframework.stereotype.Service;
21+
22+
/**
23+
* Test for Stream Listener
24+
*/
25+
@Service
26+
public class StreamListenerService {
27+
28+
@StreamListener(CustomProcessor.INPUT_ORDER)
29+
public void receiveMessage(Message<String> message) {
30+
System.out.println(String.format("接收到消息对象,headers=%s, payload=%s", message.getHeaders().toString(), message.getPayload()));
31+
}
32+
33+
@StreamListener(CustomProcessor.INPUT_ORDER)
34+
public void receiveMessageBody(String receiveMsg) {
35+
System.out.println("接收到消息体: " + receiveMsg);
36+
}
37+
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#
2+
# Copyright 2019 The JoyQueue Authors.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
spring:
17+
cloud:
18+
stream:
19+
bindings:
20+
outputOrder:
21+
binder: oms1
22+
destination: jqtopic
23+
content-type: text/plain
24+
producer:
25+
#通过该参数指定了分区键的表达式规则,可以根据实际的输出消息规则配置 SpEL 来生成合适的分区键
26+
partitionKeyExpression: payload
27+
partitionCount: 2
28+
inputOrder:
29+
binder: oms1
30+
destination: jqtopic
31+
content-type: text/plain
32+
group: group1
33+
consumer:
34+
concurrency: 50
35+
binders:
36+
oms1:
37+
type: oms
38+
default-binder: oms
39+
40+
oms:
41+
binder:
42+
url: oms:joyqueue://jqbinder@test-nameserver.jmq.xx.local:50088/UNKNOWN
43+
attributes:
44+
ACCOUNT_KEY: xxxx
45+
bindings:
46+
outputOrder:
47+
producer:
48+
group: demo-group
49+
sync: true
50+
inputOrder:
51+
consumer:
52+
enable: true
53+
batch: false
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#
2+
# Copyright 2019 The JoyQueue Authors.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
# 采用Spring Cloud Stream与Spring Boot配置冲突, Spring Boot优先级会更高
18+
#spring.oms:
19+
# url: oms:joyqueue://test_app@127.0.0.1:50088/UNKNOWN
20+
# attributes:
21+
# - ACCOUNT_KEY: test_token

joyqueue-client/joyqueue-client-samples/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
<module>joyqueue-client-samples-spring</module>
3434
<module>joyqueue-client-samples-springboot</module>
3535
<module>joyqueue-client-samples-kafka</module>
36+
<module>joyqueue-client-samples-springcloud-stream</module>
3637
</modules>
3738

3839
<dependencies>
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# Spring Cloud Stream Binder for Openmessaging (JoyQueue)
2+
## 关于(About Spring Cloud Stream)
3+
- 官方文档(Official document): [docs.spring.io/spring-cloud-stream](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html)
4+
5+
## 如何使用(How to use)
6+
- 使用示例(Use sample) [Client Sample for Spring Cloud Stream](../../joyqueue-client/joyqueue-client-samples/joyqueue-client-samples-springcloud-stream)
7+
8+
## 配置(Configuration)
9+
自定义消息处理接口
10+
```java
11+
@Component
12+
public interface CustomProcessor {
13+
14+
String INPUT_ORDER = "inputOrder";
15+
16+
String OUTPUT_ORDER = "outputOrder";
17+
18+
/**
19+
* 主题订阅通道
20+
* <p>
21+
* 订阅消息通道{@link SubscribableChannel}为消息通道{@link MessageChannel}子类,该通道的所有消息被{@link org.springframework.messaging.MessageHandler}消息处理器所订阅
22+
* </p>
23+
* @return {@link SubscribableChannel}
24+
*/
25+
@Input(INPUT_ORDER)
26+
SubscribableChannel inputOrder();
27+
28+
/**
29+
* 主题消息发布通道
30+
*
31+
* <p>
32+
* 消息通道{@link MessageChannel}用于接收消息,调用{@link MessageChannel#send(Message)}方法可以将消息发送至该消息通道中
33+
* </p>
34+
* @return {@link MessageChannel}
35+
*/
36+
@Output(OUTPUT_ORDER)
37+
MessageChannel outputOrder();
38+
39+
}
40+
41+
```
42+
Configuration file
43+
```yaml
44+
spring:
45+
cloud:
46+
stream:
47+
bindings:
48+
#对应CustomProcessor接口定义的outputOrder()
49+
outputOrder:
50+
binder: oms1
51+
destination: jqtopic
52+
content-type: text/plain
53+
producer:
54+
#通过该参数指定了分区键的表达式规则,可以根据实际的输出消息规则配置 SpEL 来生成合适的分区键
55+
partitionKeyExpression: payload
56+
partitionCount: 2
57+
inputOrder:
58+
binder: oms1
59+
destination: jqtopic
60+
content-type: text/plain
61+
group: group1
62+
consumer:
63+
concurrency: 50
64+
binders:
65+
oms1:
66+
type: oms
67+
default-binder: oms
68+
#OMS(JoyQueue)对应的连接配置
69+
oms:
70+
binder:
71+
url: oms:joyqueue://jqbinder@test-nameserver.jmq.xx.local:50088/UNKNOWN
72+
attributes:
73+
ACCOUNT_KEY: xxxx
74+
bindings:
75+
#OMS的自定义配置
76+
outputOrder:
77+
producer:
78+
group: demo-group
79+
sync: true
80+
inputOrder:
81+
consumer:
82+
enable: true
83+
batch: true
84+
```

0 commit comments

Comments
 (0)