[feat][io] Add MQTT sink connector#22
Conversation
Signed-off-by: Dream95 <zhou_8621@163.com>
There was a problem hiding this comment.
Pull request overview
Adds a new MQTT sink connector module to the Pulsar IO connectors suite, enabling publishing Pulsar records to an MQTT broker using the HiveMQ MQTT client (tracked by Issue #21).
Changes:
- Introduces a new
mqttGradle subproject with an MQTT sink implementation and configuration model. - Adds service descriptor metadata and distribution wiring so the connector is packaged with the IO distribution.
- Adds unit tests for config loading/validation and an end-to-end sink test using a Mosquitto Testcontainers broker.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| settings.gradle.kts | Registers the new mqtt subproject in the Gradle build. |
| README.md | Lists MQTT among the supported connectors. |
| mqtt/build.gradle.kts | Adds the MQTT connector module build (plugins + dependencies). |
| mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java | Implements the MQTT sink using HiveMQ MQTT 5 async client. |
| mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java | Adds connector configuration, YAML/map loading, and validation. |
| mqtt/src/main/resources/META-INF/services/pulsar-io.yaml | Declares the connector service metadata for Pulsar IO discovery. |
| mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java | Adds an E2E publish/subscribe test against Mosquitto via Testcontainers. |
| mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java | Tests YAML and map+secrets config loading and validation behavior. |
| mqtt/src/test/resources/sinkConfig.yaml | Provides a sample config used by config-loading tests. |
| gradle/libs.versions.toml | Introduces the HiveMQ MQTT client dependency version and catalog entry. |
| distribution/io/build.gradle.kts | Ensures the MQTT connector NAR is included in the IO distribution build. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| implementation(libs.jackson.databind) | ||
| implementation(libs.jackson.dataformat.yaml) | ||
| implementation(libs.commons.lang3) | ||
| implementation(libs.hivemq.mqtt.client) |
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| # | ||
| name: mqtt-sink |
| @Connector( | ||
| name = "mqtt-sink", | ||
| type = IOType.SINK, | ||
| help = "A sink connector that moves messages from Pulsar to MQTT.", | ||
| configClass = MqttSinkConfig.class | ||
| ) |
| Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank"); | ||
| Preconditions.checkArgument(qos >= 0 && qos <= 2, "qos must be one of 0, 1, 2"); | ||
| Preconditions.checkArgument(keepAliveIntervalSec >= 0, "keepAliveIntervalSec must be >= 0"); | ||
| Preconditions.checkArgument(connectionTimeoutMs > 0, "connectionTimeoutMs must be > 0"); |
| private File getFile(String name) { | ||
| ClassLoader classLoader = getClass().getClassLoader(); | ||
| return new File(classLoader.getResource(name).getFile()); | ||
| } |
| subscriber.disconnectWith() | ||
| .sessionExpiryInterval(0) | ||
| .send() | ||
| .get(10, TimeUnit.SECONDS); |
|
Suggestion: split unit tests (Mockito) from integration tests (TestContainers)
Because the class is named I'd suggest separating the two concerns: 1.
To make these mockable, 2. Concretely in
Net effect: Example:
|
Signed-off-by: Dream95 <zhou_8621@163.com>
|
@david-streamlio I've addressed the review comments and updated the PR. Could you please take another look? Thanks! |
| private String serverHost; | ||
|
|
||
| @FieldDoc( | ||
| required = true, |
| try { | ||
| mqttClient.disconnectWith() | ||
| .send() | ||
| .get(mqttSinkConfig.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS); | ||
| } catch (Exception e) { | ||
| log.warn("Failed to disconnect MQTT client cleanly", e); | ||
| } |
Signed-off-by: Dream95 <zhou_8621@163.com>
Motivation
Add MQTT sink to publish Pulsar records to MQTT brokers.
Solution
use HiveMQ MQTT Client
serverHost,serverPort(default1883)topicclientIdusername/password(also supports secrets namedusername/password)qos(0|1|2, default0)keepAliveIntervalSec(default60)connectionTimeoutMsfor connect/disconnect wait (default10000)cleanStart(defaulttrue)sslEnabledMain Issue: #21