Skip to content

Commit 4875250

Browse files
committed
puback to publish acknowledgement. aws-c-mqtt -> v0.15.2
1 parent f3484fe commit 4875250

7 files changed

Lines changed: 145 additions & 132 deletions

File tree

src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -203,17 +203,20 @@ public Mqtt5ClientOperationStatistics getOperationStatistics() {
203203
}
204204

205205
/**
206-
* Sends a PUBACK packet for a QoS 1 PUBLISH that was previously acquired for manual control.
206+
* Sends a publish acknowledgement for a QoS 1 PUBLISH that was previously acquired
207+
* for manual control.
207208
*
208-
* <p>To use manual PUBACK control, call {@link PublishReturn#acquirePubackControl()} within
209-
* the {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback of a QoS 1 PUBLISH to
210-
* obtain a {@link Mqtt5PubackControlHandle}. Then call this method to send the PUBACK.</p>
209+
* <p>To use manual publish acknowledgement control, call
210+
* {@link PublishReturn#acquirePublishAcknowledgementControl()} within the
211+
* {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback of a QoS 1 PUBLISH to obtain a
212+
* {@link Mqtt5PublishAcknowledgementControlHandle}. Then call this method to send the PUBACK.</p>
211213
*
212-
* @param pubackControlHandle An opaque handle obtained from {@link PublishReturn#acquirePubackControl()}.
213-
* @throws CrtRuntimeException If the native client returns an error when invoking the PUBACK.
214+
* @param publishAcknowledgementControlHandle An opaque handle obtained from
215+
* {@link PublishReturn#acquirePublishAcknowledgementControl()}.
216+
* @throws CrtRuntimeException If the native client returns an error when invoking the publish acknowledgement.
214217
*/
215-
public void invokePuback(Mqtt5PubackControlHandle pubackControlHandle) throws CrtRuntimeException {
216-
mqtt5ClientInternalInvokePuback(getNativeHandle(), pubackControlHandle.getControlId());
218+
public void invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle publishAcknowledgementControlHandle) throws CrtRuntimeException {
219+
mqtt5ClientInternalInvokePublishAcknowledgement(getNativeHandle(), publishAcknowledgementControlHandle.getControlId());
217220
}
218221

219222
/**
@@ -291,5 +294,5 @@ private static native long mqtt5ClientNew(
291294
private static native void mqtt5ClientInternalUnsubscribe(long client, UnsubscribePacket unsubscribe_options, CompletableFuture<UnsubAckPacket> unsubscribe_suback);
292295
private static native void mqtt5ClientInternalWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData) throws CrtRuntimeException;
293296
private static native Mqtt5ClientOperationStatistics mqtt5ClientInternalGetOperationStatistics(long client);
294-
private static native void mqtt5ClientInternalInvokePuback(long client, long controlId);
297+
private static native void mqtt5ClientInternalInvokePublishAcknowledgement(long client, long controlId);
295298
}

src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5PubackControlHandle.java

Lines changed: 0 additions & 33 deletions
This file was deleted.
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
package software.amazon.awssdk.crt.mqtt5;
6+
7+
/**
8+
* An opaque handle representing manual control over a publish acknowledgement for a received
9+
* PUBLISH packet.
10+
*
11+
* <p>This class cannot be instantiated directly. Instances are only created by the CRT library.</p>
12+
*/
13+
public class Mqtt5PublishAcknowledgementControlHandle {
14+
15+
private final long controlId;
16+
17+
/**
18+
* Creates a new Mqtt5PublishAcknowledgementControlHandle. Only called from native/JNI code.
19+
*
20+
* @param controlId The native publish acknowledgement control ID returned by
21+
* aws_mqtt5_client_acquire_publish_acknowledgement.
22+
*/
23+
Mqtt5PublishAcknowledgementControlHandle(long controlId) {
24+
this.controlId = controlId;
25+
}
26+
27+
/**
28+
* Returns the native publish acknowledgement control ID. Used internally by JNI.
29+
*
30+
* @return The native publish acknowledgement control ID.
31+
*/
32+
long getControlId() {
33+
return controlId;
34+
}
35+
}

