Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
cadd516
bind manual puback
sbSteveK Mar 4, 2026
cc10600
GraalVM uses ahead of time compilation requiring the jni config to be…
sbSteveK Mar 4, 2026
0452994
Merge branch 'main' into manual-puback
sbSteveK Mar 5, 2026
45286e2
merge with main
sbSteveK Mar 5, 2026
f02d9d2
Merge branch 'main' into manual-puback
sbSteveK Mar 9, 2026
c02496e
Merge branch 'main' into manual-puback
sbSteveK Mar 10, 2026
9709faa
point to new version of aws-c-mqtt
sbSteveK Mar 12, 2026
23a8573
aws-c-auth v0.10.0
sbSteveK Mar 12, 2026
a164681
aws-c-event-stream v0.6.0
sbSteveK Mar 12, 2026
13d973e
aws-c-http v0.10.11
sbSteveK Mar 12, 2026
bcb8ecc
aws-lc e50a5f29ee416a7c99be4e72957e8f96aa51dbb9
sbSteveK Mar 12, 2026
42dfa69
s2n v1.7.0
sbSteveK Mar 12, 2026
f66b941
add manual puback test suite
sbSteveK Mar 13, 2026
f3484fe
forgot to set the acquire puback to 0 for qos 0 publishes
sbSteveK Mar 13, 2026
4875250
puback to publish acknowledgement. aws-c-mqtt -> v0.15.2
sbSteveK Mar 18, 2026
40d5b55
Implement eager control acquisition pattern. Use a bool function to c…
sbSteveK Mar 24, 2026
d9e8a1e
Merge branch 'main' into manual-puback
sbSteveK Mar 24, 2026
f03c1d6
Don't allow acquire after the callback returns. Add more logging
sbSteveK Mar 24, 2026
ed1af55
update docs, make jni/native functions private
sbSteveK Mar 24, 2026
a7d5acc
more doc changes and graalvm fix for jni-config
sbSteveK Mar 24, 2026
ff52d41
no need to clear an exception on our internal functions
sbSteveK Mar 24, 2026
33bf003
remove throws from invoke publish acknowledgement
sbSteveK Mar 25, 2026
168911a
Merge branch 'main' into manual-puback
sbSteveK Mar 30, 2026
61dde73
pr comment changes and add synchronized to wasControlAcquired()
sbSteveK Apr 8, 2026
0a1eceb
Merge branch 'main' into manual-puback
sbSteveK Apr 8, 2026
6ba7534
set and check thread id on publish acquisition
sbSteveK Apr 8, 2026
ccfdf45
updated messaging to explain what threadID is doing.
sbSteveK Apr 8, 2026
9e2fe2b
simplify logic related to whether a manual acknowledgement was taken.…
sbSteveK Apr 10, 2026
507c2f2
change test double call raises to double call returns null
sbSteveK Apr 10, 2026
372c523
other throws tests need to be changed to returns null
sbSteveK Apr 10, 2026
f268233
CLAAAAAAANG FORRRAMMMAAAAAT
sbSteveK Apr 10, 2026
edc7f7b
forgo to update jni-config for grall vm
sbSteveK Apr 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,22 @@ public Mqtt5ClientOperationStatistics getOperationStatistics() {
return mqtt5ClientInternalGetOperationStatistics(getNativeHandle());
}

/**
* Sends a publish acknowledgement packet for a QoS 1 PUBLISH that was previously acquired
* for manual control.
*
* <p>To use manual publish acknowledgement control, call
* {@link PublishReturn#acquirePublishAcknowledgementControl()} within the
* {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback of a QoS 1 PUBLISH to obtain a
* {@link Mqtt5PublishAcknowledgementControlHandle}. Then call this method to send the publish acknowledgement.</p>
*
* @param publishAcknowledgementControlHandle An opaque handle obtained from
* {@link PublishReturn#acquirePublishAcknowledgementControl()}.
*/
public void invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle publishAcknowledgementControlHandle) {
mqtt5ClientInternalInvokePublishAcknowledgement(getNativeHandle(), publishAcknowledgementControlHandle.getControlId());
}

