-
Notifications
You must be signed in to change notification settings - Fork 43
Expand file tree
/
Copy pathMqtt5Client.java
More file actions
297 lines (264 loc) · 12.6 KB
/
Mqtt5Client.java
File metadata and controls
297 lines (264 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
package software.amazon.awssdk.crt.mqtt5;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.http.HttpProxyOptions;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsContext;
import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket;
import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket;
import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket;
import software.amazon.awssdk.crt.mqtt5.packets.SubAckPacket;
import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket;
import software.amazon.awssdk.crt.mqtt5.packets.UnsubAckPacket;
import software.amazon.awssdk.crt.mqtt5.packets.UnsubscribePacket;
import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket.ConnectPacketBuilder;
/**
 * Java wrapper around the aws-c-mqtt MQTT5 client, exposing basic MQTT5 publish/subscribe
 * functionality through the AWS Common Runtime.
 *
 * <p>Each Mqtt5Client instance creates one connection.</p>
 *
 * <p>Several private members of this class ({@code setIsConnected}, {@code onWebsocketHandshake})
 * are documented as being invoked from native code, so their names and signatures should be
 * treated as part of the native contract.</p>
 */
public class Mqtt5Client extends CrtResource {

    /**
     * Websocket handshake transform taken from the client options at construction time; may be null.
     */
    private Consumer<Mqtt5WebsocketHandshakeTransformArgs> websocketHandshakeTransform;

    /**
     * Whether the client is currently considered connected. Read and written only through the
     * synchronized accessors after construction.
     */
    private boolean isConnected;

    /**
     * The options this client was constructed with, retained so they can be returned from
     * {@link #getClientOptions()} (listed under the Mqtt5 to Mqtt3 adapter section below).
     */
    private Mqtt5ClientOptions clientOptions;

    /**
     * Builds a new Mqtt5Client from the supplied options. Changing the options afterwards does
     * not affect clients that have already been created.
     *
     * <p>When the options carry no bootstrap, the static default bootstrap is used; when they
     * carry no connect options, a default-built ConnectPacket is used.</p>
     *
     * @param options The Mqtt5ClientOptions used to configure the new Mqtt5Client.
     * @throws CrtRuntimeException If the system is unable to allocate space for a native MQTT5 client structure
     */
    public Mqtt5Client(Mqtt5ClientOptions options) throws CrtRuntimeException {
        this.clientOptions = options;

        // Read everything needed from the options before touching native code.
        ClientBootstrap effectiveBootstrap = options.getBootstrap();
        final SocketOptions socketOptions = options.getSocketOptions();
        final TlsContext tlsContext = options.getTlsContext();
        final HttpProxyOptions proxyOptions = options.getHttpProxyOptions();
        final ConnectPacket configuredConnect = options.getConnectOptions();
        this.websocketHandshakeTransform = options.getWebsocketHandshakeTransform();

        if (effectiveBootstrap == null) {
            effectiveBootstrap = ClientBootstrap.getOrCreateStaticDefault();
        }

        final ConnectPacket effectiveConnect =
                (configuredConnect != null) ? configuredConnect : new ConnectPacketBuilder().build();

        final long nativeClient = mqtt5ClientNew(options, effectiveConnect, effectiveBootstrap, this);
        acquireNativeHandle(nativeClient);

        // Register the resources handed to the native client as references of this resource.
        if (effectiveBootstrap != null) {
            addReferenceTo(effectiveBootstrap);
        }
        if (socketOptions != null) {
            addReferenceTo(socketOptions);
        }
        if (tlsContext != null) {
            addReferenceTo(tlsContext);
        }
        if (proxyOptions != null && proxyOptions.getTlsContext() != null) {
            addReferenceTo(proxyOptions.getTlsContext());
        }

        this.isConnected = false;
    }

    /**
     * Destroys the native client behind this object, if there is one. The client is unusable
     * after this call.
     */
    @Override
    protected void releaseNativeHandle() {
        if (isNull()) {
            return;
        }
        mqtt5ClientDestroy(getNativeHandle());
    }

    /**
     * Reports whether this resource releases its dependencies at the same time the native handle
     * is released. This class returns false, i.e. it is one of the resources that waits; such
     * resources are responsible for calling releaseReferences() manually.
     *
     * @return always false
     */
    @Override
    protected boolean canReleaseReferencesImmediately() {
        return false;
    }

    /**
     * Asks the Mqtt5Client to establish and maintain connectivity to the configured endpoint.
     * The client will attempt to stay connected using the reconnect-related parameters of its
     * configuration.
     *
     * <p>This is an asynchronous operation.</p>
     *
     * @throws CrtRuntimeException If the native client returns an error when starting
     */
    public void start() throws CrtRuntimeException {
        final long handle = getNativeHandle();
        mqtt5ClientInternalStart(handle);
    }

    /**
     * Asks the Mqtt5Client to end connectivity to the configured endpoint, disconnecting any
     * existing connection and halting any reconnect attempts.
     *
     * <p>This is an asynchronous operation.</p>
     *
     * @param disconnectPacket (optional) Properties of a DISCONNECT packet to send as part of the shutdown
     * process. When disconnectPacket is null, no DISCONNECT packets will be sent.
     * @throws CrtRuntimeException If the native client is unable to initialize the stop process.
     */
    public void stop(DisconnectPacket disconnectPacket) throws CrtRuntimeException {
        final long handle = getNativeHandle();
        mqtt5ClientInternalStop(handle, disconnectPacket);
    }

    /**
     * Same as {@link #stop(DisconnectPacket)} with a null packet: connectivity is ended and
     * reconnect attempts are halted, and no DISCONNECT packets will be sent.
     *
     * <p>This is an asynchronous operation.</p>
     *
     * @throws CrtRuntimeException If the native client is unable to initialize the stop process.
     */
    public void stop() throws CrtRuntimeException {
        final DisconnectPacket noDisconnectPacket = null;
        stop(noDisconnectPacket);
    }

    /**
     * Tells the Mqtt5Client to attempt to send a PUBLISH packet.
     *
     * <p>The returned future carries a PublishResult when the publish succeeds. What the result
     * holds depends on the QoS of the publish: for QoS 0 it will not contain data, for QoS 1 it
     * will contain a PubAckPacket. See the PublishResult and PublishPacket class documentation
     * for more info.</p>
     *
     * @param publishPacket PUBLISH packet to send to the server
     * @return A future that will be rejected with an error or resolved with a PublishResult response
     */
    public CompletableFuture<PublishResult> publish(PublishPacket publishPacket) {
        final CompletableFuture<PublishResult> pendingResult = new CompletableFuture<>();
        mqtt5ClientInternalPublish(getNativeHandle(), publishPacket, pendingResult);
        return pendingResult;
    }

    /**
     * Tells the Mqtt5Client to attempt to subscribe to one or more topic filters.
     *
     * @param subscribePacket SUBSCRIBE packet to send to the server
     * @return a future that will be rejected with an error or resolved with the SUBACK response
     */
    public CompletableFuture<SubAckPacket> subscribe(SubscribePacket subscribePacket) {
        final CompletableFuture<SubAckPacket> pendingSubAck = new CompletableFuture<>();
        mqtt5ClientInternalSubscribe(getNativeHandle(), subscribePacket, pendingSubAck);
        return pendingSubAck;
    }

    /**
     * Tells the Mqtt5Client to attempt to unsubscribe from one or more topic filters.
     *
     * @param unsubscribePacket UNSUBSCRIBE packet to send to the server
     * @return a future that will be rejected with an error or resolved with the UNSUBACK response
     */
    public CompletableFuture<UnsubAckPacket> unsubscribe(UnsubscribePacket unsubscribePacket) {
        final CompletableFuture<UnsubAckPacket> pendingUnsubAck = new CompletableFuture<>();
        mqtt5ClientInternalUnsubscribe(getNativeHandle(), unsubscribePacket, pendingUnsubAck);
        return pendingUnsubAck;
    }

    /**
     * Fetches statistics about the current state of the Mqtt5Client's queue of operations.
     *
     * @return Current state of the client's queue of operations.
     */
    public Mqtt5ClientOperationStatistics getOperationStatistics() {
        final long handle = getNativeHandle();
        return mqtt5ClientInternalGetOperationStatistics(handle);
    }

    /**
     * Sends a publish acknowledgement packet for a QoS 1 PUBLISH that was previously acquired
     * for manual control.
     *
     * <p>To use manual publish acknowledgement control, call
     * {@link PublishReturn#acquirePublishAcknowledgementControl()} within the
     * {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback of a QoS 1 PUBLISH to obtain a
     * {@link Mqtt5PublishAcknowledgementControlHandle}. Then pass that handle to this method to send
     * the publish acknowledgement.</p>
     *
     * @param publishAcknowledgementControlHandle An opaque handle obtained from
     * {@link PublishReturn#acquirePublishAcknowledgementControl()}.
     */
    public void invokePublishAcknowledgement(Mqtt5PublishAcknowledgementControlHandle publishAcknowledgementControlHandle) {
        final long handle = getNativeHandle();
        final long controlId = publishAcknowledgementControlHandle.getControlId();
        mqtt5ClientInternalInvokePublishAcknowledgement(handle, controlId);
    }

    /**
     * Reports the connectivity state of the Mqtt5Client.
     *
     * @return True if the client is connected, false otherwise
     */
    public synchronized boolean getIsConnected() {
        return this.isConnected;
    }

    /**
     * Records the connectivity state of the Mqtt5Client. Is used by JNI.
     *
     * @param connected The current connectivity state of the Mqtt5Client
     */
    private synchronized void setIsConnected(boolean connected) {
        this.isConnected = connected;
    }

    /*******************************************************************************
     * Mqtt5 to Mqtt3 Adapter
     ******************************************************************************/

    /**
     * Gives access to the Mqtt5ClientOptions this Mqtt5Client was created with.
     *
     * @return Mqtt5ClientOptions
     */
    public Mqtt5ClientOptions getClientOptions() {
        return this.clientOptions;
    }

    /*******************************************************************************
     * websocket methods
     ******************************************************************************/

    /**
     * Called from native when a websocket handshake request is being prepared.
     *
     * <p>A future is created whose completion (normal or exceptional) is reported back to the
     * native client. If a handshake transform was configured it receives the request together
     * with that future; otherwise the request is completed unchanged right away.</p>
     *
     * @param handshakeRequest The HttpRequest being prepared
     * @param nativeUserData Native data
     */
    private void onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserData) {
        final CompletableFuture<HttpRequest> transformedRequest = new CompletableFuture<>();
        transformedRequest.whenComplete((request, error) -> {
            final long handle = getNativeHandle();
            byte[] marshalledRequest = null;
            if (request != null) {
                marshalledRequest = request.marshalForJni();
            }
            mqtt5ClientInternalWebsocketHandshakeComplete(handle, marshalledRequest, error, nativeUserData);
        });

        final Mqtt5WebsocketHandshakeTransformArgs transformArgs =
                new Mqtt5WebsocketHandshakeTransformArgs(this, handshakeRequest, transformedRequest);

        // Take a local snapshot of the field before deciding which path to follow.
        final Consumer<Mqtt5WebsocketHandshakeTransformArgs> configuredTransform = this.websocketHandshakeTransform;
        if (configuredTransform == null) {
            // No transform configured: hand the request back untouched.
            transformArgs.complete(handshakeRequest);
            return;
        }
        configuredTransform.accept(transformArgs);
    }

    /*******************************************************************************
     * native methods
     ******************************************************************************/

    private static native long mqtt5ClientNew(
            Mqtt5ClientOptions options,
            ConnectPacket connectOptions,
            ClientBootstrap bootstrap,
            Mqtt5Client client) throws CrtRuntimeException;

    private static native void mqtt5ClientDestroy(long client);

    private static native void mqtt5ClientInternalStart(long client);

    private static native void mqtt5ClientInternalStop(long client, DisconnectPacket disconnectOptions);

    private static native void mqtt5ClientInternalPublish(
            long client, PublishPacket publishOptions, CompletableFuture<PublishResult> publishResult);

    private static native void mqtt5ClientInternalSubscribe(
            long client, SubscribePacket subscribeOptions, CompletableFuture<SubAckPacket> subscribeSuback);

    private static native void mqtt5ClientInternalUnsubscribe(
            long client, UnsubscribePacket unsubscribeOptions, CompletableFuture<UnsubAckPacket> unsubscribeUnsuback);

    private static native void mqtt5ClientInternalWebsocketHandshakeComplete(
            long client, byte[] marshalledRequest, Throwable throwable, long nativeUserData) throws CrtRuntimeException;

    private static native Mqtt5ClientOperationStatistics mqtt5ClientInternalGetOperationStatistics(long client);

    private static native void mqtt5ClientInternalInvokePublishAcknowledgement(long client, long controlId);
}