src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public class PublishReturn {
1515
private PublishPacket publishPacket;
1616

1717
/**
18-
* Single-element long array holding the native manual PUBACK control context pointer.
18+
* Single-element long array holding the native manual publish acknowledgement control context pointer.
1919
* Element [0] is the pointer value, valid only during the
2020
* {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback.
2121
* QoS 0 results in this being set to 0.
@@ -33,10 +33,10 @@ public PublishPacket getPublishPacket() {
3333
}
3434

3535
/**
36-
* Acquires manual control over the PUBACK for this QoS 1 PUBLISH message, preventing the
37-
* client from automatically sending a PUBACK. The returned handle can be passed to
38-
* {@link Mqtt5Client#invokePuback(Mqtt5PubackControlHandle)} at a later time to send the
39-
* PUBACK to the broker.
36+
* Acquires manual control over the publish acknowledgement (PUBACK) for this PUBLISH message,
37+
* preventing the client from automatically sending a acknowledgement. The returned handle can be passed to
38+
* {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)} at a later
39+
* time to send the PUBACK to the broker.
4040
*
4141
* <p><b>Important:</b> This method must be called within the
4242
* {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. Calling it after the
@@ -48,18 +48,19 @@ public PublishPacket getPublishPacket() {
4848
* <p>If this method is not called, the client will automatically send a PUBACK for QoS 1
4949
* messages when the callback returns.</p>
5050
*
51-
* @return A {@link Mqtt5PubackControlHandle} that can be used to manually send the PUBACK.
52-
* @throws IllegalStateException if called outside the onMessageReceived callback or called more than once.
51+
* @return A {@link Mqtt5PublishAcknowledgementControlHandle} that can be used to manually send the acknowledgement.
52+
* @throws IllegalStateException if called outside the onMessageReceived callback, called more than once,
53+
* or called on a QoS 0 message.
5354
*/
54-
public synchronized Mqtt5PubackControlHandle acquirePubackControl() {
55+
public synchronized Mqtt5PublishAcknowledgementControlHandle acquirePublishAcknowledgementControl() {
5556
if (nativeContextPtrHolder == null || nativeContextPtrHolder[0] == 0) {
5657
throw new IllegalStateException(
57-
"acquirePubackControl() must be called within the onMessageReceived callback and may only be called once.");
58+
"acquirePublishAcknowledgementControl() must be called within the onMessageReceived callback and may only be called once.");
5859
}
59-
long controlId = mqtt5AcquirePubackControl(nativeContextPtrHolder[0]);
60+
long controlId = mqtt5AcquirePublishAcknowledgementControl(nativeContextPtrHolder[0]);
6061
/* We set the array element to 0 so it can't be double-called */
6162
nativeContextPtrHolder[0] = 0;
62-
return new Mqtt5PubackControlHandle(controlId);
63+
return new Mqtt5PublishAcknowledgementControlHandle(controlId);
6364
}
6465

6566
/**
@@ -68,17 +69,18 @@ public synchronized Mqtt5PubackControlHandle acquirePubackControl() {
6869
* after the onMessageReceived callback returns to prevent use-after-free.
6970
*
7071
* @param newPublishPacket The PublishPacket data received from the server.
71-
* @param nativeContextPtrHolder Single-element long[] holding the native PUBACK control context pointer.
72+
* @param nativeContextPtrHolder Single-element long[] holding the native publish acknowledgement control
73+
* context pointer.
7274
*/
7375
private PublishReturn(PublishPacket newPublishPacket, long[] nativeContextPtrHolder) {
7476
this.publishPacket = newPublishPacket;
7577
this.nativeContextPtrHolder = nativeContextPtrHolder;
7678
}
7779

7880
/**
79-
* Calls the native aws_mqtt5_client_acquire_puback function.
80-
* @param nativeContextPtr Pointer to the native manual PUBACK control context.
81-
* @return The native puback control ID.
81+
* Calls the native aws_mqtt5_client_acquire_publish_acknowledgement function.
82+
* @param nativeContextPtr Pointer to the native manual publish acknowledgement control context.
83+
* @return The native publish acknowledgement control ID.
8284
*/
83-
private static native long mqtt5AcquirePubackControl(long nativeContextPtr);
84-
}
85+
private static native long mqtt5AcquirePublishAcknowledgementControl(long nativeContextPtr);
86+
}

src/native/mqtt5_client.c

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ struct aws_mqtt5_client_publish_return_data {
3838
jobject jni_publish_future;
3939
};
4040

