Skip to content

Commit ce1418c

Browse files
Merge branch 'main' into more_algos
2 parents e021bf1 + 66ae106 commit ce1418c

25 files changed

Lines changed: 1096 additions & 52 deletions
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
package software.amazon.awssdk.crt.internal;
6+
7+
/**
8+
* @internal
9+
* IoT Device SDK Metrics Structure. Not for external usage.
10+
*/
11+
public class IoTDeviceSDKMetrics {
12+
private String libraryName;
13+
14+
public IoTDeviceSDKMetrics() {
15+
this.libraryName = "IoTDeviceSDK/Java";
16+
}
17+
18+
public String getLibraryName() {
19+
return libraryName;
20+
}
21+
}

src/main/java/software/amazon/awssdk/crt/mqtt/MqttClientConnection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import software.amazon.awssdk.crt.mqtt5.Mqtt5Client;
1818
import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions;
1919
import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket;
20+
import software.amazon.awssdk.crt.internal.IoTDeviceSDKMetrics;
2021

2122
import java.util.concurrent.CompletableFuture;
2223
import java.util.function.Consumer;
@@ -77,6 +78,7 @@ private static MqttConnectionConfig s_toMqtt3ConnectionConfig(Mqtt5ClientOptions
7778
options.setProtocolOperationTimeoutMs(mqtt5options.getAckTimeoutSeconds() != null
7879
? Math.toIntExact(mqtt5options.getAckTimeoutSeconds()) * 1000
7980
: 0);
81+
options.setMetricsEnabled(mqtt5options.getMetricsEnabled());
8082
return options;
8183
}
8284

@@ -162,6 +164,10 @@ private void SetupConfig(MqttConnectionConfig config) throws MqttException {
162164
mqttClientConnectionSetLogin(getNativeHandle(), config.getUsername(), config.getPassword());
163165
}
164166

