Skip to content
This repository was archived by the owner on Jan 12, 2026. It is now read-only.

Commit befa8c7

Browse files
beyond-upgaryli1019
authored andcommitted
[BitSail][Connector] Migrate rocketmq Sink connector to V1 interface (#457)
1 parent d18eeb6 commit befa8c7

16 files changed

Lines changed: 1240 additions & 1 deletion

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.rocketmq.constants;
18+
19+
public class OptionalProducerConfig {
20+
public static final String INSTANCE_NAME = "instanceName";
21+
public static final String VIP_CHANNEL = "vipChannelEnabled";
22+
public static final String DEFAULT_TOPIC_QUEUE_NUMS = "defaultTopicQueueNums";
23+
public static final String COMPRESS_SG_BODY_OVER = "compressMsgBodyOverHowmuch";
24+
public static final String HEARTBEAT_BROKER_INTERVAL = "heartbeatBrokerInterval";
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.rocketmq.constants;
18+
19+
public class RocketMQConstants {
20+
21+
public static final String CONNECTOR_NAME = "rocketmq";
22+
23+
}

bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/error/RocketMQErrorCode.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ public enum RocketMQErrorCode implements ErrorCode {
2222

2323
CONSUMER_CREATE_FAILED("RocketMQ-1", "RocketMQ Consumer create failed."),
2424
CONSUMER_FETCH_OFFSET_FAILED("RocketMQ-2", "RocketMQ Consumer fetch offset failed."),
25-
CONSUMER_SEEK_OFFSET_FAILED("RocketMQ-3", "RocketMQ Consumer seek offset failed.");
25+
CONSUMER_SEEK_OFFSET_FAILED("RocketMQ-3", "RocketMQ Consumer seek offset failed."),
26+
REQUIRED_VALUE("RocketMQ-4", "You missed parameter which is required, please check your configuration."),
27+
UNSUPPORTED_FORMAT("RocketMQ-5", "Unsupported output format.");
2628

2729
public String code;
2830
public String description;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.rocketmq.format;
18+
19+
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
20+
import com.bytedance.bitsail.common.row.Row;
21+
import com.bytedance.bitsail.common.typeinfo.RowTypeInfo;
22+
import com.bytedance.bitsail.component.format.json.JsonRowSerializationSchema;
23+
24+
import org.apache.commons.lang3.StringUtils;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import java.nio.charset.StandardCharsets;
29+
import java.util.List;
30+
import java.util.stream.Collectors;
31+
32+
public class JsonRocketMQSerializationSchema implements RocketMQSerializationSchema {
33+
private static final Logger LOG = LoggerFactory.getLogger(JsonRocketMQSerializationSchema.class);
34+
35+
private static final long serialVersionUID = 3L;
36+
37+
private final List<Integer> partitionKeyIndices;
38+
private final List<Integer> keyIndices;
39+
private final transient JsonRowSerializationSchema rowSerializationSchema;
40+
41+
public JsonRocketMQSerializationSchema(BitSailConfiguration bitSailConfiguration, RowTypeInfo rowTypeInfo,
42+
List<Integer> partitionKeyIndices, List<Integer> keyIndices) {
43+
this.partitionKeyIndices = partitionKeyIndices;
44+
this.keyIndices = keyIndices;
45+
this.rowSerializationSchema = new JsonRowSerializationSchema(bitSailConfiguration, rowTypeInfo);
46+
}
47+
48+
@Override
49+
public byte[] serializeKey(Row row) {
50+
if (keyIndices != null) {
51+
String key = this.keyIndices.stream()
52+
.map(i -> {
53+
Object keyField = row.getField(i);
54+
if (keyField != null) {
55+
return keyField.toString();
56+
}
57+
LOG.warn("Found null key in row: [{}]", row);
58+
return null;
59+
})
60+
.filter(StringUtils::isNotEmpty)
61+
.collect(Collectors.joining());
62+
return key.getBytes(StandardCharsets.UTF_8);
63+
} else {
64+
return null;
65+
}
66+
}
67+
68+
@Override
69+
public byte[] serializeValue(Row row) {
70+
return rowSerializationSchema.serialize(row);
71+
}
72+
73+
@Override
74+
public String getPartitionKey(Row row) {
75+
if (partitionKeyIndices != null) {
76+
return this.partitionKeyIndices.stream()
77+
.map(i -> {
78+
Object partitionField = row.getField(i);
79+
if (partitionField != null) {
80+
return partitionField.toString();
81+
}
82+
LOG.warn("Found null key in row: [{}]", row);
83+
return null;
84+
})
85+
.filter(StringUtils::isNotEmpty)
86+
.collect(Collectors.joining());
87+
} else {
88+
return null;
89+
}
90+
}
91+
}
92+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.rocketmq.format;
18+
19+
import com.bytedance.bitsail.common.BitSailException;
20+
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
21+
import com.bytedance.bitsail.common.typeinfo.RowTypeInfo;
22+
import com.bytedance.bitsail.connector.rocketmq.error.RocketMQErrorCode;
23+
import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSinkFormat;
24+
25+
import java.util.List;
26+
27+
public class RocketMQSerializationFactory {
28+
29+
RowTypeInfo rowTypeInfo;
30+
List<Integer> partitionIndices;
31+
List<Integer> keyIndices;
32+
33+
public RocketMQSerializationFactory(RowTypeInfo rowTypeInfo, List<Integer> partitionIndices, List<Integer> keyIndices) {
34+
this.rowTypeInfo = rowTypeInfo;
35+
this.partitionIndices = partitionIndices;
36+
this.keyIndices = keyIndices;
37+
}
38+
39+
public RocketMQSerializationSchema getSerializationSchemaByFormat(BitSailConfiguration bitSailConfiguration, RocketMQSinkFormat format) {
40+
if (format == RocketMQSinkFormat.JSON) {
41+
return new JsonRocketMQSerializationSchema(bitSailConfiguration, rowTypeInfo, partitionIndices, keyIndices);
42+
}
43+
throw BitSailException.asBitSailException(RocketMQErrorCode.UNSUPPORTED_FORMAT,
44+
"unsupported sink format: " + format);
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.rocketmq.format;
18+
19+
import com.bytedance.bitsail.common.row.Row;
20+
21+
public interface RocketMQSerializationSchema {
22+
23+
byte[] serializeKey(Row row);
24+
25+
byte[] serializeValue(Row row);
26+
27+
default String getPartitionKey(Row row) {
28+
return null;
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.rocketmq.option;
18+
19+
import com.bytedance.bitsail.common.annotation.Essential;
20+
import com.bytedance.bitsail.common.option.ConfigOption;
21+
import com.bytedance.bitsail.common.option.WriterOptions;
22+
23+
import static com.bytedance.bitsail.common.option.ConfigOptions.key;
24+
import static com.bytedance.bitsail.common.option.WriterOptions.WRITER_PREFIX;
25+
26+
public interface RocketMQWriterOptions extends WriterOptions.BaseWriterOptions {
27+
28+
@Essential
29+
ConfigOption<String> NAME_SERVER_ADDRESS =
30+
key(WRITER_PREFIX + "name_server_address")
31+
.noDefaultValue(String.class);
32+
33+
ConfigOption<String> PRODUCER_GROUP =
34+
key(WRITER_PREFIX + "producer_group")
35+
.noDefaultValue(String.class);
36+
37+
@Essential
38+
ConfigOption<String> TOPIC =
39+
key(WRITER_PREFIX + "topic")
40+
.noDefaultValue(String.class);
41+
42+
ConfigOption<String> TAG =
43+
key(WRITER_PREFIX + "tag")
44+
.noDefaultValue(String.class);
45+
46+
ConfigOption<Boolean> ENABLE_BATCH_FLUSH =
47+
key(WRITER_PREFIX + "enable_batch_flush")
48+
.defaultValue(true);
49+
50+
ConfigOption<Integer> BATCH_SIZE =
51+
key(WRITER_PREFIX + "batch_size")
52+
.defaultValue(100);
53+
54+
/**
55+
* when encounter errors while sending:<br/>
56+
* true: log the error<br/>
57+
* false: throw exceptions
58+
*/
59+
ConfigOption<Boolean> LOG_FAILURES_ONLY =
60+
key(WRITER_PREFIX + "log_failures_only")
61+
.defaultValue(false);
62+
63+
ConfigOption<Boolean> ENABLE_SYNC_SEND =
64+
key(WRITER_PREFIX + "enable_sync_send")
65+
.defaultValue(false);
66+
67+
ConfigOption<String> ACCESS_KEY =
68+
key(WRITER_PREFIX + "access_key")
69+
.noDefaultValue(String.class);
70+
71+
ConfigOption<String> SECRET_KEY =
72+
key(WRITER_PREFIX + "secret_key")
73+
.noDefaultValue(String.class);
74+
75+
ConfigOption<Integer> SEND_FAILURE_RETRY_TIMES =
76+
key(WRITER_PREFIX + "send_failure_retry_times")
77+
.defaultValue(3);
78+
79+
ConfigOption<Integer> SEND_MESSAGE_TIMEOUT =
80+
key(WRITER_PREFIX + "send_message_timeout_ms")
81+
.defaultValue(3000);
82+
83+
ConfigOption<Integer> MAX_MESSAGE_SIZE =
84+
key(WRITER_PREFIX + "max_message_size_bytes")
85+
.defaultValue(4194304);
86+
87+
ConfigOption<String> KEY_FIELDS =
88+
key(WRITER_PREFIX + "key")
89+
.noDefaultValue(String.class);
90+
91+
ConfigOption<String> PARTITION_FIELDS =
92+
key(WRITER_PREFIX + "partition_fields")
93+
.noDefaultValue(String.class);
94+
95+
ConfigOption<String> FORMAT =
96+
key(WRITER_PREFIX + "format")
97+
.defaultValue("json");
98+
99+
ConfigOption<Integer> DEFAULT_TOPIC_QUEUE_NUMS =
100+
key(WRITER_PREFIX + "default_topic_queue_nums")
101+
.defaultValue(4);
102+
103+
ConfigOption<Integer> COMPRESS_MSG_BODY_SIZE =
104+
key(WRITER_PREFIX + "compress_msg_body_over_how_much")
105+
.defaultValue(4096);
106+
107+
ConfigOption<Integer> HEART_BEAT_BROKER_INTERVAL =
108+
key(WRITER_PREFIX + "heart_beat_broker_interval")
109+
.defaultValue(30000);
110+
111+
ConfigOption<Boolean> VIP_CHANNEL_ENABLED =
112+
key(WRITER_PREFIX + "vip_channel_enabled")
113+
.defaultValue(false);
114+
115+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.rocketmq.sink;
18+
19+
import org.apache.rocketmq.client.producer.MessageQueueSelector;
20+
import org.apache.rocketmq.common.message.Message;
21+
import org.apache.rocketmq.common.message.MessageQueue;
22+
23+
import java.util.List;
24+
25+
public class HashQueueSelector implements MessageQueueSelector {
26+
27+
private int nullKeyCount;
28+
29+
public HashQueueSelector() {
30+
super();
31+
nullKeyCount = 0;
32+
}
33+
34+
@Override
35+
public MessageQueue select(List<MessageQueue> mqList, Message message, Object partitionKeys) {
36+
int queueId;
37+
38+
if (partitionKeys != null) {
39+
queueId = partitionKeys.hashCode() % mqList.size();
40+
} else {
41+
queueId = nullKeyCount % mqList.size();
42+
nullKeyCount = (nullKeyCount + 1) % mqList.size();
43+
}
44+
45+
return mqList.get(queueId);
46+
}
47+
}
48+

0 commit comments

Comments
 (0)