41-
/* Context for manual PUBACK control. Valid only during the publish received callback. */
42-
struct manual_puback_control_context {
41+
/* Context for manual publish acknowledgement control. Valid only during the publish received callback. */
42+
struct manual_publish_acknowledgement_control_context {
4343
struct aws_mqtt5_client *client;
4444
const struct aws_mqtt5_packet_publish_view *publish_packet;
4545
};
@@ -586,12 +586,12 @@ static void s_aws_mqtt5_client_java_publish_received(
586586
goto clean_up;
587587
}
588588

589-
/* Create manual PUBACK control context (valid only during this callback)
589+
/* Create manual publish acknowledgement control context (valid only during this callback).
590590
* We clean this before clean_up since we don't want to create this early and we can be sure to
591591
* hit the aws_mem_release AFTER we return from the publish events callback.
592592
*/
593-
struct manual_puback_control_context *control_context =
594-
aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct manual_puback_control_context));
593+
struct manual_publish_acknowledgement_control_context *control_context =
594+
aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct manual_publish_acknowledgement_control_context));
595595

596596
/*
597597
* Create a single-element long[] array to hold the native context pointer.
@@ -603,14 +603,15 @@ static void s_aws_mqtt5_client_java_publish_received(
603603
control_context->publish_packet = publish;
604604

605605
context_ptr_holder = (*env)->NewLongArray(env, 1);
606-
/* If allocation failed, Java will throw IllegalStateException on acquirePubackControl(). */
606+
/* If allocation failed, Java will throw IllegalStateException on acquirePublishAcknowledgementControl(). */
607607
if (context_ptr_holder != NULL) {
608608
/*
609-
* Only expose the context pointer for QoS 1 publishes. For QoS 0, there is no PUBACK
610-
* to send, so acquirePubackControl() should throw IllegalStateException. We pass 0 (null).
609+
* Only expose the context pointer for QoS 1 publishes. For QoS 0, there is no publish
610+
* acknowledgement to send, so acquirePublishAcknowledgementControl() should throw
611+
* IllegalStateException. We pass 0 (null).
611612
*/
612613
jlong context_ptr_value = (publish->qos == AWS_MQTT5_QOS_AT_MOST_ONCE) ? 0 : (jlong)(uintptr_t)control_context;
613-
/* Assigning the pointer to the allocated manual_puback_control_context to the single-element of the array */
614+
/* Assigning the pointer to the allocated manual_publish_acknowledgement_control_context to the array */
614615
(*env)->SetLongArrayRegion(env, context_ptr_holder, 0, 1, &context_ptr_value);
615616
}
616617

@@ -636,7 +637,7 @@ static void s_aws_mqtt5_client_java_publish_received(
636637

637638
/*
638639
* Invalidate the context pointer in the long[] array now that the callback has returned.
639-
* This prevents use-after-free if acquirePubackControl() is called after the callback.
640+
* This prevents use-after-free if acquirePublishAcknowledgementControl() is called after the callback.
640641
* SetLongArrayRegion requires no extra JNI method ID.
641642
*/
642643
if (context_ptr_holder != NULL) {
@@ -2225,18 +2226,18 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5Cl
22252226
}
22262227

22272228
/*******************************************************************************
2228-
* Manual PUBACK Control Functions
2229+
* Manual Publish Acknowledgement Control Functions
22292230
******************************************************************************/
22302231

22312232
/**
2232-
* Called from PublishReturn.mqtt5AcquirePubackControl(long nativeContextPtr).
2233-
* Calls aws_mqtt5_client_acquire_puback to take manual control of the PUBACK
2234-
* for the received PUBLISH packet. Returns the puback_control_id as a jlong.
2233+
* Called from PublishReturn.mqtt5AcquirePublishAcknowledgementControl(long nativeContextPtr).
2234+
* Calls aws_mqtt5_client_acquire_publish_acknowledgement to take manual control of the publish
2235+
* acknowledgement (PUBACK) for the received PUBLISH packet. Returns the pub_ack_control_id as a jlong.
22352236
*
22362237
* This must be called within the onMessageReceived callback while the
22372238
* native context pointer is still valid.
22382239
*/
2239-
JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt5_PublishReturn_mqtt5AcquirePubackControl(
2240+
JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt5_PublishReturn_mqtt5AcquirePublishAcknowledgementControl(
22402241
JNIEnv *env,
22412242
jclass jni_class,
22422243
jlong native_context_ptr) {
@@ -2246,53 +2247,58 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt5_PublishReturn_mqtt
22462247
if (native_context_ptr == 0) {
22472248
aws_jni_throw_runtime_exception(
22482249
env,
2249-
"PublishReturn.acquirePubackControl: context is no longer valid. "
2250-
"acquirePubackControl() must be called within the onMessageReceived callback.");
2250+
"PublishReturn.acquirePublishAcknowledgementControl: context is no longer valid. "
2251+
"acquirePublishAcknowledgementControl() must be called within the onMessageReceived callback.");
22512252
return 0;
22522253
}
22532254

2254-
struct manual_puback_control_context *context =
2255-
(struct manual_puback_control_context *)(uintptr_t)native_context_ptr;
2255+
struct manual_publish_acknowledgement_control_context *context =
2256+
(struct manual_publish_acknowledgement_control_context *)(uintptr_t)native_context_ptr;
22562257

22572258
if (!context->client || !context->publish_packet) {
22582259
aws_jni_throw_runtime_exception(
2259-
env, "PublishReturn.acquirePubackControl: invalid native PUBACK control context");
2260+
env,
2261+
"PublishReturn.acquirePublishAcknowledgementControl: invalid native publish acknowledgement control "
2262+
"context");
22602263
return 0;
22612264
}
22622265

2263-
uint64_t control_id = aws_mqtt5_client_acquire_puback(context->client, context->publish_packet);
2266+
uint64_t control_id = aws_mqtt5_client_acquire_publish_acknowledgement(context->client, context->publish_packet);
22642267
return (jlong)control_id;
22652268
}
22662269