167+
if (config.getMetricsEnabled()) {
168+
mqttClientConnectionSetMetrics(getNativeHandle(), new IoTDeviceSDKMetrics());
169+
}
170+
165171
if (config.getMinReconnectTimeoutSecs() != 0L && config.getMaxReconnectTimeoutSecs() != 0L) {
166172
mqttClientConnectionSetReconnectTimeout(getNativeHandle(), config.getMinReconnectTimeoutSecs(),
167173
config.getMaxReconnectTimeoutSecs());
@@ -502,6 +508,9 @@ private static native boolean mqttClientConnectionSetWill(long connection, Strin
502508
private static native void mqttClientConnectionSetLogin(long connection, String username, String password)
503509
throws CrtRuntimeException;
504510

511+
private static native void mqttClientConnectionSetMetrics(long connection, IoTDeviceSDKMetrics metrics)
512+
throws CrtRuntimeException;
513+
505514
private static native void mqttClientConnectionSetReconnectTimeout(long connection, long minTimeout,
506515
long maxTimeout)
507516
throws CrtRuntimeException;

src/main/java/software/amazon/awssdk/crt/mqtt/MqttConnectionConfig.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ public final class MqttConnectionConfig extends CrtResource {
4646
private HttpProxyOptions proxyOptions;
4747
private Consumer<WebsocketHandshakeTransformArgs> websocketHandshakeTransform;
4848

49+
/* metrics */
50+
private boolean metricsEnabled = true;
51+
4952
public MqttConnectionConfig() {}
5053

5154

@@ -538,6 +541,24 @@ public Consumer<WebsocketHandshakeTransformArgs> getWebsocketHandshakeTransform(
538541
return websocketHandshakeTransform;
539542
}
540543

544+
/**
545+
* Enables or disables IoT Device SDK metrics collection. The metrics includes SDK name, version, and platform.
546+
*
547+
* @param enabled true to enable metrics, false to disable
548+
*/
549+
public void setMetricsEnabled(boolean enabled) {
550+
this.metricsEnabled = enabled;
551+
}
552+
553+
/**
554+
* Queries whether IoT Device SDK metrics collection is enabled
555+
*
556+
* @return true if metrics are enabled, false if disabled
557+
*/
558+
public boolean getMetricsEnabled() {
559+
return metricsEnabled;
560+
}
561+
541562
/**
542563
* Creates a (shallow) clone of this config object
543564
*
@@ -567,6 +588,7 @@ public MqttConnectionConfig clone() {
567588
clone.setWebsocketHandshakeTransform(getWebsocketHandshakeTransform());
568589

569590
clone.setReconnectTimeoutSecs(getMinReconnectTimeoutSecs(), getMaxReconnectTimeoutSecs());
591+
clone.setMetricsEnabled(getMetricsEnabled());
570592

571593
// success, bump up the ref count so we can escape the try-with-resources block
572594
clone.addRef();

src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,22 @@ public Mqtt5ClientOperationStatistics getOperationStatistics() {
202202
return mqtt5ClientInternalGetOperationStatistics(getNativeHandle());
203203
}
204204

205+
/**
206+
* Sends a publish acknowledgement packet for a QoS 1 PUBLISH that was previously acquired
207+
* for manual control.
208+
*
209+
* <p>To use manual publish acknowledgement control, call
210+
* {@link PublishReturn#acquirePublishAcknowledgementControl()} within the
211+
* {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback of a QoS 1 PUBLISH to obtain a
212+
* {@link Mqtt5PublishAcknowledgementControlHandle}. Then call this method to send the publish acknowledgement.</p>
213+
*
214+
* @param publishAcknowledgementControlHandle An opaque handle obtained from
215+
* {@link PublishReturn#acquirePublishAcknowledgementControl()}.
216+
*/
217+
public void invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle publishAcknowledgementControlHandle) {
218+
mqtt5ClientInternalInvokePublishAcknowledgement(getNativeHandle(), publishAcknowledgementControlHandle.getControlId());
219+
}
220+
205221
/**
206222
* Returns the connectivity state for the Mqtt5Client.
207223
* @return True if the client is connected, false otherwise
@@ -277,4 +293,5 @@ private static native long mqtt5ClientNew(
277293
private static native void mqtt5ClientInternalUnsubscribe(long client, UnsubscribePacket unsubscribe_options, CompletableFuture<UnsubAckPacket> unsubscribe_suback);
278294
private static native void mqtt5ClientInternalWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData) throws CrtRuntimeException;
279295
private static native Mqtt5ClientOperationStatistics mqtt5ClientInternalGetOperationStatistics(long client);
296+
private static native void mqtt5ClientInternalInvokePublishAcknowledgement(long client, long controlId);
280297
}

src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5ClientOptions.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket;
1414
import software.amazon.awssdk.crt.mqtt.MqttConnectionConfig;
15+
import software.amazon.awssdk.crt.internal.IoTDeviceSDKMetrics;
1516

1617
import java.util.Map;
1718
import java.util.function.Function;
@@ -45,6 +46,12 @@ public class Mqtt5ClientOptions {
4546
private Consumer<Mqtt5WebsocketHandshakeTransformArgs> websocketHandshakeTransform;
4647
private PublishEvents publishEvents;
4748
private TopicAliasingOptions topicAliasingOptions;
49+
// Indicates whether AWS IoT Metrics are enabled for this client, default to true.
50+
// We don't expose iotDeviceSDKMetrics in the builder, and only allow setting
51+
// metricsEnabled for now.
52+
private boolean metricsEnabled = true;
53+
private IoTDeviceSDKMetrics iotDeviceSDKMetrics;
54+
4855

4956
/**
5057
* Returns the host name of the MQTT server to connect to.
@@ -263,6 +270,24 @@ public TopicAliasingOptions getTopicAliasingOptions() {
263270
return this.topicAliasingOptions;
264271
}
265272

273+
/**
274+
* Returns whether AWS IoT Device SDK metrics collection is enabled
275+
*
276+
* @return true if metrics are enabled, false otherwise
277+
*/
278+
public boolean getMetricsEnabled() {
279+
return this.metricsEnabled;
280+
}
281+
282+
/**
283+
* Enables or disables IoT Device SDK metrics collection. The metrics includes SDK name, version, and platform.
284+
*
285+
* @param enabled true to enable metrics, false to disable
286+
*/
287+
public void setMetricsEnabled(boolean enabled) {
288+
this.metricsEnabled = enabled;
289+
}
290+
266291
/**
267292
* Creates a Mqtt5ClientOptionsBuilder instance
268293
* @param builder The builder to get the Mqtt5ClientOptions values from
@@ -289,6 +314,8 @@ public Mqtt5ClientOptions(Mqtt5ClientOptionsBuilder builder) {
289314
this.websocketHandshakeTransform = builder.websocketHandshakeTransform;
290315
this.publishEvents = builder.publishEvents;
291316
this.topicAliasingOptions = builder.topicAliasingOptions;
317+
this.metricsEnabled = builder.metricsEnabled;
318+
this.iotDeviceSDKMetrics = new IoTDeviceSDKMetrics();
292319
}
293320

294321
/*******************************************************************************
@@ -345,7 +372,15 @@ public interface LifecycleEvents {
345372
*/
346373
public interface PublishEvents {
347374
/**
348-
* Called when an MQTT PUBLISH packet is received by the client
375+
* Called when an MQTT PUBLISH packet is received by the client.
376+
*
377+
* <p>To take manual control of the publish acknowledgement for a QoS 1 message, call
378+
* {@link PublishReturn#acquirePublishAcknowledgementControl()} within this callback. If you do so,
379+
* the client will NOT automatically send the publish acknowledgement. You are responsible for calling
380+
* {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)} later.</p>
381+
*
382+
* <p>If you do not call {@code acquirePublishAcknowledgementControl()} within the callback ,
383+
* the client will automatically send the publish acknowledgement after this callback returns.</p>
349384
*
350385
* @param client The client that has received the message
351386
* @param publishReturn All of the data that was received from the server
@@ -583,6 +618,7 @@ static final public class Mqtt5ClientOptionsBuilder {
583618
private Consumer<Mqtt5WebsocketHandshakeTransformArgs> websocketHandshakeTransform;
584619
private PublishEvents publishEvents;
585620
private TopicAliasingOptions topicAliasingOptions;
621+
private boolean metricsEnabled = true;
586622

587623
/**
588624
* Sets the host name of the MQTT server to connect to.
@@ -850,6 +886,17 @@ public Mqtt5ClientOptionsBuilder withTopicAliasingOptions(TopicAliasingOptions o
850886
return this;
851887
}
852888

889+
/**
890+
* Enables or disables IoT Device SDK metrics collection. The metrics includes SDK name, version, and platform.
891+
*
892+
* @param enabled true to enable metrics, false to disable.
893+
* @return The Mqtt5ClientOptionsBuilder after setting the metrics option
894+
*/
895+
public Mqtt5ClientOptionsBuilder withMetricsEnabled(boolean enabled) {
896+
this.metricsEnabled = enabled;
897+
return this;
898+
}
899+
853900
/**
854901
* Creates a new Mqtt5ClientOptionsBuilder instance
855902
*
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
package software.amazon.awssdk.crt.mqtt5;
6+
7+
/**
8+
* An opaque handle representing manual control over a publish acknowledgement for a received
9+
* PUBLISH packet.
10+
*
11+
* <p>This class cannot be instantiated directly. Instances are only created by the CRT library.</p>
12+
*/
13+
public class Mqtt5PublishAcknowledgementControlHandle {
14+
15+
private final long controlId;
16+
17+
/**
18+
* Creates a new Mqtt5PublishAcknowledgementControlHandle.
19+
*
20+
* @param controlId The native publish acknowledgement control ID returned by
21+
* aws_mqtt5_client_acquire_publish_acknowledgement.
22+
*/
23+
Mqtt5PublishAcknowledgementControlHandle(long controlId) {
24+
this.controlId = controlId;
25+
}
26+
27+
/**
28+
* Returns the native publish acknowledgement control ID, used by
29+
* {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)}
30+
* to pass the control ID to native code.
31+
*
32+
* @return The native publish acknowledgement control ID.
33+
*/
34+
long getControlId() {
35+
return controlId;
36+
}
37+
}

src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,22 @@
1414
public class PublishReturn {
1515
private PublishPacket publishPacket;
1616

17+
/**
18+
* The already-acquired publish acknowledgement control ID, eagerly acquired by native code
19+
* before the {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback is invoked.
20+
* For QoS 0 messages this is 0 (no publish acknowledgement required).
21+
* After {@code acquirePublishAcknowledgementControl()} is called, this is set to 0 to prevent
22+
* double-use.
23+
*/
24+
private long controlId;
25+
26+
/**
27+
* The threadID of the {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback.
28+
* This is set at the beginning of the callback and then used to verify
29+
* {@code acquirePublishAcknowledgementControl()} is being called from within.
30+
*/
31+
private long threadID;
32+
1733
/**
1834
* Returns the PublishPacket returned from the server or Null if none was returned.
1935
* @return The PublishPacket returned from the server.
@@ -22,12 +38,50 @@ public PublishPacket getPublishPacket() {
2238
return publishPacket;
2339
}
2440

41+
/**
42+
* Acquires manual control over the publish acknowledgement for this PUBLISH message,
43+
* preventing the client from automatically sending an acknowledgement. The returned handle can be
44+
* passed to {@link Mqtt5Client#invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle)}
45+
* at a later time to send the publish acknowledgement to the broker.
46+
*
47+
* <p><b>Important:</b> This method must be called within the
48+
* {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. Calling it outside the
49+
* callback (wrong thread) or after it has already been called will return {@code null}.</p>
50+
*
51+
* <p>This method may only be called once per received PUBLISH. Subsequent calls will return
52+
* {@code null}.</p>
53+
*
54+
* <p>If this method is not called, the client will automatically send a publish acknowledgment
55+
* for QoS 1 messages when the callback returns.</p>
56+
*
57+
* @return A {@link Mqtt5PublishAcknowledgementControlHandle} that can be used to manually send
58+
* the acknowledgement, or {@code null} if called outside the callback, called more than
59+
* once, or called on a QoS 0 message.
60+
*/
61+
public synchronized Mqtt5PublishAcknowledgementControlHandle acquirePublishAcknowledgementControl() {
62+
if (controlId == 0 || threadID != Thread.currentThread().getId()) {
63+
return null;
64+
}
65+
long acquiredControlId = controlId;
66+
/* Zero out so it can't be double-called */
67+
controlId = 0;
68+
return new Mqtt5PublishAcknowledgementControlHandle(acquiredControlId);
69+
}
70+
2571
/**
2672
* This is only called in JNI to make a new PublishReturn with a PUBLISH packet.
27-
* @param newPublishPacket The PubAckPacket data for QoS 1 packets. Can be null if result is non QoS 1.
28-
* @return A newly created PublishResult
73+
* The controlId is eagerly acquired by native code prior to
74+
* {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} being called.
75+
* The threadID is set to the calling thread and is used when
76+
* {@link #acquirePublishAcknowledgementControl()} is called to guarantee the result
77+
* is accurate and enforces the requirement of calling it from within the callback.
78+
*
79+
* @param newPublishPacket The PublishPacket data received from the server.
80+
* @param controlId The pre-acquired publish acknowledgement control ID (0 for QoS 0 messages).
2981
*/
30-
private PublishReturn(PublishPacket newPublishPacket) {
82+
private PublishReturn(PublishPacket newPublishPacket, long controlId) {
3183
this.publishPacket = newPublishPacket;
84+
this.controlId = controlId;
85+
this.threadID = Thread.currentThread().getId();
3286
}
3387
}

src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1256,6 +1256,14 @@
12561256
}
12571257
]
12581258
},
1259+
{
1260+
"name": "software.amazon.awssdk.crt.internal.IoTDeviceSDKMetrics",
1261+
"fields": [
1262+
{
1263+
"name": "libraryName"
1264+
}
1265+
]
1266+
},
12591267
{
12601268
"name": "software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions",
12611269
"fields": [
@@ -1306,6 +1314,12 @@
13061314
},
13071315
{
13081316
"name": "topicAliasingOptions"
1317+
},
1318+
{
1319+
"name": "iotDeviceSDKMetrics"
1320+
},
1321+
{
1322+
"name": "metricsEnabled"
13091323
}
13101324
],
13111325
"methods": [
@@ -1549,8 +1563,13 @@
15491563
{
15501564
"name": "<init>",
15511565
"parameterTypes": [
1552-
"software.amazon.awssdk.crt.mqtt5.packets.PublishPacket"
1566+
"software.amazon.awssdk.crt.mqtt5.packets.PublishPacket",
1567+
"long"
15531568
]
1569+
},
1570+
{
1571+
"name": "acquirePublishAcknowledgementControl",
1572+
"parameterTypes": []
15541573
}
15551574
]
15561575
},

0 commit comments

Comments
 (0)