diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Messaging_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Messaging_Direct.json index c537844dc84a..e0266d62f2e0 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Messaging_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Messaging_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 3 + "modification": 4 } diff --git a/sdks/java/io/mqtt/build.gradle b/sdks/java/io/mqtt/build.gradle index 85d6c6c16f25..1f2d808ed7e0 100644 --- a/sdks/java/io/mqtt/build.gradle +++ b/sdks/java/io/mqtt/build.gradle @@ -27,8 +27,7 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation library.java.slf4j_api implementation library.java.joda_time - implementation "org.fusesource.mqtt-client:mqtt-client:1.15" - implementation "org.fusesource.hawtbuf:hawtbuf:1.11" + implementation "com.hivemq:hivemq-mqtt-client:1.3.15" testImplementation project(path: ":sdks:java:io:common") testImplementation library.java.activemq_broker testImplementation library.java.activemq_mqtt diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 72449c0697ae..aa9500e7f7b1 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -21,17 +21,24 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient; +import com.hivemq.client.mqtt.mqtt3.Mqtt3Client; +import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; import java.io.IOException; import java.io.Serializable; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; @@ -52,12 +59,6 @@ import org.apache.beam.sdk.values.PDone; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; -import org.fusesource.mqtt.client.BlockingConnection; -import org.fusesource.mqtt.client.FutureConnection; -import org.fusesource.mqtt.client.MQTT; -import org.fusesource.mqtt.client.Message; -import org.fusesource.mqtt.client.QoS; -import org.fusesource.mqtt.client.Topic; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; @@ -299,29 +300,42 @@ private void populateDisplayData(DisplayData.Builder builder) { builder.addIfNotNull(DisplayData.item("username", getUsername())); } - private MQTT createClient() throws Exception { + private Mqtt3BlockingClient createClient() throws Exception { LOG.debug("Creating MQTT client to {}", getServerUri()); - MQTT client = new MQTT(); - client.setHost(getServerUri()); - if (getUsername() != null) { - LOG.debug("MQTT client uses username {}", getUsername()); - client.setUserName(getUsername()); - client.setPassword(getPassword()); + URI uri = new URI(getServerUri()); + String host = uri.getHost(); + int port = uri.getPort(); + if (port == -1) { + port = "ssl".equals(uri.getScheme()) || "tls".equals(uri.getScheme()) ? 8883 : 1883; + } + + Mqtt3ClientBuilder builder = Mqtt3Client.builder().serverHost(host).serverPort(port); + + if ("ssl".equals(uri.getScheme()) || "tls".equals(uri.getScheme())) { + builder = builder.sslWithDefaultConfig(); } - if (getClientId() != null) { - String clientId = getClientId() + "-" + UUID.randomUUID().toString(); - clientId = - clientId.substring(0, Math.min(clientId.length(), MQTT_3_1_MAX_CLIENT_ID_LENGTH)); - LOG.debug("MQTT client id set to {}", clientId); - client.setClientId(clientId); + + String clientId = getClientId(); + if (clientId == null) { + clientId = UUID.randomUUID().toString(); } else { - String clientId = UUID.randomUUID().toString(); - clientId = - clientId.substring(0, Math.min(clientId.length(), MQTT_3_1_MAX_CLIENT_ID_LENGTH)); - LOG.debug("MQTT client id set to random value {}", clientId); - client.setClientId(clientId); + clientId = clientId + "-" + UUID.randomUUID().toString(); } - return client; + clientId = clientId.substring(0, Math.min(clientId.length(), MQTT_3_1_MAX_CLIENT_ID_LENGTH)); + LOG.debug("MQTT client id set to {}", clientId); + builder = builder.identifier(clientId); + + if (getUsername() != null) { + LOG.debug("MQTT client uses username {}", getUsername()); + builder = + builder + .simpleAuth() + .username(getUsername()) + .password(getPassword().getBytes(StandardCharsets.UTF_8)) + .applySimpleAuth(); + } + + return builder.buildBlocking(); } } @@ -429,9 +443,9 @@ public void populateDisplayData(DisplayData.Builder builder) { static class MqttCheckpointMark implements UnboundedSource.CheckpointMark, Serializable { @VisibleForTesting String clientId; - @VisibleForTesting transient List messages = new ArrayList<>(); + @VisibleForTesting transient List messages = new ArrayList<>(); - public MqttCheckpointMark(String id, List messages) { + public MqttCheckpointMark(String id, List messages) { this.clientId = id; this.messages = messages; } @@ -444,9 +458,9 @@ public MqttCheckpointMark(String id, List messages) { @Override public void finalizeCheckpoint() { LOG.debug("Finalizing checkpoint acknowledging pending messages for client ID {}", clientId); - for (Message message : messages) { + for (Mqtt3Publish message : messages) { try { - message.ack(); + message.acknowledge(); } catch (Exception e) { LOG.warn("Can't ack message for client ID {}", clientId, e); } @@ -480,7 +494,7 @@ public int hashCode() { static class Preparer { @VisibleForTesting String clientId; @VisibleForTesting Instant oldestMessageTimestamp = Instant.now(); - @VisibleForTesting transient List messages = new ArrayList<>(); + @VisibleForTesting transient List messages = new ArrayList<>(); public Preparer(MqttCheckpointMark checkpointMark) { clientId = checkpointMark.clientId; @@ -493,7 +507,7 @@ public Preparer(String id) { public Preparer() {} - public void add(Message message, Instant timestamp) { + public void add(Mqtt3Publish message, Instant timestamp) { if (timestamp.isBefore(oldestMessageTimestamp)) { oldestMessageTimestamp = timestamp; } @@ -501,7 +515,7 @@ public void add(Message message, Instant timestamp) { } MqttCheckpointMark newCheckpoint() { - List currentMessages = messages; + List currentMessages = messages; messages = new ArrayList<>(); oldestMessageTimestamp = Instant.now(); return new MqttCheckpointMark(clientId, currentMessages); @@ -532,7 +546,8 @@ public UnboundedReader createReader( new UnboundedMqttReader<>( this, preparer, - message -> (T) MqttRecord.of(message.getTopic(), message.getPayload())); + message -> + (T) MqttRecord.of(message.getTopic().toString(), message.getPayloadAsBytes())); } else { unboundedMqttReader = new UnboundedMqttReader<>(this, preparer); } @@ -570,12 +585,13 @@ static class UnboundedMqttReader extends UnboundedSource.UnboundedReader { private final UnboundedMqttSource source; - private MQTT client; - private BlockingConnection connection; + private Mqtt3BlockingClient client; + private Mqtt3BlockingClient.Mqtt3Publishes publishes; + private String clientId = ""; private T current; private Instant currentTimestamp; private final MqttCheckpointMark.Preparer checkpointPreparer; - private SerializableFunction extractFn; + private SerializableFunction extractFn; public UnboundedMqttReader( UnboundedMqttSource source, MqttCheckpointMark.Preparer checkpointPreparer) { @@ -586,13 +602,13 @@ public UnboundedMqttReader( } else { this.checkpointPreparer = new MqttCheckpointMark.Preparer(); } - this.extractFn = message -> (T) message.getPayload(); + this.extractFn = message -> (T) message.getPayloadAsBytes(); } public UnboundedMqttReader( UnboundedMqttSource source, MqttCheckpointMark.Preparer checkpointPreparer, - SerializableFunction extractFn) { + SerializableFunction extractFn) { this(source, checkpointPreparer); this.extractFn = extractFn; } @@ -603,11 +619,20 @@ public boolean start() throws IOException { Read spec = source.spec; try { client = spec.connectionConfiguration().createClient(); - LOG.debug("Reader client ID is {}", client.getClientId()); - checkpointPreparer.clientId = client.getClientId().toString(); - connection = createConnection(client); - connection.subscribe( - new Topic[] {new Topic(spec.connectionConfiguration().getTopic(), QoS.AT_LEAST_ONCE)}); + this.clientId = client.getConfig().getClientIdentifier().map(Object::toString).orElse(""); + LOG.debug("Reader client ID is {}", clientId); + checkpointPreparer.clientId = clientId; + client.connect(); + + // Subscribe and get the publishes stream with manual acks enabled + publishes = client.publishes(MqttGlobalPublishFilter.ALL, true); + + client + .subscribeWith() + .topicFilter(spec.connectionConfiguration().getTopic()) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + return advance(); } catch (Exception e) { throw new IOException(e); @@ -617,11 +642,12 @@ public boolean start() throws IOException { @Override public boolean advance() throws IOException { try { - LOG.trace("MQTT reader (client ID {}) waiting message ...", client.getClientId()); - Message message = connection.receive(1, TimeUnit.SECONDS); - if (message == null) { + LOG.trace("MQTT reader (client ID {}) waiting message ...", clientId); + Optional messageOpt = publishes.receive(1, TimeUnit.SECONDS); + if (!messageOpt.isPresent()) { return false; } + Mqtt3Publish message = messageOpt.get(); current = this.extractFn.apply(message); currentTimestamp = Instant.now(); checkpointPreparer.add(message, currentTimestamp); @@ -633,10 +659,13 @@ public boolean advance() throws IOException { @Override public void close() throws IOException { - LOG.debug("Closing MQTT reader (client ID {})", client.getClientId()); + LOG.debug("Closing MQTT reader (client ID {})", clientId); try { - if (connection != null) { - connection.disconnect(); + if (publishes != null) { + publishes.close(); + } + if (client != null) { + client.disconnect(); } } catch (Exception e) { throw new IOException(e); @@ -764,8 +793,7 @@ private static class WriteFn extends DoFn { private final SerializableFunction payloadFn; private final boolean retained; - private transient MQTT client; - private transient BlockingConnection connection; + private transient Mqtt3BlockingClient client; public WriteFn(Write spec) { this.spec = spec; @@ -783,8 +811,9 @@ public WriteFn(Write spec) { public void createMqttClient() throws Exception { LOG.debug("Starting MQTT writer"); this.client = this.spec.connectionConfiguration().createClient(); - LOG.debug("MQTT writer client ID is {}", client.getClientId()); - this.connection = createConnection(client); + String clientId = client.getConfig().getClientIdentifier().map(Object::toString).orElse(""); + LOG.debug("MQTT writer client ID is {}", clientId); + this.client.connect(); } @ProcessElement @@ -793,32 +822,25 @@ public void processElement(ProcessContext context) throws Exception { byte[] payload = this.payloadFn.apply(element); String topic = this.topicFn.apply(element); LOG.debug("Sending message {}", new String(payload, StandardCharsets.UTF_8)); - this.connection.publish(topic, payload, QoS.AT_LEAST_ONCE, this.retained); + + client + .publishWith() + .topic(topic) + .payload(payload) + .qos(MqttQos.AT_LEAST_ONCE) + .retain(this.retained) + .send(); } @Teardown public void closeMqttClient() throws Exception { - if (this.connection != null) { - LOG.debug("Disconnecting MQTT connection (client ID {})", client.getClientId()); - this.connection.disconnect(); + if (this.client != null) { + String clientId = + client.getConfig().getClientIdentifier().map(Object::toString).orElse(""); + LOG.debug("Disconnecting MQTT connection (client ID {})", clientId); + this.client.disconnect(); } } } } - - /** Create a connected MQTT BlockingConnection from given client, aware of connection timeout. */ - static BlockingConnection createConnection(MQTT client) throws Exception { - FutureConnection futureConnection = client.futureConnection(); - org.fusesource.mqtt.client.Future connecting = futureConnection.connect(); - while (true) { - try { - connecting.await(1, TimeUnit.MINUTES); - } catch (TimeoutException e) { - LOG.warn("Connection to {} pending after waiting for 1 minute", client.getHost()); - continue; - } - break; - } - return new BlockingConnection(futureConnection); - } } diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index 71e24a6859c1..dfadc818a469 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -21,16 +21,24 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.datatypes.MqttTopic; +import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient; +import com.hivemq.client.mqtt.mqtt3.Mqtt3Client; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentSkipListMap; @@ -49,13 +57,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.fusesource.hawtbuf.Buffer; -import org.fusesource.mqtt.client.BlockingConnection; -import org.fusesource.mqtt.client.Callback; -import org.fusesource.mqtt.client.MQTT; -import org.fusesource.mqtt.client.Message; -import org.fusesource.mqtt.client.QoS; -import org.fusesource.mqtt.client.Topic; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.After; @@ -130,21 +131,21 @@ public void testReadNoClientId() throws Exception { // produce messages on the brokerService in another thread // This thread prevents to block the pipeline waiting for new messages - MQTT client = new MQTT(); - client.setHost("tcp://localhost:" + port); - final BlockingConnection publishConnection = client.blockingConnection(); - publishConnection.connect(); + Mqtt3BlockingClient publishClient = + Mqtt3Client.builder().serverHost("localhost").serverPort(port).buildBlocking(); + publishClient.connect(); Thread publisherThread = new Thread( () -> { try { doConnect(connection -> !connection.getConnectionId().isEmpty()); for (int i = 0; i < 10; i++) { - publishConnection.publish( - topicName, - ("This is test " + i).getBytes(StandardCharsets.UTF_8), - QoS.EXACTLY_ONCE, - false); + publishClient + .publishWith() + .topic(topicName) + .payload(("This is test " + i).getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.EXACTLY_ONCE) + .send(); } } catch (Exception e) { // nothing to do @@ -153,7 +154,7 @@ public void testReadNoClientId() throws Exception { publisherThread.start(); pipeline.run(); - publishConnection.disconnect(); + publishClient.disconnect(); publisherThread.join(); } @@ -181,21 +182,21 @@ public void testRead() throws Exception { // produce messages on the brokerService in another thread // This thread prevents to block the pipeline waiting for new messages - MQTT client = new MQTT(); - client.setHost("tcp://localhost:" + port); - final BlockingConnection publishConnection = client.blockingConnection(); - publishConnection.connect(); + Mqtt3BlockingClient publishClient = + Mqtt3Client.builder().serverHost("localhost").serverPort(port).buildBlocking(); + publishClient.connect(); Thread publisherThread = new Thread( () -> { try { doConnect(connection -> connection.getConnectionId().startsWith("READ_PIPELINE")); for (int i = 0; i < 10; i++) { - publishConnection.publish( - "READ_TOPIC", - ("This is test " + i).getBytes(StandardCharsets.UTF_8), - QoS.EXACTLY_ONCE, - false); + publishClient + .publishWith() + .topic("READ_TOPIC") + .payload(("This is test " + i).getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.EXACTLY_ONCE) + .send(); } } catch (Exception e) { // nothing to do @@ -205,7 +206,7 @@ public void testRead() throws Exception { pipeline.run(); publisherThread.join(); - publishConnection.disconnect(); + publishClient.disconnect(); } @Test(timeout = 60 * 1000) @@ -237,10 +238,9 @@ public void testReadWithMetadata() throws Exception { // produce messages on the brokerService in another thread // This thread prevents to block the pipeline waiting for new messages - MQTT client = new MQTT(); - client.setHost("tcp://localhost:" + port); - final BlockingConnection publishConnection = client.blockingConnection(); - publishConnection.connect(); + Mqtt3BlockingClient publishClient = + Mqtt3Client.builder().serverHost("localhost").serverPort(port).buildBlocking(); + publishClient.connect(); Thread publisherThread = new Thread( () -> { @@ -249,18 +249,20 @@ public void testReadWithMetadata() throws Exception { // Sleep two seconds, to give enough time for client to be ready to accept messages Thread.sleep(2 * 1000); for (int i = 0; i < 5; i++) { - publishConnection.publish( - topic1, - ("This is test " + i).getBytes(StandardCharsets.UTF_8), - QoS.EXACTLY_ONCE, - false); + publishClient + .publishWith() + .topic(topic1) + .payload(("This is test " + i).getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.EXACTLY_ONCE) + .send(); } for (int i = 5; i < 10; i++) { - publishConnection.publish( - topic2, - ("This is test " + i).getBytes(StandardCharsets.UTF_8), - QoS.EXACTLY_ONCE, - false); + publishClient + .publishWith() + .topic(topic2) + .payload(("This is test " + i).getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.EXACTLY_ONCE) + .send(); } } catch (Exception e) { @@ -271,7 +273,7 @@ public void testReadWithMetadata() throws Exception { publisherThread.start(); pipeline.run(); - publishConnection.disconnect(); + publishClient.disconnect(); publisherThread.join(); } @@ -289,34 +291,53 @@ public void testReceiveWithTimeoutAndNoData() throws Exception { pipeline.run(); } - private static class FakeMessage extends Message { + private static class FakeMessage implements Mqtt3Publish { + private int ackCount = 0; - private int ackCount; + @Override + public void acknowledge() { + ackCount++; + } - public FakeMessage() { - super(null, null, null, null); - this.ackCount = 0; + public int getAckCount() { + return ackCount; } @Override - public void ack() { - ++ackCount; + public Optional getPayload() { + return Optional.empty(); } @Override - public void ack(final Callback unused) { - ++ackCount; + public byte[] getPayloadAsBytes() { + return new byte[0]; } - public int getAckCount() { - return ackCount; + @Override + public MqttTopic getTopic() { + return MqttTopic.of("temp"); + } + + @Override + public MqttQos getQos() { + return MqttQos.AT_LEAST_ONCE; + } + + @Override + public boolean isRetain() { + return false; + } + + @Override + public Mqtt3PublishBuilder.Complete extend() { + return null; } } @Test public void testReadCheckpoint() { MqttIO.MqttCheckpointMark.Preparer preparer = new MqttIO.MqttCheckpointMark.Preparer("id"); - ArrayList messages = new ArrayList<>(); + ArrayList messages = new ArrayList<>(); for (int i = 0; i < 5; ++i) { messages.add(new FakeMessage()); } @@ -328,30 +349,35 @@ public void testReadCheckpoint() { preparer.add(messages.get(3), Instant.ofEpochMilli(40)); preparer.add(messages.get(4), Instant.ofEpochMilli(50)); MqttIO.MqttCheckpointMark checkpointB = preparer.newCheckpoint(); - assertTrue( - Arrays.stream(messages.toArray()).allMatch(m -> ((FakeMessage) m).getAckCount() == 0)); + + // Check that ackCount is 0 for all + assertTrue(messages.stream().allMatch(m -> m.getAckCount() == 0)); + checkpointA.finalizeCheckpoint(); // only messages in finalized checkpoint acked - assertTrue( - Arrays.stream(messages.subList(0, 3).toArray()) - .allMatch(m -> ((FakeMessage) m).getAckCount() == 1)); - assertTrue( - Arrays.stream(messages.subList(3, 5).toArray()) - .allMatch(m -> ((FakeMessage) m).getAckCount() == 0)); + for (int i = 0; i < 3; ++i) { + assertEquals(1, messages.get(i).getAckCount()); + } + for (int i = 3; i < 5; ++i) { + assertEquals(0, messages.get(i).getAckCount()); + } + checkpointB.finalizeCheckpoint(); - // all messaged acked once - assertTrue( - Arrays.stream(messages.toArray()).allMatch(m -> ((FakeMessage) m).getAckCount() == 1)); + // all messages acked once + for (int i = 0; i < 5; ++i) { + assertEquals(1, messages.get(i).getAckCount()); + } } @Test public void testWrite() throws Exception { final int numberOfTestMessages = 200; - MQTT client = new MQTT(); - client.setHost("tcp://localhost:" + port); - final BlockingConnection connection = client.blockingConnection(); - connection.connect(); - connection.subscribe(new Topic[] {new Topic(Buffer.utf8("WRITE_TOPIC"), QoS.EXACTLY_ONCE)}); + Mqtt3BlockingClient client = + Mqtt3Client.builder().serverHost("localhost").serverPort(port).buildBlocking(); + client.connect(); + Mqtt3BlockingClient.Mqtt3Publishes publishes = + client.publishes(MqttGlobalPublishFilter.ALL, true); + client.subscribeWith().topicFilter("WRITE_TOPIC").qos(MqttQos.EXACTLY_ONCE).send(); final Set messages = new ConcurrentSkipListSet<>(); @@ -360,9 +386,9 @@ public void testWrite() throws Exception { () -> { try { for (int i = 0; i < numberOfTestMessages; i++) { - Message message = connection.receive(); - messages.add(new String(message.getPayload(), StandardCharsets.UTF_8)); - message.ack(); + Mqtt3Publish message = publishes.receive(); + messages.add(new String(message.getPayloadAsBytes(), StandardCharsets.UTF_8)); + message.acknowledge(); } } catch (Exception e) { LOG.error("Can't receive message", e); @@ -384,7 +410,7 @@ public void testWrite() throws Exception { pipeline.run(); subscriber.join(); - connection.disconnect(); + client.disconnect(); assertEquals(numberOfTestMessages, messages.size()); for (int i = 0; i < numberOfTestMessages; i++) { @@ -398,17 +424,15 @@ public void testDynamicWrite() throws Exception { final int numberOfTopic2Count = 100; final int numberOfTestMessages = numberOfTopic1Count + numberOfTopic2Count; - MQTT client = new MQTT(); - client.setHost("tcp://localhost:" + port); - final BlockingConnection connection = client.blockingConnection(); - connection.connect(); + Mqtt3BlockingClient client = + Mqtt3Client.builder().serverHost("localhost").serverPort(port).buildBlocking(); + client.connect(); + Mqtt3BlockingClient.Mqtt3Publishes publishes = + client.publishes(MqttGlobalPublishFilter.ALL, true); final String writeTopic1 = "WRITE_TOPIC_1"; final String writeTopic2 = "WRITE_TOPIC_2"; - connection.subscribe( - new Topic[] { - new Topic(Buffer.utf8(writeTopic1), QoS.EXACTLY_ONCE), - new Topic(Buffer.utf8(writeTopic2), QoS.EXACTLY_ONCE) - }); + client.subscribeWith().topicFilter(writeTopic1).qos(MqttQos.EXACTLY_ONCE).send(); + client.subscribeWith().topicFilter(writeTopic2).qos(MqttQos.EXACTLY_ONCE).send(); final Map> messageMap = new ConcurrentSkipListMap<>(); final Thread subscriber = @@ -416,14 +440,14 @@ public void testDynamicWrite() throws Exception { () -> { try { for (int i = 0; i < numberOfTestMessages; i++) { - Message message = connection.receive(); - List messages = messageMap.get(message.getTopic()); + Mqtt3Publish message = publishes.receive(); + List messages = messageMap.get(message.getTopic().toString()); if (messages == null) { messages = new ArrayList<>(); } - messages.add(new String(message.getPayload(), StandardCharsets.UTF_8)); - messageMap.put(message.getTopic(), messages); - message.ack(); + messages.add(new String(message.getPayloadAsBytes(), StandardCharsets.UTF_8)); + messageMap.put(message.getTopic().toString(), messages); + message.acknowledge(); } } catch (Exception e) { LOG.error("Can't receive message", e); @@ -455,7 +479,7 @@ public void testDynamicWrite() throws Exception { pipeline.run(); subscriber.join(); - connection.disconnect(); + client.disconnect(); assertEquals( numberOfTestMessages, messageMap.values().stream().mapToLong(Collection::size).sum());