Skip to content

Commit a301c3d

Browse files
authored
Fix MQTTv5 correlation data leak (#973)
1 parent a147b32 commit a301c3d

2 files changed

Lines changed: 106 additions & 1 deletion

File tree

src/native/mqtt5_packets.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1136,7 +1136,7 @@ struct aws_mqtt5_packet_publish_view_java_jni *aws_mqtt5_packet_publish_view_cre
11361136
mqtt5_publish_packet_properties.publish_payload_field_id,
11371137
s_publish_packet_string,
11381138
"payload",
1139-
&java_packet->correlation_data_buf,
1139+
&java_packet->payload_buf,
11401140
&java_packet->payload_cursor,
11411141
true,
11421142
&was_value_set) == AWS_OP_ERR) {

src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2163,6 +2163,111 @@ public void Op_DirectPacketBuilders() throws Exception {
21632163
CrtResource.waitForNoResources();
21642164
}
21652165

2166+
/* Full Publish Packet Tests */
2167+
private void doPublishWithFullPacket() {
2168+
try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath(
2169+
AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
2170+
TlsContext tlsContext = new TlsContext(tlsOptions)) {
2171+
2172+
String testUUID = UUID.randomUUID().toString();
2173+
String testTopic = "test/MQTT5_Binding_Java_" + testUUID;
2174+
String testResponseTopic = "test/MQTT5_Binding_Java_Response_" + testUUID;
2175+
String testContentType = "application/json";
2176+
byte[] testPayload = "{\"message\": \"Hello World\"}".getBytes();
2177+
byte[] testCorrelationData = "correlation-data-12345".getBytes();
2178+
2179+
Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l);
2180+
LifecycleEvents_Futured events = new LifecycleEvents_Futured();
2181+
builder.withLifecycleEvents(events);
2182+
builder.withTlsContext(tlsContext);
2183+
2184+
PublishEvents_Futured publishEvents = new PublishEvents_Futured();
2185+
builder.withPublishEvents(publishEvents);
2186+
2187+
// Build a publish packet with all available fields populated
2188+
PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder();
2189+
publishPacketBuilder.withTopic(testTopic);
2190+
publishPacketBuilder.withQOS(QOS.AT_LEAST_ONCE);
2191+
publishPacketBuilder.withPayload(testPayload);
2192+
publishPacketBuilder.withRetain(false);
2193+
publishPacketBuilder.withPayloadFormat(PublishPacket.PayloadFormatIndicator.UTF8);
2194+
publishPacketBuilder.withMessageExpiryIntervalSeconds(60L);
2195+
publishPacketBuilder.withResponseTopic(testResponseTopic);
2196+
publishPacketBuilder.withCorrelationData(testCorrelationData);
2197+
publishPacketBuilder.withContentType(testContentType);
2198+
2199+
ArrayList<UserProperty> userProperties = new ArrayList<UserProperty>();
2200+
userProperties.add(new UserProperty("test-name-1", "test-value-1"));
2201+
userProperties.add(new UserProperty("test-name-2", "test-value-2"));
2202+
publishPacketBuilder.withUserProperties(userProperties);
2203+
2204+
SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE);
2205+
2206+
try (Mqtt5Client client = new Mqtt5Client(builder.build())) {
2207+
client.start();
2208+
events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
2209+
2210+
// Subscribe to the topic first
2211+
client.subscribe(subscribePacketBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
2212+
2213+
// Publish the message with all fields
2214+
client.publish(publishPacketBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
2215+
2216+
// Wait for the publish to be received
2217+
publishEvents.publishReceivedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
2218+
2219+
// Verify the received publish packet has the expected fields
2220+
PublishPacket receivedPacket = publishEvents.publishPacket;
2221+
assertNotNull("Received publish packet should not be null", receivedPacket);
2222+
2223+
// Verify topic
2224+
assertEquals("Topic should match", testTopic, receivedPacket.getTopic());
2225+
2226+
// Verify QoS
2227+
assertEquals("QoS should match", QOS.AT_LEAST_ONCE, receivedPacket.getQOS());
2228+
2229+
// Verify payload
2230+
assertTrue("Payload should match", java.util.Arrays.equals(testPayload, receivedPacket.getPayload()));
2231+
2232+
// Verify payload format indicator
2233+
assertEquals("Payload format should match",
2234+
PublishPacket.PayloadFormatIndicator.UTF8, receivedPacket.getPayloadFormat());
2235+
2236+
// Verify response topic
2237+
assertEquals("Response topic should match", testResponseTopic, receivedPacket.getResponseTopic());
2238+
2239+
// Verify correlation data
2240+
assertTrue("Correlation data should match",
2241+
java.util.Arrays.equals(testCorrelationData, receivedPacket.getCorrelationData()));
2242+
2243+
// Verify content type
2244+
assertEquals("Content type should match", testContentType, receivedPacket.getContentType());
2245+
2246+
// Verify user properties
2247+
assertNotNull("User properties should not be null", receivedPacket.getUserProperties());
2248+
assertEquals("User properties count should match", 2, receivedPacket.getUserProperties().size());
2249+
2250+
client.stop();
2251+
events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
2252+
}
2253+
} catch (Exception ex) {
2254+
throw new RuntimeException(ex);
2255+
}
2256+
}
2257+
2258+
/* Publish with full packet - all fields populated */
2259+
@Test
2260+
public void Op_PublishWithFullPacket() throws Exception {
2261+
skipIfNetworkUnavailable();
2262+
Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT,
2263+
AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
2264+
2265+
TestUtils.doRetryableTest(this::doPublishWithFullPacket, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES,
2266+
TEST_RETRY_SLEEP_MILLIS);
2267+
2268+
CrtResource.waitForNoResources();
2269+
}
2270+
21662271
/**
21672272
* ============================================================
21682273
* Error Operation Tests

0 commit comments

Comments
 (0)