diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java
index 00a9c85a8..ec7cd5169 100644
--- a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java
+++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java
@@ -202,6 +202,22 @@ public Mqtt5ClientOperationStatistics getOperationStatistics() {
return mqtt5ClientInternalGetOperationStatistics(getNativeHandle());
}
+ /**
+ * Sends a publish acknowledgement packet for a QoS 1 PUBLISH that was previously acquired
+ * for manual control.
+ *
+ *
To use manual publish acknowledgement control, call
+ * {@link PublishReturn#acquirePublishAcknowledgementControl()} within the
+ * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback of a QoS 1 PUBLISH to obtain a
+ * {@link Mqtt5PublishAcknowledgementControlHandle}. Then call this method to send the publish acknowledgement.
+ *
+ * @param publishAcknowledgementControlHandle An opaque handle obtained from
+ * {@link PublishReturn#acquirePublishAcknowledgementControl()}.
+ */
+ public void invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle publishAcknowledgementControlHandle) {
+ mqtt5ClientInternalInvokePublishAcknowledgement(getNativeHandle(), publishAcknowledgementControlHandle.getControlId());
+ }
+
/**
* Returns the connectivity state for the Mqtt5Client.
* @return True if the client is connected, false otherwise
@@ -277,4 +293,5 @@ private static native long mqtt5ClientNew(
private static native void mqtt5ClientInternalUnsubscribe(long client, UnsubscribePacket unsubscribe_options, CompletableFuture unsubscribe_suback);
private static native void mqtt5ClientInternalWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData) throws CrtRuntimeException;
private static native Mqtt5ClientOperationStatistics mqtt5ClientInternalGetOperationStatistics(long client);
+ private static native void mqtt5ClientInternalInvokePublishAcknowledgement(long client, long controlId);
}
diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5ClientOptions.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5ClientOptions.java
index 899798980..0bc19726c 100644
--- a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5ClientOptions.java
+++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5ClientOptions.java
@@ -372,7 +372,15 @@ public interface LifecycleEvents {
*/
public interface PublishEvents {
/**
- * Called when an MQTT PUBLISH packet is received by the client
+ * Called when an MQTT PUBLISH packet is received by the client.
+ *
+ * To take manual control of the publish acknowledgement for a QoS 1 message, call
+ * {@link PublishReturn#acquirePublishAcknowledgementControl()} within this callback. If you do so,
+ * the client will NOT automatically send the publish acknowledgement. You are responsible for calling
+ * {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)} later.
+ *
+ * If you do not call {@code acquirePublishAcknowledgementControl()} within the callback ,
+ * the client will automatically send the publish acknowledgement after this callback returns.
*
* @param client The client that has received the message
* @param publishReturn All of the data that was received from the server
diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PublishAcknowledgementControlHandle.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PublishAcknowledgementControlHandle.java
new file mode 100644
index 000000000..c4c879fd8
--- /dev/null
+++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PublishAcknowledgementControlHandle.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+package software.amazon.awssdk.crt.mqtt5;
+
+/**
+ * An opaque handle representing manual control over a publish acknowledgement for a received
+ * PUBLISH packet.
+ *
+ * This class cannot be instantiated directly. Instances are only created by the CRT library.
+ */
+public class Mqtt5PublishAcknowledgementControlHandle {
+
+ private final long controlId;
+
+ /**
+ * Creates a new Mqtt5PublishAcknowledgementControlHandle.
+ *
+ * @param controlId The native publish acknowledgement control ID returned by
+ * aws_mqtt5_client_acquire_publish_acknowledgement.
+ */
+ Mqtt5PublishAcknowledgementControlHandle(long controlId) {
+ this.controlId = controlId;
+ }
+
+ /**
+ * Returns the native publish acknowledgement control ID, used by
+ * {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)}
+ * to pass the control ID to native code.
+ *
+ * @return The native publish acknowledgement control ID.
+ */
+ long getControlId() {
+ return controlId;
+ }
+}
diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java
index 4baf2aa4d..b30a75f6d 100644
--- a/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java
+++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java
@@ -14,6 +14,22 @@
public class PublishReturn {
private PublishPacket publishPacket;
+ /**
+ * The already-acquired publish acknowledgement control ID, eagerly acquired by native code
+ * before the {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback is invoked.
+ * For QoS 0 messages this is 0 (no publish acknowledgement required).
+ * After {@code acquirePublishAcknowledgementControl()} is called, this is set to 0 to prevent
+ * double-use.
+ */
+ private long controlId;
+
+ /**
+ * The threadID of the {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback.
+ * This is set at the beginning of the callback and then used to verify
+ * {@code acquirePublishAcknowledgementControl()} is being called from within.
+ */
+ private long threadID;
+
/**
* Returns the PublishPacket returned from the server or Null if none was returned.
* @return The PublishPacket returned from the server.
@@ -22,12 +38,50 @@ public PublishPacket getPublishPacket() {
return publishPacket;
}
+ /**
+ * Acquires manual control over the publish acknowledgement for this PUBLISH message,
+ * preventing the client from automatically sending an acknowledgement. The returned handle can be
+ * passed to {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)}
+ * at a later time to send the publish acknowledgement to the broker.
+ *
+ * Important: This method must be called within the
+ * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. Calling it outside the
+ * callback (wrong thread) or after it has already been called will return {@code null}.
+ *
+ * This method may only be called once per received PUBLISH. Subsequent calls will return
+ * {@code null}.
+ *
+ * If this method is not called, the client will automatically send a publish acknowledgment
+ * for QoS 1 messages when the callback returns.
+ *
+ * @return A {@link Mqtt5PublishAcknowledgementControlHandle} that can be used to manually send
+ * the acknowledgement, or {@code null} if called outside the callback, called more than
+ * once, or called on a QoS 0 message.
+ */
+ public synchronized Mqtt5PublishAcknowledgementControlHandle acquirePublishAcknowledgementControl() {
+ if (controlId == 0 || threadID != Thread.currentThread().getId()) {
+ return null;
+ }
+ long acquiredControlId = controlId;
+ /* Zero out so it can't be double-called */
+ controlId = 0;
+ return new Mqtt5PublishAcknowledgementControlHandle(acquiredControlId);
+ }
+
/**
* This is only called in JNI to make a new PublishReturn with a PUBLISH packet.
- * @param newPublishPacket The PubAckPacket data for QoS 1 packets. Can be null if result is non QoS 1.
- * @return A newly created PublishResult
+ * The controlId is eagerly acquired by native code prior to
+ * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} being called.
+ * The threadID is set to the calling thread and is used when
+ * {@link #acquirePublishAcknowledgementControl()} is called to guarantee the result
+ * is accurate and enforces the requirement of calling it from within the callback.
+ *
+ * @param newPublishPacket The PublishPacket data received from the server.
+ * @param controlId The pre-acquired publish acknowledgement control ID (0 for QoS 0 messages).
*/
- private PublishReturn(PublishPacket newPublishPacket) {
+ private PublishReturn(PublishPacket newPublishPacket, long controlId) {
this.publishPacket = newPublishPacket;
+ this.controlId = controlId;
+ this.threadID = Thread.currentThread().getId();
}
}
diff --git a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json
index 92322c497..0f818e1e2 100644
--- a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json
+++ b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json
@@ -1563,8 +1563,13 @@
{
"name": "",
"parameterTypes": [
- "software.amazon.awssdk.crt.mqtt5.packets.PublishPacket"
+ "software.amazon.awssdk.crt.mqtt5.packets.PublishPacket",
+ "long"
]
+ },
+ {
+ "name": "acquirePublishAcknowledgementControl",
+ "parameterTypes": []
}
]
},
diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c
index e555cad20..71be0085b 100644
--- a/src/native/java_class_ids.c
+++ b/src/native/java_class_ids.c
@@ -2262,8 +2262,20 @@ static void s_cache_mqtt5_publish_return(JNIEnv *env) {
env,
mqtt5_publish_return_properties.return_class,
"",
- "(Lsoftware/amazon/awssdk/crt/mqtt5/packets/PublishPacket;)V");
+ "(Lsoftware/amazon/awssdk/crt/mqtt5/packets/PublishPacket;J)V");
AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_constructor_id);
+ /*
+ * acquirePublishAcknowledgementControl() called by native code after onMessageReceived returns
+ * to check whether the user took manual control during the callback. Returns a non-null handle
+ * if control was not yet acquired (native then auto-invokes the PUBACK), or null if the user
+ * already called it during the callback (user is responsible for invoking the PUBACK).
+ */
+ mqtt5_publish_return_properties.return_acquire_publish_acknowledgement_control_id = (*env)->GetMethodID(
+ env,
+ mqtt5_publish_return_properties.return_class,
+ "acquirePublishAcknowledgementControl",
+ "()Lsoftware/amazon/awssdk/crt/mqtt5/Mqtt5PublishAcknowledgementControlHandle;");
+ AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_acquire_publish_acknowledgement_control_id);
}
struct java_aws_mqtt5_on_stopped_return_properties mqtt5_on_stopped_return_properties;
diff --git a/src/native/java_class_ids.h b/src/native/java_class_ids.h
index 6a44aad42..f613e1c6b 100644
--- a/src/native/java_class_ids.h
+++ b/src/native/java_class_ids.h
@@ -928,7 +928,10 @@ extern struct java_aws_mqtt5_publish_result_properties mqtt5_publish_result_prop
/* mqtt5.PublishReturn */
struct java_aws_mqtt5_publish_return_properties {
jclass return_class;
- jmethodID return_constructor_id;
+ jmethodID return_constructor_id; /* (PublishPacket, long) */
+ jmethodID
+ return_acquire_publish_acknowledgement_control_id; /* ()Lsoftware/.../Mqtt5PublishAcknowledgementControlHandle;
+ */
};
extern struct java_aws_mqtt5_publish_return_properties mqtt5_publish_return_properties;
diff --git a/src/native/mqtt5_client.c b/src/native/mqtt5_client.c
index 3720a90e3..f5018a1b8 100644
--- a/src/native/mqtt5_client.c
+++ b/src/native/mqtt5_client.c
@@ -578,12 +578,33 @@ static void s_aws_mqtt5_client_java_publish_received(
goto clean_up;
}
- /* Make the PublishReturn struct that will hold all of the data that is passed to Java */
+ /*
+ * For QoS 1 messages, eagerly acquire the publish acknowledgement control before invoking
+ * the Java callback. The control ID is passed directly as a long to the PublishReturn
+ * constructor.
+ *
+ * After the callback returns, we call wasControlAcquired() on the PublishReturn object to
+ * determine whether the user called acquirePublishAcknowledgementControl() during the callback:
+ * - If wasControlAcquired() returns true, the user took manual control and is responsible
+ * for calling invokePublishAcknowledgement() later.
+ * - If wasControlAcquired() returns false (or an exception occurred), the user did NOT take
+ * control, so we auto-invoke the PUBACK to avoid losing it.
+ */
+ uint64_t control_id = 0;
+ if (publish->qos == AWS_MQTT5_QOS_AT_LEAST_ONCE) {
+ /* Eagerly acquire the PUBACK control before invoking the Java callback */
+ control_id = aws_mqtt5_client_acquire_publish_acknowledgement(java_client->client, publish);
+ }
+
+ /* Make the PublishReturn struct that will hold all of the data that is passed to Java.
+ * The constructor takes (PublishPacket, long) where the long is the pre-acquired control ID
+ * (0 for QoS 0 messages). */
publish_packet_return_data = (*env)->NewObject(
env,
mqtt5_publish_return_properties.return_class,
mqtt5_publish_return_properties.return_constructor_id,
- publish_packet_data);
+ publish_packet_data,
+ (jlong)control_id);
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
if (java_client->jni_publish_events) {
@@ -595,6 +616,28 @@ static void s_aws_mqtt5_client_java_publish_received(
publish_packet_return_data);
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
}
+
+ /*
+ * After the callback returns, call acquirePublishAcknowledgementControl() on the PublishReturn.
+ * - If the user already called it during the callback, controlId was zeroed out and the
+ * method returns null. The user is responsible for invoking the PUBACK.
+ * - If the user did NOT call it, controlId is still non-zero and we get a non-null handle
+ * back. We then auto-invoke the PUBACK to avoid losing it.
+ */
+ if (control_id != 0 && publish_packet_return_data != NULL) {
+ jobject handle = (*env)->CallObjectMethod(
+ env,
+ publish_packet_return_data,
+ mqtt5_publish_return_properties.return_acquire_publish_acknowledgement_control_id);
+ aws_jni_check_and_clear_exception(env);
+
+ if (handle != NULL) {
+ /* User did NOT call acquirePublishAcknowledgementControl() during the callback;
+ * auto-invoke the PUBACK. */
+ aws_mqtt5_client_invoke_publish_acknowledgement(java_client->client, control_id, NULL);
+ }
+ }
+
goto clean_up;
clean_up:
@@ -2190,6 +2233,45 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5Cl
}
}
+/*******************************************************************************
+ * Manual Publish Acknowledgement Control Functions
+ ******************************************************************************/
+
+/**
+ * Called from Mqtt5Client.mqtt5ClientInternalInvokePublishAcknowledgement(long client, long controlId).
+ * Calls aws_mqtt5_client_invoke_publish_acknowledgement to send the publish acknowledgement (PUBACK)
+ * for a previously acquired manual publish acknowledgement control handle.
+ */
+JNIEXPORT void JNICALL
+ Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalInvokePublishAcknowledgement(
+ JNIEnv *env,
+ jclass jni_class,
+ jlong jni_client,
+ jlong control_id) {
+ (void)jni_class;
+ aws_cache_jni_ids(env);
+
+ struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)jni_client;
+ if (!java_client) {
+ s_aws_mqtt5_client_log_and_throw_exception(
+ env, "Mqtt5Client.invokePublishAcknowledgement: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT);
+ return;
+ }
+ if (!java_client->client) {
+ s_aws_mqtt5_client_log_and_throw_exception(
+ env, "Mqtt5Client.invokePublishAcknowledgement: Invalid/null native client", AWS_ERROR_INVALID_ARGUMENT);
+ return;
+ }
+
+ int result = aws_mqtt5_client_invoke_publish_acknowledgement(java_client->client, (uint64_t)control_id, NULL);
+ if (result != AWS_OP_SUCCESS) {
+ s_aws_mqtt5_client_log_and_throw_exception(
+ env,
+ "Mqtt5Client.invokePublishAcknowledgement: aws_mqtt5_client_invoke_publish_acknowledgement failed!",
+ aws_last_error());
+ }
+}
+
#if UINTPTR_MAX == 0xffffffff
# if defined(_MSC_VER)
# pragma warning(pop)
diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java
index dfc58c431..6484ede66 100644
--- a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java
+++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java
@@ -3,6 +3,7 @@
import org.junit.Assume;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -2623,6 +2624,382 @@ public void QoS1_UC1() throws Exception {
CrtResource.waitForNoResources();
}
+ /**
+ * ============================================================
+ * Manual Publish Acknowledgement Tests
+ * ============================================================
+ */
+
+ private void doManualPublishAcknowledgement_HoldTest() {
+ try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath(
+ AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+ TlsContext tlsContext = new TlsContext(tlsOptions)) {
+
+ String testUUID = UUID.randomUUID().toString();
+ String testTopic = "test/MQTT5_ManualPuback_Java_" + testUUID;
+ byte[] payload = testUUID.getBytes();
+
+ final long PUBACK_HOLD_TIMEOUT_SEC = 60L;
+
+ CompletableFuture firstDeliveryFuture = new CompletableFuture<>();
+ CompletableFuture redeliveryFuture = new CompletableFuture<>();
+ Mqtt5PublishAcknowledgementControlHandle[] publishAckHandleHolder = new Mqtt5PublishAcknowledgementControlHandle[1];
+
+ Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l);
+ LifecycleEvents_Futured events = new LifecycleEvents_Futured();
+ builder.withLifecycleEvents(events);
+ builder.withTlsContext(tlsContext);
+
+ ConnectPacketBuilder connectOptions = new ConnectPacketBuilder();
+ connectOptions.withClientId("test/MQTT5_ManualPublishAcknowledgement_Java_" + testUUID);
+ builder.withConnectOptions(connectOptions.build());
+
+ builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() {
+ @Override
+ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
+ byte[] receivedPayload = publishReturn.getPublishPacket().getPayload();
+ if (!firstDeliveryFuture.isDone()) {
+ // First delivery: acquire manual publish acknowledgement control to hold the PUBACK
+ publishAckHandleHolder[0] = publishReturn.acquirePublishAcknowledgementControl();
+ firstDeliveryFuture.complete(receivedPayload);
+ } else if (!redeliveryFuture.isDone()) {
+ // Second delivery: broker re-sent because no PUBACK was received
+ redeliveryFuture.complete(receivedPayload);
+ }
+ }
+ });
+
+ try (Mqtt5Client client = new Mqtt5Client(builder.build())) {
+ client.start();
+ events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Subscribe to the topic with QoS 1
+ SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE);
+ client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Publish a QoS 1 message with a unique UUID payload
+ PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload);
+ client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Wait for the first delivery and confirm PUBACK was held
+ byte[] firstPayload = firstDeliveryFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ assertTrue("First delivery payload should match", java.util.Arrays.equals(firstPayload, payload));
+ assertNotNull("acquirePublishAcknowledgementControl() should have returned a handle", publishAckHandleHolder[0]);
+
+ // Wait up to 60 seconds for the broker to re-deliver the message (no PUBACK was sent)
+ byte[] redeliveredPayload = redeliveryFuture.get(PUBACK_HOLD_TIMEOUT_SEC, TimeUnit.SECONDS);
+ assertTrue("Re-delivered payload should match the original UUID payload",
+ java.util.Arrays.equals(redeliveredPayload, payload));
+
+ // Release the held PUBACK now that we've confirmed re-delivery
+ client.invokePublishAcknowledgement(publishAckHandleHolder[0]);
+
+ client.stop();
+ events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /* Manual publish acknowledgement hold test: hold PUBACK and verify broker re-delivers the message */
+ @Test
+ public void ManualPuback_Hold() throws Exception {
+ skipIfNetworkUnavailable();
+ Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+
+ TestUtils.doRetryableTest(this::doManualPublishAcknowledgement_HoldTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS);
+
+ CrtResource.waitForNoResources();
+ }
+
+ private void doManualPublishAcknowledgement_InvokeTest() {
+ try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath(
+ AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+ TlsContext tlsContext = new TlsContext(tlsOptions)) {
+
+ String testUUID = UUID.randomUUID().toString();
+ String testTopic = "test/MQTT5_ManualPublishAcknowledgement_Java_" + testUUID;
+ byte[] payload = testUUID.getBytes();
+
+ final long NO_REDELIVERY_WAIT_SEC = 60L;
+
+ CompletableFuture firstDeliveryFuture = new CompletableFuture<>();
+ CompletableFuture unexpectedRedeliveryFuture = new CompletableFuture<>();
+ Mqtt5PublishAcknowledgementControlHandle[] publishAckHandleHolder = new Mqtt5PublishAcknowledgementControlHandle[1];
+
+ Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l);
+ LifecycleEvents_Futured events = new LifecycleEvents_Futured();
+ builder.withLifecycleEvents(events);
+ builder.withTlsContext(tlsContext);
+
+ ConnectPacketBuilder connectOptions = new ConnectPacketBuilder();
+ connectOptions.withClientId("test/MQTT5_ManualPublishAcknowledgement_Java_" + testUUID);
+ builder.withConnectOptions(connectOptions.build());
+
+ builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() {
+ @Override
+ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
+ byte[] receivedPayload = publishReturn.getPublishPacket().getPayload();
+ if (!firstDeliveryFuture.isDone()) {
+ // First delivery: acquire manual publish acknowledgement control, then immediately invoke it
+ publishAckHandleHolder[0] = publishReturn.acquirePublishAcknowledgementControl();
+ firstDeliveryFuture.complete(receivedPayload);
+ } else if (java.util.Arrays.equals(receivedPayload, payload) && !unexpectedRedeliveryFuture.isDone()) {
+ // A second delivery of the same payload means the broker re-sent. This should NOT happen
+ unexpectedRedeliveryFuture.complete(receivedPayload);
+ }
+ }
+ });
+
+ try (Mqtt5Client client = new Mqtt5Client(builder.build())) {
+ client.start();
+ events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Subscribe to the topic with QoS 1
+ SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE);
+ client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Publish a QoS 1 message with a unique UUID payload
+ PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload);
+ client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Wait for the first delivery and confirm publish acknowledgement handle was acquired
+ byte[] firstPayload = firstDeliveryFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ assertTrue("First delivery payload should match", java.util.Arrays.equals(firstPayload, payload));
+ assertNotNull("acquirePublishAcknowledgementControl() should have returned a handle", publishAckHandleHolder[0]);
+
+ // Immediately invoke the publish acknowledgement using the acquired handle
+ client.invokePublishAcknowledgement(publishAckHandleHolder[0]);
+
+ // Wait 60 seconds and confirm the broker does NOT re-deliver the message
+ boolean redelivered = false;
+ try {
+ unexpectedRedeliveryFuture.get(NO_REDELIVERY_WAIT_SEC, TimeUnit.SECONDS);
+ redelivered = true;
+ } catch (java.util.concurrent.TimeoutException ex) {
+ redelivered = false;
+ }
+ assertTrue("Broker should NOT re-deliver the message after invokePublishAcknowledgement() was called",
+ !redelivered);
+
+ client.stop();
+ events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /* Manual publish acknowledgement invoke test: acquire and immediately invoke, verify no re-delivery */
+ @Test
+ public void ManualPuback_Invoke() throws Exception {
+ skipIfNetworkUnavailable();
+ Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+
+ TestUtils.doRetryableTest(this::doManualPublishAcknowledgement_InvokeTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS);
+
+ CrtResource.waitForNoResources();
+ }
+
+ private void doManualPublishAcknowledgement_AcquireDoubleCallReturnsNullTest() {
+ try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath(
+ AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+ TlsContext tlsContext = new TlsContext(tlsOptions)) {
+
+ String testUUID = UUID.randomUUID().toString();
+ String testTopic = "test/MQTT5_Binding_Java_" + testUUID;
+ byte[] payload = testUUID.getBytes();
+
+ CompletableFuture resultFuture = new CompletableFuture<>();
+
+ Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l);
+ LifecycleEvents_Futured events = new LifecycleEvents_Futured();
+ builder.withLifecycleEvents(events);
+ builder.withTlsContext(tlsContext);
+
+ builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() {
+ @Override
+ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
+ // First call should succeed and return a non-null handle
+ Mqtt5PublishAcknowledgementControlHandle handle = publishReturn.acquirePublishAcknowledgementControl();
+ if (handle == null) {
+ resultFuture.complete("first_call_returned_null");
+ return;
+ }
+ // Second call on the same message should return null (controlId was zeroed out)
+ Mqtt5PublishAcknowledgementControlHandle handle2 = publishReturn.acquirePublishAcknowledgementControl();
+ if (handle2 == null) {
+ resultFuture.complete("double_call_returned_null");
+ } else {
+ resultFuture.complete("double_call_returned_non_null");
+ }
+ // handle is valid but we don't invoke it here; let it be GC'd
+ }
+ });
+
+ try (Mqtt5Client client = new Mqtt5Client(builder.build())) {
+ client.start();
+ events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE);
+ client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload);
+ client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ String result = resultFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ assertEquals("Expected null on double-call, got: " + result,
+ "double_call_returned_null", result);
+
+ client.stop();
+ events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /* Manual publish acknowledgement double-call test: calling acquirePublishAcknowledgementControl() twice returns null on the second call */
+ @Test
+ public void ManualPuback_AcquireDoubleCallRaises() throws Exception {
+ skipIfNetworkUnavailable();
+ Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+
+ TestUtils.doRetryableTest(this::doManualPublishAcknowledgement_AcquireDoubleCallReturnsNullTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS);
+
+ CrtResource.waitForNoResources();
+ }
+
+ private void doManualPublishAcknowledgement_AcquirePostCallbackReturnsNullTest() {
+ try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath(
+ AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+ TlsContext tlsContext = new TlsContext(tlsOptions)) {
+
+ String testUUID = UUID.randomUUID().toString();
+ String testTopic = "test/MQTT5_Binding_Java_" + testUUID;
+ byte[] payload = testUUID.getBytes();
+
+ CompletableFuture callbackDoneFuture = new CompletableFuture<>();
+ PublishReturn[] savedPublishReturnHolder = new PublishReturn[1];
+
+ Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l);
+ LifecycleEvents_Futured events = new LifecycleEvents_Futured();
+ builder.withLifecycleEvents(events);
+ builder.withTlsContext(tlsContext);
+
+ builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() {
+ @Override
+ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
+ // Save the PublishReturn but do NOT call acquirePublishAcknowledgementControl() within the callback.
+ // Native code will call it after the callback returns (auto-invoking the PUBACK), zeroing controlId.
+ savedPublishReturnHolder[0] = publishReturn;
+ callbackDoneFuture.complete(null);
+ }
+ });
+
+ try (Mqtt5Client client = new Mqtt5Client(builder.build())) {
+ client.start();
+ events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE);
+ client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload);
+ client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Wait for the callback to complete
+ callbackDoneFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Call acquirePublishAcknowledgementControl() after the callback has returned.
+ // Native code already called it (zeroing controlId), so this returns null due to wrong thread or controlID being zero.
+ assertNotNull("savedPublishReturn should have been set", savedPublishReturnHolder[0]);
+ Mqtt5PublishAcknowledgementControlHandle handle =
+ savedPublishReturnHolder[0].acquirePublishAcknowledgementControl();
+ assertNull("acquirePublishAcknowledgementControl() should return null after callback returns",
+ handle);
+
+ client.stop();
+ events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /* Manual publish acknowledgement post-callback test: calling acquirePublishAcknowledgementControl() after callback returns returns null */
+ @Test
+ public void ManualPuback_AcquirePostCallbackReturnsNull() throws Exception {
+ skipIfNetworkUnavailable();
+ Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+
+ TestUtils.doRetryableTest(this::doManualPublishAcknowledgement_AcquirePostCallbackReturnsNullTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS);
+
+ CrtResource.waitForNoResources();
+ }
+
+ private void doManualPuback_Qos0AcquireReturnsNullTest() {
+ try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath(
+ AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+ TlsContext tlsContext = new TlsContext(tlsOptions)) {
+
+ String testUUID = UUID.randomUUID().toString();
+ String testTopic = "test/MQTT5_Binding_Java_" + testUUID;
+ byte[] payload = testUUID.getBytes();
+
+ CompletableFuture resultFuture = new CompletableFuture<>();
+
+ Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l);
+ LifecycleEvents_Futured events = new LifecycleEvents_Futured();
+ builder.withLifecycleEvents(events);
+ builder.withTlsContext(tlsContext);
+
+ builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() {
+ @Override
+ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
+ // For QoS 0, controlId is 0, so acquirePublishAcknowledgementControl() returns null.
+ Mqtt5PublishAcknowledgementControlHandle handle =
+ publishReturn.acquirePublishAcknowledgementControl();
+ resultFuture.complete(handle);
+ }
+ });
+
+ try (Mqtt5Client client = new Mqtt5Client(builder.build())) {
+ client.start();
+ events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Subscribe with QoS 1 so the broker delivers at QoS 0 (publish at QoS 0)
+ SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE);
+ client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ // Publish at QoS 0, there is no PUBACK involved
+ PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_MOST_ONCE, payload);
+ client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+
+ Mqtt5PublishAcknowledgementControlHandle handle =
+ resultFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ assertNull("acquirePublishAcknowledgementControl() should return null for QoS 0 messages",
+ handle);
+
+ client.stop();
+ events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /* Manual publish acknowledgement QoS 0 test: acquirePublishAcknowledgementControl() returns null for QoS 0 messages */
+ @Test
+ public void ManualPuback_Qos0AcquireReturnsNull() throws Exception {
+ skipIfNetworkUnavailable();
+ Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
+
+ TestUtils.doRetryableTest(this::doManualPuback_Qos0AcquireReturnsNullTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS);
+
+ CrtResource.waitForNoResources();
+ }
+
/**
* ============================================================
* Retain Tests