Skip to content

Commit 83d451e

Browse files
authored
Capture fields from PUBLISH for req-resp stream operations (#888)
1 parent d2e698a commit 83d451e

6 files changed

Lines changed: 217 additions & 22 deletions

File tree

src/main/java/software/amazon/awssdk/crt/iot/IncomingPublishEvent.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,25 @@
55

66
package software.amazon.awssdk.crt.iot;
77

8+
import software.amazon.awssdk.crt.mqtt5.packets.UserProperty;
9+
10+
import java.util.List;
11+
12+
/**
13+
* An event that describes an incoming publish message received on a streaming operation.
14+
*/
815
public class IncomingPublishEvent {
916

1017
private final byte[] payload;
1118

1219
private final String topic;
1320

21+
private String contentType;
22+
23+
private List<UserProperty> userProperties;
24+
25+
private Long messageExpiryIntervalSeconds;
26+
1427
private IncomingPublishEvent(byte[] payload, String topic) {
1528
this.payload = payload;
1629
this.topic = topic;
@@ -33,4 +46,31 @@ public byte[] getPayload() {
3346
public String getTopic() {
3447
return topic;
3548
}
49+
50+
/**
51+
* Gets the content type of the IncomingPublishEvent.
52+
*
53+
* @return Content type of the IncomingPublishEvent.
54+
*/
55+
public String getContentType() {
56+
return contentType;
57+
}
58+
59+
/**
60+
* Gets the user properties of the IncomingPublishEvent.
61+
*
62+
* @return User properties of the IncomingPublishEvent.
63+
*/
64+
public List<UserProperty> getUserProperties() {
65+
return userProperties;
66+
}
67+
68+
/**
69+
* Gets the message expiry interval seconds of the IncomingPublishEvent.
70+
*
71+
* @return Message expiry interval seconds of the IncomingPublishEvent.
72+
*/
73+
public Long getMessageExpiryIntervalSeconds() {
74+
return messageExpiryIntervalSeconds;
75+
}
3676
}

src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,6 +1015,17 @@
10151015
"java.lang.String"
10161016
]
10171017
}
1018+
],
1019+
"fields": [
1020+
{
1021+
"name": "contentType"
1022+
},
1023+
{
1024+
"name": "userProperties"
1025+
},
1026+
{
1027+
"name": "messageExpiryIntervalSeconds"
1028+
}
10181029
]
10191030
},
10201031
{

src/native/java_class_ids.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2407,6 +2407,19 @@ static void s_cache_incoming_publish_event_properties(JNIEnv *env) {
24072407

24082408
incoming_publish_event_properties.constructor_method_id = (*env)->GetMethodID(
24092409
env, incoming_publish_event_properties.incoming_publish_event_class, "<init>", "([BLjava/lang/String;)V");
2410+
2411+
incoming_publish_event_properties.content_type_id = (*env)->GetFieldID(
2412+
env, incoming_publish_event_properties.incoming_publish_event_class, "contentType", "Ljava/lang/String;");
2413+
2414+
incoming_publish_event_properties.user_properties_field_id = (*env)->GetFieldID(
2415+
env, incoming_publish_event_properties.incoming_publish_event_class, "userProperties", "Ljava/util/List;");
2416+
2417+
incoming_publish_event_properties.message_expiry_interval_seconds_id = (*env)->GetFieldID(
2418+
env,
2419+
incoming_publish_event_properties.incoming_publish_event_class,
2420+
"messageExpiryIntervalSeconds",
2421+
"Ljava/lang/Long;");
2422+
24102423
AWS_FATAL_ASSERT(incoming_publish_event_properties.constructor_method_id);
24112424
}
24122425

src/native/java_class_ids.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,6 +1006,9 @@ extern struct java_mqtt_request_response_properties mqtt_request_response_proper
10061006
struct java_incoming_publish_event_properties {
10071007
jclass incoming_publish_event_class;
10081008
jmethodID constructor_method_id;
1009+
jfieldID content_type_id;
1010+
jfieldID user_properties_field_id;
1011+
jfieldID message_expiry_interval_seconds_id;
10091012
};
10101013
extern struct java_incoming_publish_event_properties incoming_publish_event_properties;
10111014

src/native/mqtt_request_response.c

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#include "java_class_ids.h"
1212
#include "mqtt5_client_jni.h"
13+
#include "mqtt5_utils.h"
1314
#include "mqtt_connection.h"
1415

1516
/* on 32-bit platforms, casting pointers to longs throws a warning we don't need */
@@ -726,6 +727,37 @@ static void s_aws_mqtt_streaming_operation_incoming_publish_callback(
726727
goto done;
727728
}
728729

730+
if (s_set_jni_string_field_in_packet(
731+
env,
732+
publish_event->content_type,
733+
java_incoming_publish_event,
734+
incoming_publish_event_properties.content_type_id,
735+
"content type",
736+
true)) {
737+
goto done;
738+
}
739+
740+
if (publish_event->user_properties != NULL && publish_event->user_property_count > 0) {
741+
if (s_set_user_properties_field(
742+
env,
743+
publish_event->user_property_count,
744+
publish_event->user_properties,
745+
java_incoming_publish_event,
746+
incoming_publish_event_properties.user_properties_field_id)) {
747+
goto done;
748+
}
749+
}
750+
751+
if (s_set_jni_uint32_t_field_in_packet(
752+
env,
753+
publish_event->message_expiry_interval_seconds,
754+
java_incoming_publish_event,
755+
incoming_publish_event_properties.message_expiry_interval_seconds_id,
756+
"message expiry interval seconds",
757+
true)) {
758+
goto done;
759+
}
760+
729761
(*env)->CallVoidMethod(
730762
env,
731763
binding->java_incoming_publish_event_callback,

src/test/java/software/amazon/awssdk/crt/test/MqttRequestResponseClientTests.java

Lines changed: 118 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import software.amazon.awssdk.crt.mqtt5.OnConnectionSuccessReturn;
2222
import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket;
2323
import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket;
24+
import software.amazon.awssdk.crt.mqtt5.packets.UserProperty;
2425

2526
import java.nio.charset.StandardCharsets;
27+
import java.util.ArrayList;
2628
import java.util.UUID;
2729
import java.util.concurrent.CompletableFuture;
2830
import java.util.concurrent.ExecutionException;
@@ -699,14 +701,21 @@ public void ShadowUpdatedStreamingOperationOpenCloseMqtt311() {
699701
doStreamingOperationOpenClosedTest(MqttVersion.Mqtt311);
700702
}
701703

702-
void doStreamingOperationIncomingPublishTest(MqttVersion version) {
704+
public interface StreamOperationPublishHelper {
705+
String getTopic();
706+
PublishPacket createMqtt5PublishPacket();
707+
MqttMessage createMqtt3Message();
708+
void assertPublishEvent(IncomingPublishEvent event);
709+
}
710+
711+
void doStreamingOperationIncomingPublishTest(MqttVersion version, StreamOperationPublishHelper helper) {
703712
this.context = new TestContext(version, null);
704713
CompletableFuture<Boolean> subscribed = new CompletableFuture<>();
705714
CompletableFuture<IncomingPublishEvent> publishEvent = new CompletableFuture<>();
706715

707-
String fakeShadowTopic = "not/a/shadow/topic/" + (UUID.randomUUID()).toString();
716+
String topic = helper.getTopic();
708717
StreamingOperationOptions streamingOptions = StreamingOperationOptions.builder()
709-
.withTopic(fakeShadowTopic)
718+
.withTopic(topic)
710719
.withSubscriptionStatusEventCallback((event) -> {
711720
if (event.getType() == SubscriptionStatusEventType.SUBSCRIPTION_ESTABLISHED) {
712721
subscribed.complete(true);
@@ -723,48 +732,135 @@ void doStreamingOperationIncomingPublishTest(MqttVersion version) {
723732
try {
724733
subscribed.get();
725734
} catch (Exception ex) {
726-
Assert.assertTrue(false);
735+
Assert.fail(ex.getMessage());
727736
}
728737

729-
String originalPayloadAsString = "IncomingPublishTest";
730-
byte[] originalPayload = originalPayloadAsString.getBytes(StandardCharsets.UTF_8);
731-
732738
if (version == MqttVersion.Mqtt5) {
733-
PublishPacket publishPacket = new PublishPacket.PublishPacketBuilder()
734-
.withPayload(originalPayload)
735-
.withTopic(fakeShadowTopic)
736-
.withQOS(QOS.AT_LEAST_ONCE)
737-
.build();
739+
PublishPacket publishPacket = helper.createMqtt5PublishPacket();
738740
this.context.mqtt5Client.publish(publishPacket);
739741
} else {
740-
this.context.mqtt311Client.publish(new MqttMessage(fakeShadowTopic, originalPayload, QualityOfService.AT_LEAST_ONCE));
742+
this.context.mqtt311Client.publish(helper.createMqtt3Message());
741743
}
742744

743745
try {
744746
IncomingPublishEvent event = publishEvent.get();
747+
helper.assertPublishEvent(event);
748+
} catch (Exception | Error ex) {
749+
Assert.fail(ex.getMessage());
750+
} finally {
751+
stream.close();
752+
}
753+
}
754+
755+
class StreamOperationPublishHelper_RequiredFieldsOnly implements StreamOperationPublishHelper {
756+
private final String payload = "IncomingPublishTest";
757+
private final String topic = "not/a/shadow/topic/" + (UUID.randomUUID()).toString();
758+
759+
@Override
760+
public String getTopic() {
761+
return topic;
762+
}
763+
764+
@Override
765+
public PublishPacket createMqtt5PublishPacket() {
766+
byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);
767+
return new PublishPacket.PublishPacketBuilder()
768+
.withPayload(payloadBytes)
769+
.withTopic(topic)
770+
.withQOS(QOS.AT_LEAST_ONCE)
771+
.build();
772+
}
773+
774+
@Override
775+
public MqttMessage createMqtt3Message() {
776+
byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);
777+
return new MqttMessage(topic, payloadBytes, QualityOfService.AT_LEAST_ONCE);
778+
}
779+
780+
@Override
781+
public void assertPublishEvent(IncomingPublishEvent event) {
745782
Assert.assertNotNull(event);
746-
String payloadAsString = new String(event.getPayload(), StandardCharsets.UTF_8);
747-
Assert.assertEquals(originalPayloadAsString, payloadAsString);
748-
Assert.assertEquals(fakeShadowTopic, event.getTopic());
749-
} catch (Exception ex) {
750-
Assert.assertTrue(false);
783+
String eventPayload = new String(event.getPayload(), StandardCharsets.UTF_8);
784+
Assert.assertEquals("payload", payload, eventPayload);
785+
Assert.assertEquals("topic", topic, event.getTopic());
786+
787+
Assert.assertNull("content-type", event.getContentType());
788+
Assert.assertNull("user-properties", event.getUserProperties());
789+
Assert.assertNull("message-expiry-interval-seconds", event.getMessageExpiryIntervalSeconds());
751790
}
791+
}
752792

753-
stream.close();
793+
class StreamOperationPublishHelper_AllFields implements StreamOperationPublishHelper {
794+
private final String payload = "IncomingPublishTest";
795+
private final String topic = "not/a/shadow/topic/" + (UUID.randomUUID()).toString();
796+
private final String contentType = "application/json";
797+
private ArrayList<UserProperty> userProperties;
798+
799+
@Override
800+
public String getTopic() {
801+
return topic;
802+
}
803+
804+
@Override
805+
public PublishPacket createMqtt5PublishPacket() {
806+
byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);
807+
808+
userProperties = new ArrayList<UserProperty>();
809+
userProperties.add(new UserProperty("Hello", "World"));
810+
811+
return new PublishPacket.PublishPacketBuilder()
812+
.withPayload(payloadBytes)
813+
.withTopic(topic)
814+
.withQOS(QOS.AT_LEAST_ONCE)
815+
.withContentType(contentType)
816+
.withUserProperties(userProperties)
817+
.withMessageExpiryIntervalSeconds(100L)
818+
.build();
819+
}
820+
821+
@Override
822+
public MqttMessage createMqtt3Message() {
823+
byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);
824+
return new MqttMessage(topic, payloadBytes, QualityOfService.AT_LEAST_ONCE);
825+
}
826+
827+
@Override
828+
public void assertPublishEvent(IncomingPublishEvent event) {
829+
Assert.assertNotNull(event);
830+
String eventPayload = new String(event.getPayload(), StandardCharsets.UTF_8);
831+
Assert.assertEquals("payload", payload, eventPayload);
832+
Assert.assertEquals("topic", topic, event.getTopic());
833+
Assert.assertEquals("content-type", contentType, event.getContentType());
834+
Assert.assertEquals("user-properties size", userProperties.size(), event.getUserProperties().size());
835+
for (int i = 0; i < userProperties.size(); i++) {
836+
Assert.assertEquals("user-property " + i + " key", userProperties.get(i).key, event.getUserProperties().get(i).key);
837+
Assert.assertEquals("user-property " + i + " value", userProperties.get(i).value, event.getUserProperties().get(i).value);
838+
}
839+
Assert.assertNotNull("message-expiry-interval-seconds should not be null", event.getMessageExpiryIntervalSeconds());
840+
// We can't check for the exact value here as it'll be decremented by the server part.
841+
Assert.assertTrue("message-expiry-interval-seconds should be a positive number", event.getMessageExpiryIntervalSeconds() > 0);
842+
}
843+
}
844+
845+
@Test
846+
public void StreamingOperationIncomingPublishMqtt5_RequiredFieldsOnly() {
847+
skipIfNetworkUnavailable();
848+
849+
doStreamingOperationIncomingPublishTest(MqttVersion.Mqtt5, new StreamOperationPublishHelper_RequiredFieldsOnly());
754850
}
755851

756852
@Test
757-
public void StreamingOperationIncomingPublishMqtt5() {
853+
public void StreamingOperationIncomingPublishMqtt5_AllFields() {
758854
skipIfNetworkUnavailable();
759855

760-
doStreamingOperationIncomingPublishTest(MqttVersion.Mqtt5);
856+
doStreamingOperationIncomingPublishTest(MqttVersion.Mqtt5, new StreamOperationPublishHelper_AllFields());
761857
}
762858

763859
@Test
764860
public void StreamingOperationIncomingPublishMqtt311() {
765861
skipIfNetworkUnavailable();
766862

767-
doStreamingOperationIncomingPublishTest(MqttVersion.Mqtt311);
863+
doStreamingOperationIncomingPublishTest(MqttVersion.Mqtt311, new StreamOperationPublishHelper_RequiredFieldsOnly());
768864
}
769865

770866
void doStreamingOperationReopenFailureTest(MqttVersion version) {

0 commit comments

Comments
 (0)