Skip to content

Commit 0d4795f

Browse files
Fix MQTT QoS 2 Subscriber Handling (PUBREC/PUBREL/PUBCOMP) and Add Tests (#90)
* Fix QoS handling in handlePacket for MQTT messages * Add QoS 2 message handling test case * Update CHANGELOG.md to reflect improvements in QoS handling and test coverage
1 parent e151f82 commit 0d4795f

3 files changed

Lines changed: 86 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to
66
[Semantic Versioning](https://semver.org/).
77

8+
## Unreleased
9+
10+
### Changed
11+
12+
* Improved test coverage for all QoS levels (0, 1, 2)
13+
* Increased robustness and correctness in client-side handling of QoS 1 and QoS 2 messages (proper handling of PUBACK, PUBREC, PUBREL, PUBCOMP sequences)
14+
* Merged and cleaned up tests to ensure MQTT protocol compliance for all QoS scenarios
15+
816
## [3.3.0] - 2025-12-14
917

1018
### Added

src/PubSubClient.cpp

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -393,10 +393,13 @@ bool PubSubClient::handlePacket(uint8_t hdrLen, size_t length) {
393393
ERROR_PSC_PRINTF_P("handlePacket(): Missing msgId in QoS 1/2 message\n");
394394
return false;
395395
}
396+
uint8_t publishQos = MQTT_HDR_GET_QOS(_buffer[0]); // save QoS before _buffer[0] is overwritten
396397
uint16_t msgId = (_buffer[payloadOffset] << 8) + _buffer[payloadOffset + 1];
397398
callback(topic, payload + 2, payloadLen - 2); // remove the msgId from the callback payload
398399

399-
_buffer[0] = MQTTPUBACK;
400+
// QoS 1: respond with PUBACK
401+
// QoS 2: respond with PUBREC (first step of the QoS 2 subscriber handshake)
402+
_buffer[0] = (publishQos == MQTT_QOS1) ? MQTTPUBACK : MQTTPUBREC;
400403
_buffer[1] = 2;
401404
_buffer[2] = (msgId >> 8);
402405
_buffer[3] = (msgId & 0xFF);
@@ -415,20 +418,36 @@ bool PubSubClient::handlePacket(uint8_t hdrLen, size_t length) {
415418
// No futher action here, as resending is not supported.
416419
break;
417420
case MQTTPUBREC:
418-
// MQTT Publish Received (QoS 2 publish received, part 1): See section 3.5 MQTT v3.1.1 protocol specification
421+
// MQTT Publish Received (QoS 2 publisher handshake, part 1): broker acknowledges our QoS 2 PUBLISH.
422+
// See section 3.5 MQTT v3.1.1 protocol specification.
419423
if (length < 4) {
420424
ERROR_PSC_PRINTF_P("handlePacket(): Received PUBREC packet with length %zu, expected at least 4 bytes\n", length);
421425
return false;
422426
}
423-
// MQTT Publish Release (QoS 2 publish received, part 2): See section 3.6 MQTT v3.1.1 protocol specification
424-
_buffer[0] = MQTTPUBREL | 2; // PUBREL with bit 1 set
425-
// bytes 1-3 of PUBREL are the same as of PUBREC
427+
// MQTT Publish Release (QoS 2 publisher handshake, part 2): See section 3.6 MQTT v3.1.1 protocol specification
428+
_buffer[0] = MQTTPUBREL | 2; // PUBREL fixed header: bit 1 must be set per spec
429+
// bytes 1-3 of PUBREL are the same as of PUBREC (remaining length + msgId)
430+
if (_client->write(_buffer, 4) == 4) {
431+
_lastOutActivity = millis();
432+
}
433+
break;
434+
case MQTTPUBREL:
435+
// MQTT Publish Release (QoS 2 subscriber handshake, part 2): broker releases the message to us.
436+
// See section 3.6 MQTT v3.1.1 protocol specification.
437+
if (length < 4) {
438+
ERROR_PSC_PRINTF_P("handlePacket(): Received PUBREL packet with length %zu, expected at least 4 bytes\n", length);
439+
return false;
440+
}
441+
// MQTT Publish Complete (QoS 2 subscriber handshake, part 3): See section 3.7 MQTT v3.1.1 protocol specification
442+
_buffer[0] = MQTTPUBCOMP;
443+
// bytes 1-3 of PUBCOMP are the same as of PUBREL (remaining length + msgId)
426444
if (_client->write(_buffer, 4) == 4) {
427445
_lastOutActivity = millis();
428446
}
429447
break;
430448
case MQTTPUBCOMP:
431-
// MQTT Publish Complete (QoS 2 publish received, part 3): See section 3.7 MQTT v3.1.1 protocol specification
449+
// MQTT Publish Complete (QoS 2 publisher handshake, part 3): broker confirms delivery of our QoS 2 PUBLISH.
450+
// See section 3.7 MQTT v3.1.1 protocol specification.
432451
if (length < 4) {
433452
ERROR_PSC_PRINTF_P("handlePacket(): Received PUBCOMP packet with length %zu, expected at least 4 bytes\n", length);
434453
return false;

tests/src/receive_spec.cpp

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ int test_receive_oversized_message();
2828
int test_resize_buffer();
2929
int test_receive_oversized_stream_message();
3030
int test_receive_qos1();
31+
int test_receive_qos2();
3132

3233
void reset_callback() {
3334
callback_called = false;
@@ -330,6 +331,57 @@ int test_receive_qos1() {
330331
END_IT
331332
}
332333

334+
int test_receive_qos2() {
335+
IT("receives a qos2 message - responds PUBREC then PUBCOMP");
336+
reset_callback();
337+
338+
ShimClient shimClient;
339+
shimClient.setAllowConnect(true);
340+
341+
byte connack[] = {0x20, 0x02, 0x00, 0x00};
342+
shimClient.respond(connack, 4);
343+
344+
PubSubClient client(server, 1883, callback, shimClient);
345+
bool rc = client.connect("client_test1");
346+
IS_TRUE(rc);
347+
348+
// QoS 2 PUBLISH from broker (0x34 = MQTTPUBLISH | QoS2 bits)
349+
// Fixed header 0x34, remaining length 0x10 (16), topic len 0x0005, topic "topic",
350+
// msgId 0x1234, payload "payload"
351+
byte publish[] = {0x34, 0x10, 0x0, 0x5, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x34, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64};
352+
shimClient.respond(publish, 18);
353+
354+
// Client must respond with PUBREC (0x50), remaining length 2, msgId 0x1234
355+
byte pubrec[] = {0x50, 0x02, 0x12, 0x34};
356+
shimClient.expect(pubrec, 4);
357+
358+
rc = client.loop();
359+
IS_TRUE(rc);
360+
IS_TRUE(callback_called);
361+
IS_TRUE(strcmp(lastTopic, "topic") == 0);
362+
IS_TRUE(memcmp(lastPayload, "payload", 7) == 0);
363+
IS_TRUE(lastLength == 7);
364+
IS_FALSE(shimClient.error());
365+
366+
reset_callback();
367+
368+
// Broker sends PUBREL (0x62 = MQTTPUBREL | bit1), remaining length 2, msgId 0x1234
369+
byte pubrel[] = {0x62, 0x02, 0x12, 0x34};
370+
shimClient.respond(pubrel, 4);
371+
372+
// Client must respond with PUBCOMP (0x70), remaining length 2, msgId 0x1234
373+
byte pubcomp[] = {0x70, 0x02, 0x12, 0x34};
374+
shimClient.expect(pubcomp, 4);
375+
376+
rc = client.loop();
377+
IS_TRUE(rc);
378+
IS_FALSE(callback_called); // callback must NOT fire again on PUBREL
379+
380+
IS_FALSE(shimClient.error());
381+
382+
END_IT
383+
}
384+
333385
int main() {
334386
SUITE("Receive");
335387
test_receive_callback();
@@ -340,6 +392,7 @@ int main() {
340392
test_resize_buffer();
341393
test_receive_oversized_stream_message();
342394
test_receive_qos1();
395+
test_receive_qos2();
343396

344397
FINISH
345398
}

0 commit comments

Comments
 (0)