Skip to content

Commit 7342eac

Browse files
authored
客户端拦截器允许修改主题 (#331)
* 调整拦截器,可以动态修改生产主题 * 调整客户端元数据,返回策略字段 * 添加报错信息
1 parent 81b294a commit 7342eac

13 files changed

Lines changed: 107 additions & 28 deletions

File tree

joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/interceptor/ConsumeContext.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.common.collect.Maps;
2020
import com.google.common.collect.Sets;
2121
import org.joyqueue.client.internal.consumer.domain.ConsumeMessage;
22+
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
2223
import org.joyqueue.client.internal.nameserver.NameServerConfig;
2324

2425
import java.util.List;
@@ -36,14 +37,16 @@ public class ConsumeContext {
3637
private String topic;
3738
private String app;
3839
private NameServerConfig nameserver;
40+
private TopicMetadata topicMetadata;
3941
private List<ConsumeMessage> messages;
4042
private Map<Object, Object> attributes;
4143
private Set<ConsumeMessage> messageFilter;
4244

43-
public ConsumeContext(String topic, String app, NameServerConfig nameserver, List<ConsumeMessage> messages) {
45+
public ConsumeContext(String topic, String app, NameServerConfig nameserverConfig, TopicMetadata topicMetadata, List<ConsumeMessage> messages) {
4446
this.topic = topic;
4547
this.app = app;
46-
this.nameserver = nameserver;
48+
this.nameserver = nameserverConfig;
49+
this.topicMetadata = topicMetadata;
4750
this.messages = messages;
4851
}
4952

@@ -59,6 +62,10 @@ public NameServerConfig getNameserver() {
5962
return nameserver;
6063
}
6164

65+
public TopicMetadata getTopicMetadata() {
66+
return topicMetadata;
67+
}
68+
6269
public List<ConsumeMessage> getMessages() {
6370
return messages;
6471
}

joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/interceptor/ConsumerInvocation.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
*/
1616
package org.joyqueue.client.internal.consumer.interceptor;
1717

18+
import org.apache.commons.collections.CollectionUtils;
1819
import org.joyqueue.client.internal.consumer.config.ConsumerConfig;
1920
import org.joyqueue.client.internal.consumer.domain.ConsumeMessage;
2021
import org.joyqueue.client.internal.consumer.domain.ConsumeReply;
22+
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
2123
import org.joyqueue.client.internal.nameserver.NameServerConfig;
22-
import org.apache.commons.collections.CollectionUtils;
2324

2425
import java.util.Collections;
2526
import java.util.List;
@@ -35,22 +36,24 @@ public class ConsumerInvocation {
3536
private ConsumerConfig config;
3637
private String topic;
3738
private NameServerConfig nameServerConfig;
39+
private TopicMetadata topicMetadata;
3840
private List<ConsumeMessage> messages;
3941
private ConsumerInterceptorManager consumerInterceptorManager;
4042
private ConsumerInvoker consumerInvoker;
4143

42-
public ConsumerInvocation(ConsumerConfig config, String topic, NameServerConfig nameServerConfig, List<ConsumeMessage> messages,
44+
public ConsumerInvocation(ConsumerConfig config, String topic, NameServerConfig nameServerConfig, TopicMetadata topicMetadata, List<ConsumeMessage> messages,
4345
ConsumerInterceptorManager consumerInterceptorManager, ConsumerInvoker consumerInvoker) {
4446
this.config = config;
4547
this.topic = topic;
4648
this.nameServerConfig = nameServerConfig;
49+
this.topicMetadata = topicMetadata;
4750
this.messages = messages;
4851
this.consumerInterceptorManager = consumerInterceptorManager;
4952
this.consumerInvoker = consumerInvoker;
5053
}
5154

5255
public List<ConsumeReply> invoke() {
53-
ConsumeContext context = new ConsumeContext(topic, config.getApp(), nameServerConfig, Collections.unmodifiableList(messages));
56+
ConsumeContext context = new ConsumeContext(topic, config.getApp(), nameServerConfig, topicMetadata, Collections.unmodifiableList(messages));
5457
List<ConsumerInterceptor> interceptors = consumerInterceptorManager.getSortedInterceptors();
5558

5659
if (CollectionUtils.isEmpty(interceptors)) {

joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/support/TopicMessageConsumerDispatcher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,12 @@ protected List<ConsumeReply> doDispatch(TopicMetadata topicMetadata, ConsumerPol
109109

110110
protected List<ConsumeReply> doBatchDispatch(TopicMetadata topicMetadata, ConsumerPolicy consumerPolicy,
111111
List<ConsumeMessage> messages, List<BatchMessageListener> listeners) {
112-
return new ConsumerInvocation(config, topic, nameServerConfig, messages, consumerInterceptorManager,
112+
return new ConsumerInvocation(config, topic, nameServerConfig, topicMetadata, messages, consumerInterceptorManager,
113113
new BatchConsumerInvoker(config, topicMetadata, consumerPolicy, messages, listeners)).invoke();
114114
}
115115

116116
protected List<ConsumeReply> doOnceDispatch(TopicMetadata topicMetadata, final ConsumerPolicy consumerPolicy, final List<ConsumeMessage> messages, final List<MessageListener> listeners) {
117-
return new ConsumerInvocation(config, topic, nameServerConfig, messages, consumerInterceptorManager,
117+
return new ConsumerInvocation(config, topic, nameServerConfig, topicMetadata, messages, consumerInterceptorManager,
118118
new OnceConsumerInvoker(config, topicMetadata, consumerPolicy, messages, listeners)).invoke();
119119
}
120120
}

joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/metadata/converter/ClusterMetadataConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public static TopicMetadata convertTopicMetadata(String code, Topic topic, Map<I
149149

150150
return new TopicMetadata(code, topic.getProducerPolicy(), topic.getConsumerPolicy(), topic.getType(), partitionGroups, partitions, partitionMap, partitionGroupMap, brokers,
151151
Lists.newArrayList(writableBrokers), Lists.newArrayList(readableBrokers), nearbyBrokers, Lists.newArrayList(writableNearbyBrokers), Lists.newArrayList(readableNearbyBrokers),
152-
brokerMap, brokerPartitions, brokerPartitionGroups, allAvailable, topic.getCode());
152+
brokerMap, brokerPartitions, brokerPartitionGroups, allAvailable, topic.getParams(), topic.getCode());
153153
}
154154

155155
public static PartitionGroupMetadata convertPartitionGroupMetadata(String topic, TopicPartitionGroup partitionGroup, Map<Integer, BrokerNode> brokers) {

joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/metadata/domain/TopicMetadata.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class TopicMetadata implements Serializable {
3939
private ProducerPolicy producerPolicy;
4040
private ConsumerPolicy consumerPolicy;
4141
private TopicType type;
42+
private Map<String, String> params;
4243
private JoyQueueCode code;
4344

4445
private List<PartitionGroupMetadata> partitionGroups;
@@ -76,7 +77,7 @@ public TopicMetadata(String topic, ProducerPolicy producerPolicy, ConsumerPolicy
7677
List<PartitionMetadata> partitions, Map<Short, PartitionMetadata> partitionMap, Map<Integer, PartitionGroupMetadata> partitionGroupMap, List<BrokerNode> brokers,
7778
List<BrokerNode> writableBrokers, List<BrokerNode> readableBrokers, List<BrokerNode> nearbyBrokers, List<BrokerNode> writableNearbyBrokers,
7879
List<BrokerNode> readableNearbyBrokers, Map<Integer, BrokerNode> brokerMap, Map<Integer, List<PartitionMetadata>> brokerPartitions,
79-
Map<Integer, List<PartitionGroupMetadata>> brokerPartitionGroups, boolean allAvailable, JoyQueueCode code) {
80+
Map<Integer, List<PartitionGroupMetadata>> brokerPartitionGroups, boolean allAvailable, Map<String, String> params, JoyQueueCode code) {
8081
this.topic = topic;
8182
this.producerPolicy = producerPolicy;
8283
this.consumerPolicy = consumerPolicy;
@@ -95,6 +96,7 @@ public TopicMetadata(String topic, ProducerPolicy producerPolicy, ConsumerPolicy
9596
this.brokerPartitions = brokerPartitions;
9697
this.brokerPartitionGroups = brokerPartitionGroups;
9798
this.allAvailable = allAvailable;
99+
this.params = params;
98100
this.code = code;
99101
}
100102

@@ -174,6 +176,10 @@ public JoyQueueCode getCode() {
174176
return code;
175177
}
176178

179+
public Map<String, String> getParams() {
180+
return params;
181+
}
182+
177183
public void setAttachments(ConcurrentMap<Object, Object> attachments) {
178184
this.attachments = attachments;
179185
}

joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/interceptor/ProduceContext.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.joyqueue.client.internal.producer.interceptor;
1717

1818
import com.google.common.collect.Maps;
19+
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
1920
import org.joyqueue.client.internal.nameserver.NameServerConfig;
2021
import org.joyqueue.client.internal.producer.domain.ProduceMessage;
2122

@@ -32,14 +33,16 @@ public class ProduceContext {
3233

3334
private String topic;
3435
private String app;
35-
private NameServerConfig nameserver;
36+
private NameServerConfig nameserverConfig;
37+
private TopicMetadata topicMetadata;
3638
private List<ProduceMessage> messages;
3739
private Map<Object, Object> attributes;
3840

39-
public ProduceContext(String topic, String app, NameServerConfig nameserver, List<ProduceMessage> messages) {
41+
public ProduceContext(String topic, String app, NameServerConfig nameserverConfig, TopicMetadata topicMetadata, List<ProduceMessage> messages) {
4042
this.topic = topic;
4143
this.app = app;
42-
this.nameserver = nameserver;
44+
this.nameserverConfig = nameserverConfig;
45+
this.topicMetadata = topicMetadata;
4346
this.messages = messages;
4447
}
4548

@@ -51,14 +54,18 @@ public String getApp() {
5154
return app;
5255
}
5356

54-
public NameServerConfig getNameserver() {
55-
return nameserver;
57+
public NameServerConfig getNameserverConfig() {
58+
return nameserverConfig;
5659
}
5760

5861
public List<ProduceMessage> getMessages() {
5962
return messages;
6063
}
6164

65+
public TopicMetadata getTopicMetadata() {
66+
return topicMetadata;
67+
}
68+
6269
public <T> T getAttribute(Object key) {
6370
if (attributes == null) {
6471
return null;

joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/interceptor/ProducerInvocation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public ProducerInvocation(ProducerConfig config, NameServerConfig nameServerConf
5151
}
5252

5353
public List<SendResult> invoke() {
54-
ProduceContext context = new ProduceContext(topicMetadata.getTopic(), config.getApp(), nameServerConfig, Collections.unmodifiableList(messages));
54+
ProduceContext context = new ProduceContext(topicMetadata.getTopic(), config.getApp(), nameServerConfig, topicMetadata, Collections.unmodifiableList(messages));
5555
List<ProducerInterceptor> interceptors = producerInterceptorManager.getSortedInterceptors();
5656

5757
if (CollectionUtils.isEmpty(interceptors)) {

joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/support/MessageProducerInner.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,30 @@ public List<SendResult> batchSend(List<ProduceMessage> messages, String txId, lo
122122
return doBatchSend(messages, txId, timeout, timeoutUnit, isOneway, failover, callback);
123123
}
124124

125-
public List<SendResult> doBatchSend(List<ProduceMessage> messages, String txId, long timeout, TimeUnit timeoutUnit, boolean isOneway, boolean failover, AsyncBatchProduceCallback callback) {
125+
protected List<SendResult> doBatchSend(List<ProduceMessage> messages, String txId, long timeout, TimeUnit timeoutUnit, boolean isOneway, boolean failover, AsyncBatchProduceCallback callback) {
126126
TopicMetadata topicMetadata = getAndCheckTopicMetadata(messages.get(0).getTopic());
127-
List<BrokerNode> brokerNodes = getAvailableBrokers(topicMetadata);
128127

129-
return doBatchSend(messages, topicMetadata, brokerNodes,
130-
txId, timeout, timeoutUnit, isOneway, failover, callback);
128+
try {
129+
return new ProducerInvocation(config, nameServerConfig, topicMetadata, messages, producerInterceptorManager, new ProducerInvoker() {
130+
@Override
131+
public List<SendResult> invoke(ProduceContext context) {
132+
TopicMetadata topicMetadata = getAndCheckTopicMetadata(messages.get(0).getTopic());
133+
List<BrokerNode> brokerNodes = getAvailableBrokers(topicMetadata);
134+
return doBatchSendInternal(messages, topicMetadata, brokerNodes, txId, timeout, timeoutUnit, isOneway, failover, callback);
135+
}
136+
137+
@Override
138+
public List<SendResult> reject(ProduceContext context) {
139+
throw new ProducerException("reject send", JoyQueueCode.CN_UNKNOWN_ERROR.getCode());
140+
}
141+
}).invoke();
142+
} catch (Exception e) {
143+
if (e instanceof ProducerException) {
144+
throw (ProducerException) e;
145+
} else {
146+
throw new ProducerException(e);
147+
}
148+
}
131149
}
132150

133151
public List<SendResult> doBatchSend(List<ProduceMessage> messages, TopicMetadata topicMetadata, List<BrokerNode> brokers,

joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/trace/interceptor/TraceProducerInterceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public boolean preSend(ProduceContext context) {
4040
TraceCaller caller = TraceBuilder.newInstance()
4141
.topic(context.getTopic())
4242
.app(context.getApp())
43-
.namespace(context.getNameserver().getNamespace())
43+
.namespace(context.getNameserverConfig().getNamespace())
4444
.type(TraceType.PRODUCER_SEND)
4545
.begin();
4646
context.putAttribute(CALLER_KEY, caller);

joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/transport/Client.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public Command sync(Command request, long timeout) {
128128
}
129129

130130
if (!response.isSuccess()) {
131-
throw new ClientException(response.getHeader().getError(), response.getHeader().getStatus());
131+
throw new ClientException(transport.remoteAddress() + ":" + response.getHeader().getError(), response.getHeader().getStatus());
132132
}
133133
return response;
134134
}

0 commit comments

Comments
 (0)