/**
* Returns the connectivity state for the Mqtt5Client.
* @return True if the client is connected, false otherwise
Expand Down Expand Up @@ -277,4 +293,5 @@ private static native long mqtt5ClientNew(
private static native void mqtt5ClientInternalUnsubscribe(long client, UnsubscribePacket unsubscribe_options, CompletableFuture<UnsubAckPacket> unsubscribe_suback);
private static native void mqtt5ClientInternalWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData) throws CrtRuntimeException;
private static native Mqtt5ClientOperationStatistics mqtt5ClientInternalGetOperationStatistics(long client);
private static native void mqtt5ClientInternalInvokePublishAcknowledgement(long client, long controlId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,15 @@ public interface LifecycleEvents {
*/
public interface PublishEvents {
/**
* Called when an MQTT PUBLISH packet is received by the client
* Called when an MQTT PUBLISH packet is received by the client.
*
* <p>To take manual control of the publish acknowledgement for a QoS 1 message, call
* {@link PublishReturn#acquirePublishAcknowledgementControl()} within this callback. If you do so,
* the client will NOT automatically send the publish acknowledgement. You are responsible for calling
* {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)} later.</p>
*
* <p>If you do not call {@code acquirePublishAcknowledgementControl()} within the callback ,
* the client will automatically send the publish acknowledgement after this callback returns.</p>
*
* @param client The client that has received the message
* @param publishReturn All of the data that was received from the server
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
package software.amazon.awssdk.crt.mqtt5;

/**
* An opaque handle representing manual control over a publish acknowledgement for a received
* PUBLISH packet.
*
* <p>This class cannot be instantiated directly. Instances are only created by the CRT library.</p>
*/
public class Mqtt5PublishAcknowledgementControlHandle {

private final long controlId;

/**
* Creates a new Mqtt5PublishAcknowledgementControlHandle.
*
* @param controlId The native publish acknowledgement control ID returned by
* aws_mqtt5_client_acquire_publish_acknowledgement.
*/
Mqtt5PublishAcknowledgementControlHandle(long controlId) {
this.controlId = controlId;
}

/**
* Returns the native publish acknowledgement control ID, used by
* {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)}
* to pass the control ID to native code.
*
* @return The native publish acknowledgement control ID.
*/
long getControlId() {
return controlId;
}
}
60 changes: 57 additions & 3 deletions src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,22 @@
public class PublishReturn {
private PublishPacket publishPacket;

/**
* The already-acquired publish acknowledgement control ID, eagerly acquired by native code
* before the {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback is invoked.
* For QoS 0 messages this is 0 (no publish acknowledgement required).
* After {@code acquirePublishAcknowledgementControl()} is called, this is set to 0 to prevent
* double-use.
*/
private long controlId;

/**
* The threadID of the {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback.
* This is set at the beginning of the callback and then used to verify
* {@code acquirePublishAcknowledgementControl()} is being called from within.
*/
private long threadID;

/**
* Returns the PublishPacket returned from the server or Null if none was returned.
* @return The PublishPacket returned from the server.
Expand All @@ -22,12 +38,50 @@ public PublishPacket getPublishPacket() {
return publishPacket;
}

/**
* Acquires manual control over the publish acknowledgement for this PUBLISH message,
* preventing the client from automatically sending an acknowledgement. The returned handle can be
* passed to {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)}
* at a later time to send the publish acknowledgement to the broker.
*
* <p><b>Important:</b> This method must be called within the
* {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. Calling it outside the
* callback (wrong thread) or after it has already been called will return {@code null}.</p>
*
* <p>This method may only be called once per received PUBLISH. Subsequent calls will return
* {@code null}.</p>
*
* <p>If this method is not called, the client will automatically send a publish acknowledgment
* for QoS 1 messages when the callback returns.</p>
*
* @return A {@link Mqtt5PublishAcknowledgementControlHandle} that can be used to manually send
* the acknowledgement, or {@code null} if called outside the callback, called more than
* once, or called on a QoS 0 message.
*/
public synchronized Mqtt5PublishAcknowledgementControlHandle acquirePublishAcknowledgementControl() {
if (controlId == 0 || threadID != Thread.currentThread().getId()) {
return null;
}
long acquiredControlId = controlId;
/* Zero out so it can't be double-called */
controlId = 0;
return new Mqtt5PublishAcknowledgementControlHandle(acquiredControlId);
}

/**
* This is only called in JNI to make a new PublishReturn with a PUBLISH packet.
* @param newPublishPacket The PubAckPacket data for QoS 1 packets. Can be null if result is non QoS 1.
* @return A newly created PublishResult
* The controlId is eagerly acquired by native code prior to
* {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} being called.
* The threadID is set to the calling thread and is used when
* {@link #acquirePublishAcknowledgementControl()} is called to guarantee the result
* is accurate and enforces the requirement of calling it from within the callback.
*
* @param newPublishPacket The PublishPacket data received from the server.
* @param controlId The pre-acquired publish acknowledgement control ID (0 for QoS 0 messages).
*/
private PublishReturn(PublishPacket newPublishPacket) {
private PublishReturn(PublishPacket newPublishPacket, long controlId) {
this.publishPacket = newPublishPacket;
this.controlId = controlId;
this.threadID = Thread.currentThread().getId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1563,8 +1563,13 @@
{
"name": "<init>",
"parameterTypes": [
"software.amazon.awssdk.crt.mqtt5.packets.PublishPacket"
"software.amazon.awssdk.crt.mqtt5.packets.PublishPacket",
"long"
]
},
{
"name": "acquirePublishAcknowledgementControl",
"parameterTypes": []
}
]
},
Expand Down
14 changes: 13 additions & 1 deletion src/native/java_class_ids.c
Original file line number Diff line number Diff line change
Expand Up @@ -2262,8 +2262,20 @@ static void s_cache_mqtt5_publish_return(JNIEnv *env) {
env,
mqtt5_publish_return_properties.return_class,
"<init>",
"(Lsoftware/amazon/awssdk/crt/mqtt5/packets/PublishPacket;)V");
"(Lsoftware/amazon/awssdk/crt/mqtt5/packets/PublishPacket;J)V");
AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_constructor_id);
/*
* acquirePublishAcknowledgementControl() called by native code after onMessageReceived returns
* to check whether the user took manual control during the callback. Returns a non-null handle
* if control was not yet acquired (native then auto-invokes the PUBACK), or null if the user
* already called it during the callback (user is responsible for invoking the PUBACK).
*/
mqtt5_publish_return_properties.return_acquire_publish_acknowledgement_control_id = (*env)->GetMethodID(
env,
mqtt5_publish_return_properties.return_class,
"acquirePublishAcknowledgementControl",
"()Lsoftware/amazon/awssdk/crt/mqtt5/Mqtt5PublishAcknowledgementControlHandle;");
AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_acquire_publish_acknowledgement_control_id);
}

