diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java index 00a9c85a8..ec7cd5169 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java @@ -202,6 +202,22 @@ public Mqtt5ClientOperationStatistics getOperationStatistics() { return mqtt5ClientInternalGetOperationStatistics(getNativeHandle()); } + /** + * Sends a publish acknowledgement packet for a QoS 1 PUBLISH that was previously acquired + * for manual control. + * + *

To use manual publish acknowledgement control, call + * {@link PublishReturn#acquirePublishAcknowledgementControl()} within the + * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback of a QoS 1 PUBLISH to obtain a + * {@link Mqtt5PublishAcknowledgementControlHandle}. Then call this method to send the publish acknowledgement.

+ * + * @param publishAcknowledgementControlHandle An opaque handle obtained from + * {@link PublishReturn#acquirePublishAcknowledgementControl()}. + */ + public void invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle publishAcknowledgementControlHandle) { + mqtt5ClientInternalInvokePublishAcknowledgement(getNativeHandle(), publishAcknowledgementControlHandle.getControlId()); + } + /** * Returns the connectivity state for the Mqtt5Client. * @return True if the client is connected, false otherwise @@ -277,4 +293,5 @@ private static native long mqtt5ClientNew( private static native void mqtt5ClientInternalUnsubscribe(long client, UnsubscribePacket unsubscribe_options, CompletableFuture unsubscribe_suback); private static native void mqtt5ClientInternalWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData) throws CrtRuntimeException; private static native Mqtt5ClientOperationStatistics mqtt5ClientInternalGetOperationStatistics(long client); + private static native void mqtt5ClientInternalInvokePublishAcknowledgement(long client, long controlId); } diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5ClientOptions.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5ClientOptions.java index 899798980..0bc19726c 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5ClientOptions.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5ClientOptions.java @@ -372,7 +372,15 @@ public interface LifecycleEvents { */ public interface PublishEvents { /** - * Called when an MQTT PUBLISH packet is received by the client + * Called when an MQTT PUBLISH packet is received by the client. + * + *

To take manual control of the publish acknowledgement for a QoS 1 message, call + * {@link PublishReturn#acquirePublishAcknowledgementControl()} within this callback. If you do so, + * the client will NOT automatically send the publish acknowledgement. You are responsible for calling + * {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)} later.

+ * + *

If you do not call {@code acquirePublishAcknowledgementControl()} within the callback , + * the client will automatically send the publish acknowledgement after this callback returns.

* * @param client The client that has received the message * @param publishReturn All of the data that was received from the server diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PublishAcknowledgementControlHandle.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PublishAcknowledgementControlHandle.java new file mode 100644 index 000000000..c4c879fd8 --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PublishAcknowledgementControlHandle.java @@ -0,0 +1,37 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +package software.amazon.awssdk.crt.mqtt5; + +/** + * An opaque handle representing manual control over a publish acknowledgement for a received + * PUBLISH packet. + * + *

This class cannot be instantiated directly. Instances are only created by the CRT library.

+ */ +public class Mqtt5PublishAcknowledgementControlHandle { + + private final long controlId; + + /** + * Creates a new Mqtt5PublishAcknowledgementControlHandle. + * + * @param controlId The native publish acknowledgement control ID returned by + * aws_mqtt5_client_acquire_publish_acknowledgement. + */ + Mqtt5PublishAcknowledgementControlHandle(long controlId) { + this.controlId = controlId; + } + + /** + * Returns the native publish acknowledgement control ID, used by + * {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)} + * to pass the control ID to native code. + * + * @return The native publish acknowledgement control ID. + */ + long getControlId() { + return controlId; + } +} diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java index 4baf2aa4d..b30a75f6d 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java @@ -14,6 +14,22 @@ public class PublishReturn { private PublishPacket publishPacket; + /** + * The already-acquired publish acknowledgement control ID, eagerly acquired by native code + * before the {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback is invoked. + * For QoS 0 messages this is 0 (no publish acknowledgement required). + * After {@code acquirePublishAcknowledgementControl()} is called, this is set to 0 to prevent + * double-use. + */ + private long controlId; + + /** + * The threadID of the {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. + * This is set at the beginning of the callback and then used to verify + * {@code acquirePublishAcknowledgementControl()} is being called from within. + */ + private long threadID; + /** * Returns the PublishPacket returned from the server or Null if none was returned. * @return The PublishPacket returned from the server. @@ -22,12 +38,50 @@ public PublishPacket getPublishPacket() { return publishPacket; } + /** + * Acquires manual control over the publish acknowledgement for this PUBLISH message, + * preventing the client from automatically sending an acknowledgement. The returned handle can be + * passed to {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)} + * at a later time to send the publish acknowledgement to the broker. + * + *

Important: This method must be called within the + * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. Calling it outside the + * callback (wrong thread) or after it has already been called will return {@code null}.

+ * + *

This method may only be called once per received PUBLISH. Subsequent calls will return + * {@code null}.

+ * + *

If this method is not called, the client will automatically send a publish acknowledgment + * for QoS 1 messages when the callback returns.

+ * + * @return A {@link Mqtt5PublishAcknowledgementControlHandle} that can be used to manually send + * the acknowledgement, or {@code null} if called outside the callback, called more than + * once, or called on a QoS 0 message. + */ + public synchronized Mqtt5PublishAcknowledgementControlHandle acquirePublishAcknowledgementControl() { + if (controlId == 0 || threadID != Thread.currentThread().getId()) { + return null; + } + long acquiredControlId = controlId; + /* Zero out so it can't be double-called */ + controlId = 0; + return new Mqtt5PublishAcknowledgementControlHandle(acquiredControlId); + } + /** * This is only called in JNI to make a new PublishReturn with a PUBLISH packet. - * @param newPublishPacket The PubAckPacket data for QoS 1 packets. Can be null if result is non QoS 1. - * @return A newly created PublishResult + * The controlId is eagerly acquired by native code prior to + * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} being called. + * The threadID is set to the calling thread and is used when + * {@link #acquirePublishAcknowledgementControl()} is called to guarantee the result + * is accurate and enforces the requirement of calling it from within the callback. + * + * @param newPublishPacket The PublishPacket data received from the server. + * @param controlId The pre-acquired publish acknowledgement control ID (0 for QoS 0 messages). */ - private PublishReturn(PublishPacket newPublishPacket) { + private PublishReturn(PublishPacket newPublishPacket, long controlId) { this.publishPacket = newPublishPacket; + this.controlId = controlId; + this.threadID = Thread.currentThread().getId(); } } diff --git a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json index 92322c497..0f818e1e2 100644 --- a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json +++ b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json @@ -1563,8 +1563,13 @@ { "name": "", "parameterTypes": [ - "software.amazon.awssdk.crt.mqtt5.packets.PublishPacket" + "software.amazon.awssdk.crt.mqtt5.packets.PublishPacket", + "long" ] + }, + { + "name": "acquirePublishAcknowledgementControl", + "parameterTypes": [] } ] }, diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c index e555cad20..71be0085b 100644 --- a/src/native/java_class_ids.c +++ b/src/native/java_class_ids.c @@ -2262,8 +2262,20 @@ static void s_cache_mqtt5_publish_return(JNIEnv *env) { env, mqtt5_publish_return_properties.return_class, "", - "(Lsoftware/amazon/awssdk/crt/mqtt5/packets/PublishPacket;)V"); + "(Lsoftware/amazon/awssdk/crt/mqtt5/packets/PublishPacket;J)V"); AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_constructor_id); + /* + * acquirePublishAcknowledgementControl() called by native code after onMessageReceived returns + * to check whether the user took manual control during the callback. Returns a non-null handle + * if control was not yet acquired (native then auto-invokes the PUBACK), or null if the user + * already called it during the callback (user is responsible for invoking the PUBACK). + */ + mqtt5_publish_return_properties.return_acquire_publish_acknowledgement_control_id = (*env)->GetMethodID( + env, + mqtt5_publish_return_properties.return_class, + "acquirePublishAcknowledgementControl", + "()Lsoftware/amazon/awssdk/crt/mqtt5/Mqtt5PublishAcknowledgementControlHandle;"); + AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_acquire_publish_acknowledgement_control_id); } struct java_aws_mqtt5_on_stopped_return_properties mqtt5_on_stopped_return_properties; diff --git a/src/native/java_class_ids.h b/src/native/java_class_ids.h index 6a44aad42..f613e1c6b 100644 --- a/src/native/java_class_ids.h +++ b/src/native/java_class_ids.h @@ -928,7 +928,10 @@ extern struct java_aws_mqtt5_publish_result_properties mqtt5_publish_result_prop /* mqtt5.PublishReturn */ struct java_aws_mqtt5_publish_return_properties { jclass return_class; - jmethodID return_constructor_id; + jmethodID return_constructor_id; /* (PublishPacket, long) */ + jmethodID + return_acquire_publish_acknowledgement_control_id; /* ()Lsoftware/.../Mqtt5PublishAcknowledgementControlHandle; + */ }; extern struct java_aws_mqtt5_publish_return_properties mqtt5_publish_return_properties; diff --git a/src/native/mqtt5_client.c b/src/native/mqtt5_client.c index 3720a90e3..f5018a1b8 100644 --- a/src/native/mqtt5_client.c +++ b/src/native/mqtt5_client.c @@ -578,12 +578,33 @@ static void s_aws_mqtt5_client_java_publish_received( goto clean_up; } - /* Make the PublishReturn struct that will hold all of the data that is passed to Java */ + /* + * For QoS 1 messages, eagerly acquire the publish acknowledgement control before invoking + * the Java callback. The control ID is passed directly as a long to the PublishReturn + * constructor. + * + * After the callback returns, we call wasControlAcquired() on the PublishReturn object to + * determine whether the user called acquirePublishAcknowledgementControl() during the callback: + * - If wasControlAcquired() returns true, the user took manual control and is responsible + * for calling invokePublishAcknowledgement() later. + * - If wasControlAcquired() returns false (or an exception occurred), the user did NOT take + * control, so we auto-invoke the PUBACK to avoid losing it. + */ + uint64_t control_id = 0; + if (publish->qos == AWS_MQTT5_QOS_AT_LEAST_ONCE) { + /* Eagerly acquire the PUBACK control before invoking the Java callback */ + control_id = aws_mqtt5_client_acquire_publish_acknowledgement(java_client->client, publish); + } + + /* Make the PublishReturn struct that will hold all of the data that is passed to Java. + * The constructor takes (PublishPacket, long) where the long is the pre-acquired control ID + * (0 for QoS 0 messages). */ publish_packet_return_data = (*env)->NewObject( env, mqtt5_publish_return_properties.return_class, mqtt5_publish_return_properties.return_constructor_id, - publish_packet_data); + publish_packet_data, + (jlong)control_id); aws_jni_check_and_clear_exception(env); /* To hide JNI warning */ if (java_client->jni_publish_events) { @@ -595,6 +616,28 @@ static void s_aws_mqtt5_client_java_publish_received( publish_packet_return_data); aws_jni_check_and_clear_exception(env); /* To hide JNI warning */ } + + /* + * After the callback returns, call acquirePublishAcknowledgementControl() on the PublishReturn. + * - If the user already called it during the callback, controlId was zeroed out and the + * method returns null. The user is responsible for invoking the PUBACK. + * - If the user did NOT call it, controlId is still non-zero and we get a non-null handle + * back. We then auto-invoke the PUBACK to avoid losing it. + */ + if (control_id != 0 && publish_packet_return_data != NULL) { + jobject handle = (*env)->CallObjectMethod( + env, + publish_packet_return_data, + mqtt5_publish_return_properties.return_acquire_publish_acknowledgement_control_id); + aws_jni_check_and_clear_exception(env); + + if (handle != NULL) { + /* User did NOT call acquirePublishAcknowledgementControl() during the callback; + * auto-invoke the PUBACK. */ + aws_mqtt5_client_invoke_publish_acknowledgement(java_client->client, control_id, NULL); + } + } + goto clean_up; clean_up: @@ -2190,6 +2233,45 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5Cl } } +/******************************************************************************* + * Manual Publish Acknowledgement Control Functions + ******************************************************************************/ + +/** + * Called from Mqtt5Client.mqtt5ClientInternalInvokePublishAcknowledgement(long client, long controlId). + * Calls aws_mqtt5_client_invoke_publish_acknowledgement to send the publish acknowledgement (PUBACK) + * for a previously acquired manual publish acknowledgement control handle. + */ +JNIEXPORT void JNICALL + Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalInvokePublishAcknowledgement( + JNIEnv *env, + jclass jni_class, + jlong jni_client, + jlong control_id) { + (void)jni_class; + aws_cache_jni_ids(env); + + struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)jni_client; + if (!java_client) { + s_aws_mqtt5_client_log_and_throw_exception( + env, "Mqtt5Client.invokePublishAcknowledgement: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT); + return; + } + if (!java_client->client) { + s_aws_mqtt5_client_log_and_throw_exception( + env, "Mqtt5Client.invokePublishAcknowledgement: Invalid/null native client", AWS_ERROR_INVALID_ARGUMENT); + return; + } + + int result = aws_mqtt5_client_invoke_publish_acknowledgement(java_client->client, (uint64_t)control_id, NULL); + if (result != AWS_OP_SUCCESS) { + s_aws_mqtt5_client_log_and_throw_exception( + env, + "Mqtt5Client.invokePublishAcknowledgement: aws_mqtt5_client_invoke_publish_acknowledgement failed!", + aws_last_error()); + } +} + #if UINTPTR_MAX == 0xffffffff # if defined(_MSC_VER) # pragma warning(pop) diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java index dfc58c431..6484ede66 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java @@ -3,6 +3,7 @@ import org.junit.Assume; import org.junit.Test; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -2623,6 +2624,382 @@ public void QoS1_UC1() throws Exception { CrtResource.waitForNoResources(); } + /** + * ============================================================ + * Manual Publish Acknowledgement Tests + * ============================================================ + */ + + private void doManualPublishAcknowledgement_HoldTest() { + try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions)) { + + String testUUID = UUID.randomUUID().toString(); + String testTopic = "test/MQTT5_ManualPuback_Java_" + testUUID; + byte[] payload = testUUID.getBytes(); + + final long PUBACK_HOLD_TIMEOUT_SEC = 60L; + + CompletableFuture firstDeliveryFuture = new CompletableFuture<>(); + CompletableFuture redeliveryFuture = new CompletableFuture<>(); + Mqtt5PublishAcknowledgementControlHandle[] publishAckHandleHolder = new Mqtt5PublishAcknowledgementControlHandle[1]; + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + builder.withLifecycleEvents(events); + builder.withTlsContext(tlsContext); + + ConnectPacketBuilder connectOptions = new ConnectPacketBuilder(); + connectOptions.withClientId("test/MQTT5_ManualPublishAcknowledgement_Java_" + testUUID); + builder.withConnectOptions(connectOptions.build()); + + builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() { + @Override + public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { + byte[] receivedPayload = publishReturn.getPublishPacket().getPayload(); + if (!firstDeliveryFuture.isDone()) { + // First delivery: acquire manual publish acknowledgement control to hold the PUBACK + publishAckHandleHolder[0] = publishReturn.acquirePublishAcknowledgementControl(); + firstDeliveryFuture.complete(receivedPayload); + } else if (!redeliveryFuture.isDone()) { + // Second delivery: broker re-sent because no PUBACK was received + redeliveryFuture.complete(receivedPayload); + } + } + }); + + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Subscribe to the topic with QoS 1 + SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); + client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Publish a QoS 1 message with a unique UUID payload + PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload); + client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Wait for the first delivery and confirm PUBACK was held + byte[] firstPayload = firstDeliveryFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + assertTrue("First delivery payload should match", java.util.Arrays.equals(firstPayload, payload)); + assertNotNull("acquirePublishAcknowledgementControl() should have returned a handle", publishAckHandleHolder[0]); + + // Wait up to 60 seconds for the broker to re-deliver the message (no PUBACK was sent) + byte[] redeliveredPayload = redeliveryFuture.get(PUBACK_HOLD_TIMEOUT_SEC, TimeUnit.SECONDS); + assertTrue("Re-delivered payload should match the original UUID payload", + java.util.Arrays.equals(redeliveredPayload, payload)); + + // Release the held PUBACK now that we've confirmed re-delivery + client.invokePublishAcknowledgement(publishAckHandleHolder[0]); + + client.stop(); + events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /* Manual publish acknowledgement hold test: hold PUBACK and verify broker re-delivers the message */ + @Test + public void ManualPuback_Hold() throws Exception { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + + TestUtils.doRetryableTest(this::doManualPublishAcknowledgement_HoldTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); + + CrtResource.waitForNoResources(); + } + + private void doManualPublishAcknowledgement_InvokeTest() { + try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions)) { + + String testUUID = UUID.randomUUID().toString(); + String testTopic = "test/MQTT5_ManualPublishAcknowledgement_Java_" + testUUID; + byte[] payload = testUUID.getBytes(); + + final long NO_REDELIVERY_WAIT_SEC = 60L; + + CompletableFuture firstDeliveryFuture = new CompletableFuture<>(); + CompletableFuture unexpectedRedeliveryFuture = new CompletableFuture<>(); + Mqtt5PublishAcknowledgementControlHandle[] publishAckHandleHolder = new Mqtt5PublishAcknowledgementControlHandle[1]; + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + builder.withLifecycleEvents(events); + builder.withTlsContext(tlsContext); + + ConnectPacketBuilder connectOptions = new ConnectPacketBuilder(); + connectOptions.withClientId("test/MQTT5_ManualPublishAcknowledgement_Java_" + testUUID); + builder.withConnectOptions(connectOptions.build()); + + builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() { + @Override + public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { + byte[] receivedPayload = publishReturn.getPublishPacket().getPayload(); + if (!firstDeliveryFuture.isDone()) { + // First delivery: acquire manual publish acknowledgement control, then immediately invoke it + publishAckHandleHolder[0] = publishReturn.acquirePublishAcknowledgementControl(); + firstDeliveryFuture.complete(receivedPayload); + } else if (java.util.Arrays.equals(receivedPayload, payload) && !unexpectedRedeliveryFuture.isDone()) { + // A second delivery of the same payload means the broker re-sent. This should NOT happen + unexpectedRedeliveryFuture.complete(receivedPayload); + } + } + }); + + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Subscribe to the topic with QoS 1 + SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); + client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Publish a QoS 1 message with a unique UUID payload + PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload); + client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Wait for the first delivery and confirm publish acknowledgement handle was acquired + byte[] firstPayload = firstDeliveryFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + assertTrue("First delivery payload should match", java.util.Arrays.equals(firstPayload, payload)); + assertNotNull("acquirePublishAcknowledgementControl() should have returned a handle", publishAckHandleHolder[0]); + + // Immediately invoke the publish acknowledgement using the acquired handle + client.invokePublishAcknowledgement(publishAckHandleHolder[0]); + + // Wait 60 seconds and confirm the broker does NOT re-deliver the message + boolean redelivered = false; + try { + unexpectedRedeliveryFuture.get(NO_REDELIVERY_WAIT_SEC, TimeUnit.SECONDS); + redelivered = true; + } catch (java.util.concurrent.TimeoutException ex) { + redelivered = false; + } + assertTrue("Broker should NOT re-deliver the message after invokePublishAcknowledgement() was called", + !redelivered); + + client.stop(); + events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /* Manual publish acknowledgement invoke test: acquire and immediately invoke, verify no re-delivery */ + @Test + public void ManualPuback_Invoke() throws Exception { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + + TestUtils.doRetryableTest(this::doManualPublishAcknowledgement_InvokeTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); + + CrtResource.waitForNoResources(); + } + + private void doManualPublishAcknowledgement_AcquireDoubleCallReturnsNullTest() { + try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions)) { + + String testUUID = UUID.randomUUID().toString(); + String testTopic = "test/MQTT5_Binding_Java_" + testUUID; + byte[] payload = testUUID.getBytes(); + + CompletableFuture resultFuture = new CompletableFuture<>(); + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + builder.withLifecycleEvents(events); + builder.withTlsContext(tlsContext); + + builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() { + @Override + public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { + // First call should succeed and return a non-null handle + Mqtt5PublishAcknowledgementControlHandle handle = publishReturn.acquirePublishAcknowledgementControl(); + if (handle == null) { + resultFuture.complete("first_call_returned_null"); + return; + } + // Second call on the same message should return null (controlId was zeroed out) + Mqtt5PublishAcknowledgementControlHandle handle2 = publishReturn.acquirePublishAcknowledgementControl(); + if (handle2 == null) { + resultFuture.complete("double_call_returned_null"); + } else { + resultFuture.complete("double_call_returned_non_null"); + } + // handle is valid but we don't invoke it here; let it be GC'd + } + }); + + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); + client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload); + client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + String result = resultFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + assertEquals("Expected null on double-call, got: " + result, + "double_call_returned_null", result); + + client.stop(); + events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /* Manual publish acknowledgement double-call test: calling acquirePublishAcknowledgementControl() twice returns null on the second call */ + @Test + public void ManualPuback_AcquireDoubleCallRaises() throws Exception { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + + TestUtils.doRetryableTest(this::doManualPublishAcknowledgement_AcquireDoubleCallReturnsNullTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); + + CrtResource.waitForNoResources(); + } + + private void doManualPublishAcknowledgement_AcquirePostCallbackReturnsNullTest() { + try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions)) { + + String testUUID = UUID.randomUUID().toString(); + String testTopic = "test/MQTT5_Binding_Java_" + testUUID; + byte[] payload = testUUID.getBytes(); + + CompletableFuture callbackDoneFuture = new CompletableFuture<>(); + PublishReturn[] savedPublishReturnHolder = new PublishReturn[1]; + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + builder.withLifecycleEvents(events); + builder.withTlsContext(tlsContext); + + builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() { + @Override + public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { + // Save the PublishReturn but do NOT call acquirePublishAcknowledgementControl() within the callback. + // Native code will call it after the callback returns (auto-invoking the PUBACK), zeroing controlId. + savedPublishReturnHolder[0] = publishReturn; + callbackDoneFuture.complete(null); + } + }); + + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); + client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_LEAST_ONCE, payload); + client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Wait for the callback to complete + callbackDoneFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Call acquirePublishAcknowledgementControl() after the callback has returned. + // Native code already called it (zeroing controlId), so this returns null due to wrong thread or controlID being zero. + assertNotNull("savedPublishReturn should have been set", savedPublishReturnHolder[0]); + Mqtt5PublishAcknowledgementControlHandle handle = + savedPublishReturnHolder[0].acquirePublishAcknowledgementControl(); + assertNull("acquirePublishAcknowledgementControl() should return null after callback returns", + handle); + + client.stop(); + events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /* Manual publish acknowledgement post-callback test: calling acquirePublishAcknowledgementControl() after callback returns returns null */ + @Test + public void ManualPuback_AcquirePostCallbackReturnsNull() throws Exception { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + + TestUtils.doRetryableTest(this::doManualPublishAcknowledgement_AcquirePostCallbackReturnsNullTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); + + CrtResource.waitForNoResources(); + } + + private void doManualPuback_Qos0AcquireReturnsNullTest() { + try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions)) { + + String testUUID = UUID.randomUUID().toString(); + String testTopic = "test/MQTT5_Binding_Java_" + testUUID; + byte[] payload = testUUID.getBytes(); + + CompletableFuture resultFuture = new CompletableFuture<>(); + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + builder.withLifecycleEvents(events); + builder.withTlsContext(tlsContext); + + builder.withPublishEvents(new Mqtt5ClientOptions.PublishEvents() { + @Override + public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { + // For QoS 0, controlId is 0, so acquirePublishAcknowledgementControl() returns null. + Mqtt5PublishAcknowledgementControlHandle handle = + publishReturn.acquirePublishAcknowledgementControl(); + resultFuture.complete(handle); + } + }); + + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Subscribe with QoS 1 so the broker delivers at QoS 0 (publish at QoS 0) + SubscribePacketBuilder subscribeBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE); + client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + // Publish at QoS 0, there is no PUBACK involved + PublishPacketBuilder publishBuilder = new PublishPacketBuilder(testTopic, QOS.AT_MOST_ONCE, payload); + client.publish(publishBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + Mqtt5PublishAcknowledgementControlHandle handle = + resultFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + assertNull("acquirePublishAcknowledgementControl() should return null for QoS 0 messages", + handle); + + client.stop(); + events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /* Manual publish acknowledgement QoS 0 test: acquirePublishAcknowledgementControl() returns null for QoS 0 messages */ + @Test + public void ManualPuback_Qos0AcquireReturnsNull() throws Exception { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + + TestUtils.doRetryableTest(this::doManualPuback_Qos0AcquireReturnsNullTest, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS); + + CrtResource.waitForNoResources(); + } + /** * ============================================================ * Retain Tests