[feat][io] Add MQTT sink connector #22

Merged: david-streamlio merged 3 commits into apache:master from Dream95:feat_mqtt_connector on May 10, 2026

Dream95 commented May 6, 2026

Motivation

Add MQTT sink to publish Pulsar records to MQTT brokers.

Solution

Use the HiveMQ MQTT Client.

  • Config:
    • serverHost, serverPort (default 1883)
    • topic
    • optional clientId
    • optional username / password (also supports secrets named username / password)
    • qos (0|1|2, default 0)
    • keepAliveIntervalSec (default 60)
    • connectionTimeoutMs for connect/disconnect wait (default 10000)
    • cleanStart (default true)
    • sslEnabled
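
As a rough illustration of how the options above might map onto defaults and validation, here is a minimal Java sketch. The class name `MqttSinkConfigSketch`, the `fromMap` helper, and the `serverHost` check are assumptions made for illustration, not the PR's actual `MqttSinkConfig`; only the option names, the listed defaults, and the topic/qos/keepAlive/timeout checks come from this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the connector's config class. Field names follow the
// PR description; everything else is an assumption made for illustration.
class MqttSinkConfigSketch {
    String serverHost;
    int serverPort = 1883;             // default from the PR description
    String topic;
    int qos = 0;                       // must be 0, 1, or 2
    int keepAliveIntervalSec = 60;
    long connectionTimeoutMs = 10_000;
    boolean cleanStart = true;

    // Copies known keys from a connector config map, keeping defaults for absent keys.
    static MqttSinkConfigSketch fromMap(Map<String, Object> map) {
        MqttSinkConfigSketch c = new MqttSinkConfigSketch();
        c.serverHost = (String) map.get("serverHost");
        c.topic = (String) map.get("topic");
        c.serverPort = (int) num(map, "serverPort", c.serverPort);
        c.qos = (int) num(map, "qos", c.qos);
        c.keepAliveIntervalSec = (int) num(map, "keepAliveIntervalSec", c.keepAliveIntervalSec);
        c.connectionTimeoutMs = num(map, "connectionTimeoutMs", c.connectionTimeoutMs);
        if (map.get("cleanStart") instanceof Boolean) {
            c.cleanStart = (Boolean) map.get("cleanStart");
        }
        return c;
    }

    // Returns the numeric value stored under key, or the fallback if absent or non-numeric.
    private static long num(Map<String, Object> map, String key, long fallback) {
        Object v = map.get(key);
        return v instanceof Number ? ((Number) v).longValue() : fallback;
    }

    // Mirrors the checks quoted in the review; the serverHost check is an assumption.
    void validate() {
        check(serverHost != null && !serverHost.isBlank(), "serverHost cannot be blank");
        check(topic != null && !topic.isBlank(), "topic cannot be blank");
        check(qos >= 0 && qos <= 2, "qos must be one of 0, 1, 2");
        check(keepAliveIntervalSec >= 0, "keepAliveIntervalSec must be >= 0");
        check(connectionTimeoutMs > 0, "connectionTimeoutMs must be > 0");
    }

    private static void check(boolean ok, String message) {
        if (!ok) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> map = new HashMap<>();
        map.put("serverHost", "localhost");
        map.put("topic", "pulsar/mqtt/demo");
        map.put("qos", 1);
        MqttSinkConfigSketch c = fromMap(map);
        c.validate();
        System.out.println(c.serverPort + " " + c.qos + " " + c.cleanStart);
    }
}
```

Throwing `IllegalArgumentException` matches what Guava's `Preconditions.checkArgument` does, so a caller would see the same failure type either way.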

Main Issue: #21

Signed-off-by: Dream95 <zhou_8621@163.com>

Copilot AI left a comment

Pull request overview

Adds a new MQTT sink connector module to the Pulsar IO connectors suite, enabling publishing Pulsar records to an MQTT broker using the HiveMQ MQTT client (tracked by Issue #21).

Changes:

  • Introduces a new mqtt Gradle subproject with an MQTT sink implementation and configuration model.
  • Adds service descriptor metadata and distribution wiring so the connector is packaged with the IO distribution.
  • Adds unit tests for config loading/validation and an end-to-end sink test using a Mosquitto Testcontainers broker.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
settings.gradle.kts Registers the new mqtt subproject in the Gradle build.
README.md Lists MQTT among the supported connectors.
mqtt/build.gradle.kts Adds the MQTT connector module build (plugins + dependencies).
mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java Implements the MQTT sink using HiveMQ MQTT 5 async client.
mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java Adds connector configuration, YAML/map loading, and validation.
mqtt/src/main/resources/META-INF/services/pulsar-io.yaml Declares the connector service metadata for Pulsar IO discovery.
mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java Adds an E2E publish/subscribe test against Mosquitto via Testcontainers.
mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java Tests YAML and map+secrets config loading and validation behavior.
mqtt/src/test/resources/sinkConfig.yaml Provides a sample config used by config-loading tests.
gradle/libs.versions.toml Introduces the HiveMQ MQTT client dependency version and catalog entry.
distribution/io/build.gradle.kts Ensures the MQTT connector NAR is included in the IO distribution build.

Comment thread mqtt/build.gradle.kts
implementation(libs.jackson.databind)
implementation(libs.jackson.dataformat.yaml)
implementation(libs.commons.lang3)
implementation(libs.hivemq.mqtt.client)

# specific language governing permissions and limitations
# under the License.
#
name: mqtt-sink

Comment on lines +35 to +40
@Connector(
name = "mqtt-sink",
type = IOType.SINK,
help = "A sink connector that moves messages from Pulsar to MQTT.",
configClass = MqttSinkConfig.class
)

Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank");
Preconditions.checkArgument(qos >= 0 && qos <= 2, "qos must be one of 0, 1, 2");
Preconditions.checkArgument(keepAliveIntervalSec >= 0, "keepAliveIntervalSec must be >= 0");
Preconditions.checkArgument(connectionTimeoutMs > 0, "connectionTimeoutMs must be > 0");

Comment on lines +114 to +117
private File getFile(String name) {
ClassLoader classLoader = getClass().getClassLoader();
return new File(classLoader.getResource(name).getFile());
}

Comment on lines +109 to +112
subscriber.disconnectWith()
.sessionExpiryInterval(0)
.send()
.get(10, TimeUnit.SECONDS);

david-streamlio commented May 8, 2026

Suggestion: split unit tests (Mockito) from integration tests (TestContainers)

MqttSinkTest currently does two different jobs in one file:

  • It stands up a real eclipse-mosquitto:2 broker via TestContainers and runs a full publish/subscribe round‑trip — that's an integration test.
  • It uses mock(SinkContext.class) from Mockito — that's a unit‑test idiom.

Because the class is named *Test, Gradle's default test task will execute it on every build, which means ./gradlew test now requires a running Docker daemon for every contributor and every CI lane. That's a slow and brittle default for what should be the fast feedback loop.

I'd suggest separating the two concerns:

1. MqttSinkTest — pure unit tests with Mockito (no Docker).
Cover the connector's own logic in isolation by mocking Mqtt5AsyncClient (and SinkContext, Record). Good candidates:

  • record.ack() is invoked when the publish CompletableFuture completes successfully.
  • record.fail() is invoked when the publish future completes exceptionally, and on the synchronous catch path in write().
  • A null payload is normalized to new byte[0] before publishing.
  • Config validation failures from MqttSinkConfig#validate propagate out of open().
  • close() is a safe no‑op when open() was never called or failed mid‑way.
  • Username present → simpleAuth() branch; username blank → plain connect branch.

To make these mockable, MqttSink#open would benefit from a small seam — e.g. a package‑private Mqtt5AsyncClient buildClient(MqttSinkConfig) method, or a MqttClientFactory injected via constructor — so tests can substitute a mock without PowerMock or reflection.
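
A minimal sketch of the constructor-injected seam, written against hypothetical stand-in interfaces so it runs without HiveMQ, Pulsar, or Mockito on the classpath. `PublisherSketch`, `RecordSketch`, and `MqttSinkSeamSketch` are illustrative names only; in the real connector the factory would return an `Mqtt5AsyncClient` and the record type would be Pulsar's `Record`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical stand-in for the MQTT client (the real sink would hold an Mqtt5AsyncClient).
interface PublisherSketch {
    CompletableFuture<Void> publish(byte[] payload);
}

// Hypothetical stand-in for Pulsar's Record, reduced to what the sink touches.
interface RecordSketch {
    byte[] getValue();
    void ack();
    void fail();
}

class MqttSinkSeamSketch {
    private final Supplier<PublisherSketch> clientFactory; // the injected seam
    private PublisherSketch client;

    MqttSinkSeamSketch(Supplier<PublisherSketch> clientFactory) {
        this.clientFactory = clientFactory;
    }

    void open() {
        client = clientFactory.get();
    }

    void write(RecordSketch record) {
        try {
            byte[] value = record.getValue();
            // Normalize a null payload to an empty byte array before publishing.
            byte[] payload = value == null ? new byte[0] : value;
            client.publish(payload).whenComplete((ignored, error) -> {
                if (error == null) {
                    record.ack();   // publish future completed successfully
                } else {
                    record.fail();  // publish future completed exceptionally
                }
            });
        } catch (Exception e) {
            record.fail();          // synchronous failure path
        }
    }

    public static void main(String[] args) {
        // A fake client that always succeeds; a unit test would assert on ack()/fail() instead.
        MqttSinkSeamSketch sink =
                new MqttSinkSeamSketch(() -> payload -> CompletableFuture.completedFuture(null));
        sink.open();
        sink.write(new RecordSketch() {
            @Override public byte[] getValue() { return null; }
            @Override public void ack() { System.out.println("acked"); }
            @Override public void fail() { System.out.println("failed"); }
        });
    }
}
```

With this shape, a test can pass a factory that returns a mock or a hand-written fake, then complete the returned future normally or exceptionally to drive each branch.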

2. MqttSinkIntegrationTest (or MqttSinkIT) — TestContainers + real Mosquitto.
Keep the current end‑to‑end scenario here. This is where TestContainers shines: it validates the wire protocol, QoS semantics, auth, TLS, reconnect behavior — things mocks can't catch. Run it via a dedicated source set / Gradle task (e.g. integrationTest) so it stays out of the default test lane and only runs where Docker is available.

Concretely in mqtt/build.gradle.kts:

  • Add testImplementation(libs.mockito.core) (and mockito-junit-jupiter / mockito-testng to match the test runner).
  • Add an integrationTest source set + task, move testcontainers to integrationTestImplementation, and wire it into CI separately from test.

Net effect: ./gradlew test stays fast and Docker‑free for day‑to‑day development, while ./gradlew integrationTest gives you the real‑broker confidence you currently get from MqttSinkTest.


Example: MqttSinkIntegrationTest

A cleaned‑up version of what's in the PR today, lifted into the integration‑test source set. Note that mocking SinkContext here is fine — it's a collaborator we don't care to validate, not the system under test.

// mqtt/src/integrationTest/java/org/apache/pulsar/io/mqtt/MqttSinkIntegrationTest.java
package org.apache.pulsar.io.mqtt;

import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class MqttSinkIntegrationTest {

    private static final int MQTT_PORT = 1883;
    private static final String TEST_TOPIC = "pulsar/mqtt/it";

    private final GenericContainer<?> mosquitto = new GenericContainer<>(
            DockerImageName.parse("eclipse-mosquitto:2"))
            .withExposedPorts(MQTT_PORT);

    @BeforeClass(alwaysRun = true)
    public void startBroker() {
        mosquitto.start();
    }

    @AfterClass(alwaysRun = true)
    public void stopBroker() {
        mosquitto.stop();
    }

    @Test
    public void publishesPayloadsToBroker() throws Exception {
        BlockingQueue<String> received = new LinkedBlockingQueue<>();
        CountDownLatch acked = new CountDownLatch(3);
        AtomicBoolean failed = new AtomicBoolean(false);

        // Mqtt5BlockingClient is not AutoCloseable, so disconnect explicitly in finally.
        Mqtt5BlockingClient subscriber = MqttClient.builder()
                .useMqttVersion5()
                .serverHost(mosquitto.getHost())
                .serverPort(mosquitto.getMappedPort(MQTT_PORT))
                .identifier("mqtt-it-subscriber")
                .buildBlocking();

        subscriber.connect();
        try {
            // Subscribe before publishing so no message is missed.
            subscriber.toAsync().subscribeWith()
                    .topicFilter(TEST_TOPIC)
                    .callback(p -> received.add(
                            new String(p.getPayloadAsBytes(), StandardCharsets.UTF_8)))
                    .send()
                    .get(10, TimeUnit.SECONDS);

            Map<String, Object> config = new HashMap<>();
            config.put("serverHost", mosquitto.getHost());
            config.put("serverPort", mosquitto.getMappedPort(MQTT_PORT));
            config.put("topic", TEST_TOPIC);
            config.put("qos", 1);
            config.put("clientId", "mqtt-it-publisher");

            try (MqttSink sink = new MqttSink()) {
                sink.open(config, mock(SinkContext.class));
                for (int i = 0; i < 3; i++) {
                    sink.write(new TestRecord(
                            ("msg-" + i).getBytes(StandardCharsets.UTF_8), acked, failed));
                }

                assertTrue(acked.await(10, TimeUnit.SECONDS), "publish acks timed out");
                assertFalse(failed.get(), "no record should have failed");
                assertEquals(received.poll(10, TimeUnit.SECONDS), "msg-0");
                assertEquals(received.poll(10, TimeUnit.SECONDS), "msg-1");
                assertEquals(received.poll(10, TimeUnit.SECONDS), "msg-2");
            }
        } finally {
            subscriber.disconnect();
        }
    }

    private static final class TestRecord implements Record<byte[]> {
        private final byte[] value;
        private final CountDownLatch ack;
        private final AtomicBoolean failed;

        TestRecord(byte[] value, CountDownLatch ack, AtomicBoolean failed) {
            this.value = value;
            this.ack = ack;
            this.failed = failed;
        }

        @Override public byte[] getValue() { return value; }
        @Override public void ack() { ack.countDown(); }
        @Override public void fail() { failed.set(true); }
    }
}

And the matching Gradle wiring (Kotlin DSL) for mqtt/build.gradle.kts:

val integrationTest by sourceSets.creating {
    compileClasspath += sourceSets.main.get().output + sourceSets.test.get().output
    runtimeClasspath += output + compileClasspath
}

configurations["integrationTestImplementation"].extendsFrom(configurations.testImplementation.get())
configurations["integrationTestRuntimeOnly"].extendsFrom(configurations.testRuntimeOnly.get())

dependencies {
    "integrationTestImplementation"(libs.testcontainers)
    "integrationTestImplementation"(libs.mockito.core)
}

tasks.register<Test>("integrationTest") {
    description = "Runs integration tests that require Docker."
    group = "verification"
    testClassesDirs = integrationTest.output.classesDirs
    classpath = integrationTest.runtimeClasspath
    useTestNG()
    shouldRunAfter("test")
}

tasks.named("check") { dependsOn("integrationTest") }

With that in place, ./gradlew test runs only the fast Mockito‑based unit tests, and ./gradlew integrationTest (or ./gradlew check) runs the Mosquitto‑backed end‑to‑end test above.


Dream95 commented May 9, 2026

@david-streamlio I've addressed the review comments and updated the PR. Could you please take another look? Thanks!


Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.

private String serverHost;

@FieldDoc(
required = true,

Comment on lines +125 to +131
try {
mqttClient.disconnectWith()
.send()
.get(mqttSinkConfig.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("Failed to disconnect MQTT client cleanly", e);
}
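
The bounded-wait disconnect quoted above can be illustrated in isolation. The following is a sketch only, using a plain `CompletableFuture` in place of the MQTT client's disconnect call; `BoundedCloseSketch` and its `open(...)` signature are hypothetical. It also shows two choices the quoted snippet does not confirm: treating `close()` as a no-op when the sink was never opened, and restoring the thread's interrupt flag instead of swallowing `InterruptedException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical illustration of a close() that waits a bounded time for disconnect.
class BoundedCloseSketch implements AutoCloseable {
    private final long timeoutMs;
    private Supplier<CompletableFuture<Void>> disconnect; // stays null until open() is called

    BoundedCloseSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Stand-in for the real open(): remembers how to start a disconnect.
    void open(Supplier<CompletableFuture<Void>> disconnect) {
        this.disconnect = disconnect;
    }

    @Override
    public void close() {
        if (disconnect == null) {
            return; // never opened, or already closed: nothing to do
        }
        try {
            disconnect.get().get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
        } catch (Exception e) {
            // Timeout or failed disconnect: log and carry on, as in the quoted snippet.
            System.err.println("Failed to disconnect cleanly: " + e);
        } finally {
            disconnect = null; // makes a second close() a no-op
        }
    }

    public static void main(String[] args) {
        BoundedCloseSketch sink = new BoundedCloseSketch(100);
        sink.close(); // safe before open()
        sink.open(() -> CompletableFuture.completedFuture(null));
        sink.close();
        sink.close(); // safe to repeat
    }
}
```
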
david-streamlio requested a review from lhotari on May 9, 2026 15:19
david-streamlio merged commit 0537935 into apache:master on May 10, 2026
2 checks passed