struct java_aws_mqtt5_on_stopped_return_properties mqtt5_on_stopped_return_properties;
Expand Down
5 changes: 4 additions & 1 deletion src/native/java_class_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,10 @@ extern struct java_aws_mqtt5_publish_result_properties mqtt5_publish_result_prop
/* mqtt5.PublishReturn */
struct java_aws_mqtt5_publish_return_properties {
jclass return_class;
jmethodID return_constructor_id;
jmethodID return_constructor_id; /* (PublishPacket, long) */
jmethodID
return_acquire_publish_acknowledgement_control_id; /* ()Lsoftware/.../Mqtt5PublishAcknowledgementControlHandle;
*/
};
extern struct java_aws_mqtt5_publish_return_properties mqtt5_publish_return_properties;

Expand Down
86 changes: 84 additions & 2 deletions src/native/mqtt5_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,33 @@ static void s_aws_mqtt5_client_java_publish_received(
goto clean_up;
}

/* Make the PublishReturn struct that will hold all of the data that is passed to Java */
/*
* For QoS 1 messages, eagerly acquire the publish acknowledgement control before invoking
* the Java callback. The control ID is passed directly as a long to the PublishReturn
* constructor.
*
* After the callback returns, we call wasControlAcquired() on the PublishReturn object to
* determine whether the user called acquirePublishAcknowledgementControl() during the callback:
* - If wasControlAcquired() returns true, the user took manual control and is responsible
* for calling invokePublishAcknowledgement() later.
* - If wasControlAcquired() returns false (or an exception occurred), the user did NOT take
* control, so we auto-invoke the PUBACK to avoid losing it.
*/
uint64_t control_id = 0;
if (publish->qos == AWS_MQTT5_QOS_AT_LEAST_ONCE) {
/* Eagerly acquire the PUBACK control before invoking the Java callback */
control_id = aws_mqtt5_client_acquire_publish_acknowledgement(java_client->client, publish);
}

