Skip to content

Commit 9e2fe2b

Browse files
committed
simplify logic related to whether a manual acknowledgement was taken. Return null instead of throwing exception
1 parent ccfdf45 commit 9e2fe2b

4 files changed

Lines changed: 34 additions & 76 deletions

File tree

src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java

Lines changed: 8 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,6 @@ public class PublishReturn {
3030
*/
3131
private long threadID;
3232

33-
/**
34-
* Set to true when {@code acquirePublishAcknowledgementControl()} is called, indicating that
35-
* the user has taken manual control of the publish acknowledgement. Native code reads this
36-
* via {@code wasControlAcquired()} after the callback returns to decide whether to
37-
* auto-invoke the publish acknowledgement.
38-
*/
39-
private boolean controlAcquired;
40-
4133
/**
4234
* Returns the PublishPacket returned from the server or Null if none was returned.
4335
* @return The PublishPacket returned from the server.
@@ -53,58 +45,29 @@ public PublishPacket getPublishPacket() {
5345
* at a later time to send the publish acknowledgement to the broker.
5446
*
5547
* <p><b>Important:</b> This method must be called within the
56-
* {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. Calling it after the
57-
* callback returns will throw an {@link IllegalStateException}.</p>
48+
* {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. Calling it outside the
49+
* callback (wrong thread) or after it has already been called will return {@code null}.</p>
5850
*
59-
* <p>This method may only be called once per received PUBLISH. Subsequent calls will throw
60-
* an {@link IllegalStateException}.</p>
51+
* <p>This method may only be called once per received PUBLISH. Subsequent calls will return
52+
* {@code null}.</p>
6153
*
6254
* <p>If this method is not called, the client will automatically send a publish acknowledgment
6355
* for QoS 1 messages when the callback returns.</p>
6456
*
65-
* @return A {@link Mqtt5PublishAcknowledgementControlHandle} that can be used to manually send the acknowledgement.
66-
* @throws IllegalStateException if called outside the onMessageReceived callback, called more than once,
67-
* or called on a QoS 0 message.
57+
* @return A {@link Mqtt5PublishAcknowledgementControlHandle} that can be used to manually send
58+
* the acknowledgement, or {@code null} if called outside the callback, called more than
59+
* once, or called on a QoS 0 message.
6860
*/
6961
public synchronized Mqtt5PublishAcknowledgementControlHandle acquirePublishAcknowledgementControl() {
7062
if (controlId == 0 || threadID != Thread.currentThread().getId()) {
71-
throw new IllegalStateException(
72-
"acquirePublishAcknowledgementControl() must be called within the onMessageReceived callback and may only be called once.");
63+
return null;
7364
}
7465
long acquiredControlId = controlId;
7566
/* Zero out so it can't be double-called */
7667
controlId = 0;
77-
controlAcquired = true;
7868
return new Mqtt5PublishAcknowledgementControlHandle(acquiredControlId);
7969
}
8070

81-
/**
82-
* Returns whether the user called {@link #acquirePublishAcknowledgementControl()} during the
83-
* {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback.
84-
*
85-
* <p>This is called by native/JNI code after the callback returns to determine whether to
86-
* automatically invoke the publish acknowledgement (PUBACK). If this returns {@code false},
87-
* native code will auto-invoke the PUBACK; if {@code true}, the user is responsible for
88-
* calling {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)}.</p>
89-
*
90-
* @return {@code true} if the user acquired manual control of the PUBACK, {@code false} otherwise.
91-
*/
92-
private synchronized boolean wasControlAcquired() {
93-
return controlAcquired;
94-
}
95-
96-
/**
97-
* Called by native/JNI code after the {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived}
98-
* callback returns to prevent post-callback use of
99-
* {@link #acquirePublishAcknowledgementControl()}.
100-
*
101-
* <p>Zeroes out {@code controlId} so that any saved reference to this {@code PublishReturn}
102-
* cannot call {@code acquirePublishAcknowledgementControl()} after the callback has returned.</p>
103-
*/
104-
private synchronized void invalidateAfterCallback() {
105-
controlId = 0;
106-
}
107-
10871
/**
10972
* This is only called in JNI to make a new PublishReturn with a PUBLISH packet.
11073
* The controlId is eagerly acquired by native code prior to
@@ -119,7 +82,6 @@ private synchronized void invalidateAfterCallback() {
11982
private PublishReturn(PublishPacket newPublishPacket, long controlId) {
12083
this.publishPacket = newPublishPacket;
12184
this.controlId = controlId;
122-
this.controlAcquired = false;
12385
this.threadID = Thread.currentThread().getId();
12486
}
12587
}

src/native/java_class_ids.c

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2265,20 +2265,17 @@ static void s_cache_mqtt5_publish_return(JNIEnv *env) {
22652265
"(Lsoftware/amazon/awssdk/crt/mqtt5/packets/PublishPacket;J)V");
22662266
AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_constructor_id);
22672267
/*
2268-
* wasControlAcquired() called by native code after onMessageReceived returns to determine
2269-
* whether the user called acquirePublishAcknowledgementControl() during the callback.
2270-
* Returns true if the user took manual control of the PUBACK, false otherwise.
2268+
* acquirePublishAcknowledgementControl() called by native code after onMessageReceived returns
2269+
* to check whether the user took manual control during the callback. Returns a non-null handle
2270+
* if control was not yet acquired (native then auto-invokes the PUBACK), or null if the user
2271+
* already called it during the callback (user is responsible for invoking the PUBACK).
22712272
*/
2272-
mqtt5_publish_return_properties.return_was_control_acquired_id =
2273-
(*env)->GetMethodID(env, mqtt5_publish_return_properties.return_class, "wasControlAcquired", "()Z");
2274-
AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_was_control_acquired_id);
2275-
/*
2276-
* invalidateAfterCallback() called by native code after onMessageReceived returns to zero
2277-
* out controlId, preventing post-callback calls to acquirePublishAcknowledgementControl().
2278-
*/
2279-
mqtt5_publish_return_properties.return_invalidate_after_callback_id =
2280-
(*env)->GetMethodID(env, mqtt5_publish_return_properties.return_class, "invalidateAfterCallback", "()V");
2281-
AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_invalidate_after_callback_id);
2273+
mqtt5_publish_return_properties.return_acquire_publish_acknowledgement_control_id = (*env)->GetMethodID(
2274+
env,
2275+
mqtt5_publish_return_properties.return_class,
2276+
"acquirePublishAcknowledgementControl",
2277+
"()Lsoftware/amazon/awssdk/crt/mqtt5/Mqtt5PublishAcknowledgementControlHandle;");
2278+
AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_acquire_publish_acknowledgement_control_id);
22822279
}
22832280