22672270
/**
2268-
* Called from Mqtt5Client.mqtt5ClientInternalInvokePuback(long client, long controlId).
2269-
* Calls aws_mqtt5_client_invoke_puback to send the PUBACK for a previously acquired
2270-
* manual PUBACK control handle.
2271+
* Called from Mqtt5Client.mqtt5ClientInternalInvokePublishAcknowledgement(long client, long controlId).
2272+
* Calls aws_mqtt5_client_invoke_publish_acknowledgement to send the publish acknowledgement (PUBACK)
2273+
* for a previously acquired manual publish acknowledgement control handle.
22712274
*/
2272-
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalInvokePuback(
2273-
JNIEnv *env,
2274-
jclass jni_class,
2275-
jlong jni_client,
2276-
jlong control_id) {
2275+
JNIEXPORT void JNICALL
2276+
Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalInvokePublishAcknowledgement(
2277+
JNIEnv *env,
2278+
jclass jni_class,
2279+
jlong jni_client,
2280+
jlong control_id) {
22772281
(void)jni_class;
22782282
aws_cache_jni_ids(env);
22792283

22802284
struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)jni_client;
22812285
if (!java_client) {
22822286
s_aws_mqtt5_client_log_and_throw_exception(
2283-
env, "Mqtt5Client.invokePuback: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT);
2287+
env, "Mqtt5Client.invokePublishAcknowledgement: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT);
22842288
return;
22852289
}
22862290
if (!java_client->client) {
22872291
s_aws_mqtt5_client_log_and_throw_exception(
2288-
env, "Mqtt5Client.invokePuback: Invalid/null native client", AWS_ERROR_INVALID_ARGUMENT);
2292+
env, "Mqtt5Client.invokePublishAcknowledgement: Invalid/null native client", AWS_ERROR_INVALID_ARGUMENT);
22892293
return;
22902294
}
22912295

2292-
int result = aws_mqtt5_client_invoke_puback(java_client->client, (uint64_t)control_id, NULL);
2296+
int result = aws_mqtt5_client_invoke_publish_acknowledgement(java_client->client, (uint64_t)control_id, NULL);
22932297
if (result != AWS_OP_SUCCESS) {
22942298
s_aws_mqtt5_client_log_and_throw_exception(
2295-
env, "Mqtt5Client.invokePuback: aws_mqtt5_client_invoke_puback failed!", aws_last_error());
2299+
env,
2300+
"Mqtt5Client.invokePublishAcknowledgement: aws_mqtt5_client_invoke_publish_acknowledgement failed!",
2301+
aws_last_error());
22962302
}
22972303
}
22982304

0 commit comments

Comments
 (0)