/* Make the PublishReturn struct that will hold all of the data that is passed to Java.
* The constructor takes (PublishPacket, long) where the long is the pre-acquired control ID
* (0 for QoS 0 messages). */
publish_packet_return_data = (*env)->NewObject(
env,
mqtt5_publish_return_properties.return_class,
mqtt5_publish_return_properties.return_constructor_id,
publish_packet_data);
publish_packet_data,
(jlong)control_id);
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */

if (java_client->jni_publish_events) {
Expand All @@ -595,6 +616,28 @@ static void s_aws_mqtt5_client_java_publish_received(
publish_packet_return_data);
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
}

/*
* After the callback returns, call acquirePublishAcknowledgementControl() on the PublishReturn.
* - If the user already called it during the callback, controlId was zeroed out and the
* method returns null. The user is responsible for invoking the PUBACK.
* - If the user did NOT call it, controlId is still non-zero and we get a non-null handle
* back. We then auto-invoke the PUBACK to avoid losing it.
*/
if (control_id != 0 && publish_packet_return_data != NULL) {
jobject handle = (*env)->CallObjectMethod(
env,
publish_packet_return_data,
mqtt5_publish_return_properties.return_acquire_publish_acknowledgement_control_id);
aws_jni_check_and_clear_exception(env);

if (handle != NULL) {
/* User did NOT call acquirePublishAcknowledgementControl() during the callback;
* auto-invoke the PUBACK. */
aws_mqtt5_client_invoke_publish_acknowledgement(java_client->client, control_id, NULL);
}
}

goto clean_up;

clean_up:
Expand Down Expand Up @@ -2190,6 +2233,45 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5Cl
}
}

/*******************************************************************************
* Manual Publish Acknowledgement Control Functions
******************************************************************************/

/**
* Called from Mqtt5Client.mqtt5ClientInternalInvokePublishAcknowledgement(long client, long controlId).
* Calls aws_mqtt5_client_invoke_publish_acknowledgement to send the publish acknowledgement (PUBACK)
* for a previously acquired manual publish acknowledgement control handle.
*/
JNIEXPORT void JNICALL
Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalInvokePublishAcknowledgement(
JNIEnv *env,
jclass jni_class,
jlong jni_client,
jlong control_id) {
(void)jni_class;
aws_cache_jni_ids(env);

struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)jni_client;
if (!java_client) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "Mqtt5Client.invokePublishAcknowledgement: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT);
return;
}
if (!java_client->client) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "Mqtt5Client.invokePublishAcknowledgement: Invalid/null native client", AWS_ERROR_INVALID_ARGUMENT);
return;
}

int result = aws_mqtt5_client_invoke_publish_acknowledgement(java_client->client, (uint64_t)control_id, NULL);
if (result != AWS_OP_SUCCESS) {
s_aws_mqtt5_client_log_and_throw_exception(
env,
"Mqtt5Client.invokePublishAcknowledgement: aws_mqtt5_client_invoke_publish_acknowledgement failed!",
aws_last_error());
}
}

#if UINTPTR_MAX == 0xffffffff
# if defined(_MSC_VER)
# pragma warning(pop)
Expand Down
Loading
Loading