22842281
struct java_aws_mqtt5_on_stopped_return_properties mqtt5_on_stopped_return_properties;

src/native/java_class_ids.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -928,9 +928,9 @@ extern struct java_aws_mqtt5_publish_result_properties mqtt5_publish_result_prop
928928
/* mqtt5.PublishReturn */
929929
struct java_aws_mqtt5_publish_return_properties {
930930
jclass return_class;
931-
jmethodID return_constructor_id; /* (PublishPacket, long) */
932-
jmethodID return_was_control_acquired_id; /* ()Z true if user called acquirePublishAcknowledgementControl() */
933-
jmethodID return_invalidate_after_callback_id; /* ()V zeroes controlId to prevent post-callback use */
931+
jmethodID return_constructor_id; /* (PublishPacket, long) */
932+
jmethodID return_acquire_publish_acknowledgement_control_id; /* ()Lsoftware/.../Mqtt5PublishAcknowledgementControlHandle;
933+
*/
934934
};
935935
extern struct java_aws_mqtt5_publish_return_properties mqtt5_publish_return_properties;
936936

src/native/mqtt5_client.c

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -618,23 +618,22 @@ static void s_aws_mqtt5_client_java_publish_received(
618618
}
619619

620620
/*
621-
* After the callback returns:
622-
* 1. Call invalidateAfterCallback() to zero out controlId in the PublishReturn, preventing
623-
* any post-callback call to acquirePublishAcknowledgementControl() from succeeding.
624-
* 2. Call wasControlAcquired() to check whether the user took manual control during the
625-
* callback. If they did NOT, auto-invoke the PUBACK to avoid losing it.
621+
* After the callback returns, call acquirePublishAcknowledgementControl() on the PublishReturn.
622+
* - If the user already called it during the callback, controlId was zeroed out and the
623+
* method returns null. The user is responsible for invoking the PUBACK.
624+
* - If the user did NOT call it, controlId is still non-zero and we get a non-null handle
625+
* back. We then auto-invoke the PUBACK to avoid losing it.
626626
*/
627627
if (control_id != 0 && publish_packet_return_data != NULL) {
628-
/* Invalidate the PublishReturn to prevent post-callback use */
629-
(*env)->CallVoidMethod(
630-
env, publish_packet_return_data, mqtt5_publish_return_properties.return_invalidate_after_callback_id);
631-
632-
/* Check whether the user called acquirePublishAcknowledgementControl() during the callback */
633-
jboolean user_acquired_control = (*env)->CallBooleanMethod(
634-
env, publish_packet_return_data, mqtt5_publish_return_properties.return_was_control_acquired_id);
628+
jobject handle = (*env)->CallObjectMethod(
629+
env,
630+
publish_packet_return_data,
631+
mqtt5_publish_return_properties.return_acquire_publish_acknowledgement_control_id);
632+
aws_jni_check_and_clear_exception(env);
635633

636-
if (!user_acquired_control) {
637-
/* User did NOT call acquirePublishAcknowledgementControl(); auto-invoke the PUBACK */
634+
if (handle != NULL) {
635+
/* User did NOT call acquirePublishAcknowledgementControl() during the callback;
636+
* auto-invoke the PUBACK. */
638637
aws_mqtt5_client_invoke_publish_acknowledgement(java_client->client, control_id, NULL);
639638
}
640639
}

0 commit comments

Comments
 (0)