Skip to content
Merged
55 changes: 55 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@
connection on malformed packets, which the broker's existing decode-
error path enforces. The check covers ClientId, Will Topic, Topic Name,
Topic Filter, Username, and v5 STRING/STRING_PAIR property values.
- The broker now requires `auth_user` and `auth_pass` to be configured as
a pair. Previously, setting only one (e.g. the `-u` CLI flag without
`-P`) silently enabled single-factor authentication: the unconfigured
side was never checked, so any password authenticated against a matching
username (or any username against a matching password). `MqttBroker_Start`
now rejects a partial credential configuration with
`MQTT_CODE_ERROR_BAD_ARG`, and the connect-time gate fails closed as a
defense in depth if only one credential is set. Leaving both NULL still
disables authentication.

* API / Behavior Changes
- `MqttDecode_String` may now return `MQTT_CODE_ERROR_MALFORMED_DATA`
Expand All @@ -29,6 +38,52 @@
because [MQTT-3.1.3.5] defines Password as Binary Data, not a UTF-8
string. A binary password containing bytes that are not valid UTF-8
(e.g., `0xC0`, `0xFF`) would otherwise be incorrectly rejected.
- `MqttClient_Publish` / `MqttClient_Publish_ex` now return the new
`MQTT_CODE_ERROR_PUBLISH_REJECTED` (-21) when a v5 broker rejects a
QoS>0 PUBLISH via a PUBACK (QoS 1), PUBREC, or PUBCOMP (QoS 2) reason
code >= 0x80 (e.g. Not authorized, Quota exceeded, Topic Name invalid,
Payload format invalid). Previously these were reported as
`MQTT_CODE_SUCCESS`, so the application proceeded as if the broker had
accepted the message. The specific reason is available in
`MqttPublish.resp.reason_code`. For QoS 2, a PUBREC reason code >= 0x80
now ends the exchange without sending PUBREL per [MQTT-4.3.3] instead of
timing out. v3.1.1 publishes are unaffected, as is the return value of
the fire-and-forget `MqttClient_Publish_WriteOnly` call itself. Callers
that treat any non-success return as fatal may need to handle this code.
In `WOLFMQTT_MULTITHREAD` builds where a dedicated thread drives reads,
that reading thread now returns `MQTT_CODE_ERROR_PUBLISH_REJECTED` when it
processes a QoS 2 PUBREC rejection (instead of advancing the handshake
with an illegal PUBREL); the originating write-only publish's pending
response is not auto-completed in that case, so it blocks until
`cmd_timeout_ms`. A QoS 1 PUBACK or QoS 2 PUBCOMP rejection is NOT
detected on the write-only path (the publish appears successful), matching
prior behavior; use `MqttClient_Publish`/`_ex` for reliable detection.
- An incoming PUBLISH received by a client with no message callback
registered (`msg_cb == NULL`) now returns `MQTT_CODE_ERROR_CALLBACK`
(-13) instead of `MQTT_CODE_SUCCESS`. Previously the payload was silently
read and discarded, and for QoS 1/2 a PUBACK/PUBREC was still sent,
falsely telling the broker the message was delivered while the application
never saw it. `MqttClient_HandlePacket` no longer populates an ACK in this
case, so no false acknowledgement is sent. This affects standard MQTT
(`MqttClient_Publish_ReadPayload`) and MQTT-SN (`SN_Client_HandlePacket`).
A registered `msg_cb` is now required to receive a PUBLISH; this includes
the receive-into-object pattern via `MqttClient_WaitMessage_ex` /
`SN_Client_WaitMessage_ex`, which previously returned success after
decoding into the caller-supplied object without a callback. A NULL
callback is still valid for a publish-only client that never receives
messages; the error surfaces only if such a client is actually pushed a
PUBLISH. The built-in broker is unaffected (it handles incoming PUBLISH
through its own path, not this one).

* Fixes
- `SN_Client_Unsubscribe` now registers its pending UNSUBACK response under
the real Packet Identifier instead of a hard-coded `0`. In
`WOLFMQTT_MULTITHREAD` builds where a dedicated reader thread processes the
UNSUBACK first, `MqttClient_RespList_Find` matches on the decoded packet
id, so the id-0 entry never matched and the response was consumed into the
generic object, leaving the unsubscribing thread blocked until
`cmd_timeout_ms`. The registration now matches `SN_Client_Subscribe`,
`SN_Client_Register`, and `SN_Client_Publish`.

### v2.0.0 (03/20/2026)
Release 2.0.0 has been developed according to wolfSSL's development and QA
Expand Down
66 changes: 63 additions & 3 deletions src/mqtt_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -4585,6 +4585,14 @@ static int BrokerHandle_Connect(BrokerClient* bc, int rx_len,
}
if (broker->auth_user || broker->auth_pass) {
int auth_ok = 1;
/* Defense in depth: MqttBroker_Start rejects a partial credential
* config, but MqttBroker struct fields are public and a caller may
* set them after start or ignore the start error. If only one of the
* pair is configured, fail closed rather than authenticating on the
* configured half and accepting any value for the missing one. */
if (broker->auth_user == NULL || broker->auth_pass == NULL) {
auth_ok = 0;
}
if (broker->auth_user && (
#ifndef WOLFMQTT_STATIC_MEMORY
bc->username == NULL ||
Expand Down Expand Up @@ -5421,7 +5429,46 @@ static int BrokerHandle_Publish(BrokerClient* bc, int rx_len,
* PUBLISH is fully received and decoded before we
* reach the fan-out); BrokerOutPub_Alloc deep-copies
* pub.total_len from that buffer. */
{
if (sub->client->out_q_count >=
BROKER_MAX_QUEUED_MSGS_PER_SUB) {
/* DoS guard: bound the connected subscriber's outbound
* queue depth. The inflight cap above only limits bytes on
* the wire; a subscriber that stops acking lets QUEUED
* entries accumulate one heap-copied PUBLISH at a time
* until the broker exhausts memory. Disconnect the slow /
* abusive subscriber rather than growing out_q or silently
* dropping accepted QoS 1/2 messages. A persistent session
* is reclaimable on reconnect via the (capped) offline
* queue. Mirrors the static partial-write teardown: tear
* the socket down and clear connected so the match guard
* above skips this client's remaining subscriptions; the
* main loop reaps it on the next read error. */
/* Cache the client before BrokerSend_Disconnect below: that
* write can drive an lws_service spin whose
* LWS_CALLBACK_CLOSED frees this subscriber's BrokerSub
* nodes re-entrantly (the same hazard the next_sub snapshot
* guards against), leaving `sub` dangling. The BrokerClient
* itself survives the write - its free is deferred while the
* WS context is marked processing - so the post-write socket
* teardown must reach it through this cached pointer, never
* through the freed sub node. */
BrokerClient* c = sub->client;
WBLOG_ERR(broker,
"broker: out_q full (%d) -> disconnect sock=%d "
"(from sock=%d)", c->out_q_count,
(int)c->sock, (int)bc->sock);
#ifdef WOLFMQTT_V5
(void)BrokerSend_Disconnect(c,
MQTT_REASON_QUOTA_EXCEEDED);
#endif
if (c->sock != BROKER_SOCKET_INVALID) {
broker->net.close(broker->net.ctx,
c->sock);
c->sock = BROKER_SOCKET_INVALID;
}
c->connected = 0;
}
else {
BrokerOutPub* e = BrokerOutPub_Alloc(topic, payload,
pub.total_len);
if (e == NULL) {
Expand Down Expand Up @@ -6280,6 +6327,19 @@ int MqttBroker_Start(MqttBroker* broker)

#ifdef WOLFMQTT_BROKER_AUTH
if (broker->auth_user || broker->auth_pass) {
/* Username and password are a credential pair: configuring only one
* silently downgrades to single-factor auth, because the unconfigured
* side is never checked and any client-supplied value for it is
* accepted. Reject the partial config at startup so the operator sees
* the mistake instead of shipping a bypassable broker. */
if (broker->auth_user == NULL || broker->auth_pass == NULL) {
WBLOG_ERR(broker,
"broker: auth requires both auth_user and auth_pass "
"(user=%s pass=%s)",
broker->auth_user ? "set" : "(null)",
broker->auth_pass ? "set" : "(null)");
return MQTT_CODE_ERROR_BAD_ARG;
}
/* Reject configured credentials that would be silently rejected
* by BrokerStrCompare's cmp_len guard. Catching this at startup
* avoids a confusing state where every client auth fails. */
Expand All @@ -6299,8 +6359,8 @@ int MqttBroker_Start(MqttBroker* broker)
BROKER_MAX_PASSWORD_LEN);
return MQTT_CODE_ERROR_BAD_ARG;
}
WBLOG_INFO(broker, "broker: auth enabled user=%s",
broker->auth_user ? broker->auth_user : "(null)");
WBLOG_INFO(broker, "broker: auth enabled (user+password) user=%s",
broker->auth_user);
#ifdef ENABLE_MQTT_TLS
#ifndef WOLFMQTT_BROKER_NO_INSECURE
if (broker->use_tls &&
Expand Down
75 changes: 72 additions & 3 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -972,18 +972,34 @@ static int MqttClient_HandlePacket(MqttClient* client,
}

rc = MqttClient_Publish_ReadPayload(client, publish, timeout_ms);
if (rc < 0) {

/* MQTT_CODE_CONTINUE means the payload is not fully read yet. Return
* to the caller and keep publish->props and the read state intact so
* the non-blocking re-entry can resume. */
if (rc == MQTT_CODE_CONTINUE) {
break;
}
/* Note: Getting here means the Publish Read is done */
publish->stat.read = MQTT_MSG_BEGIN; /* reset state */

/* The publish read is terminal here, whether it succeeded or failed
* to deliver (e.g. no msg_cb, or the callback returned an error).
* Reset the read state and free the retained V5 property list for
* every terminal result: the properties are intentionally kept
* through decode/callback and were previously freed only on the
* success path, so an error return would leak the property pool
* (or heap, under WOLFMQTT_DYN_PROP). Also resetting the read state
* keeps a caller that logs the error and retries from re-entering on
* stale MQTT_MSG_PAYLOAD2 state. */
publish->stat.read = MQTT_MSG_BEGIN; /* reset state */
#ifdef WOLFMQTT_V5
/* Free the properties */
MqttProps_Free(publish->props);
publish->props = NULL;
#endif

if (rc < 0) {
break;
}

/* Handle QoS */
if (packet_qos == MQTT_QOS_0) {
/* we are done, no QoS response */
Expand Down Expand Up @@ -1030,6 +1046,28 @@ static int MqttClient_HandlePacket(MqttClient* client,
break;
}

#ifdef WOLFMQTT_V5
/* A v5 broker rejects a QoS 2 PUBLISH at the PUBREC stage with a
* reason code >= 0x80 (e.g. not authorized, quota exceeded, topic
* name invalid, payload format invalid). Per [MQTT-4.3.3] the
* exchange is then complete and the sender MUST NOT send a PUBREL.
* Surface the rejection instead of advancing the handshake, which
* would emit an illegal PUBREL and then block waiting for a PUBCOMP
* the broker will never send. The QoS 1 PUBACK and the QoS 2
* PUBCOMP reason codes are checked by the caller after the wait.
* Note (WOLFMQTT_MULTITHREAD): when a separate thread drives reads
* and processes this PUBREC, it receives this error directly and
* the publishing thread's PUBCOMP pending response is not marked
* done, so that publish blocks until cmd_timeout_ms. This matches
* the pre-existing behavior (which left the publisher waiting on a
* PUBCOMP after an illegal PUBREL) and is not made worse here. */
if (packet_type == MQTT_PACKET_TYPE_PUBLISH_REC &&
client->protocol_level >= MQTT_CONNECT_PROTOCOL_LEVEL_5 &&
(((MqttPublishResp*)packet_obj)->reason_code & 0x80)) {
return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PUBLISH_REJECTED);
}
#endif

/* Populate information needed for ack */
resp->packet_type = packet_type+1; /* next ack */
resp->packet_id = packet_id;
Expand Down Expand Up @@ -2021,6 +2059,15 @@ static int MqttClient_Publish_ReadPayload(MqttClient* client,
}
} while (!msg_done);

/* No message callback registered to deliver this incoming PUBLISH. The
* payload was drained above to keep the stream in sync, but the application
* never saw it. Return a distinct error instead of success so the caller is
* notified and, for QoS 1/2, MqttClient_HandlePacket does not falsely ACK
* the message as delivered. */
if (rc == MQTT_CODE_SUCCESS && client->msg_cb == NULL) {
rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_CALLBACK);
}

return rc;
}

Expand Down Expand Up @@ -2324,6 +2371,26 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish,
/* Wait for publish response packet */
rc = MqttClient_WaitType(client, &publish->resp, resp_type,
publish->packet_id, client->cmd_timeout_ms);

#ifdef WOLFMQTT_V5
/* A v5 broker can acknowledge a QoS>0 PUBLISH at the
* protocol layer yet still reject the message via a
* PUBACK/PUBCOMP reason code >= 0x80 (e.g. not authorized,
* quota exceeded, topic name invalid, payload format
* invalid). Surface that as an error so the caller does not
* treat a rejected message as delivered. Mirrors the
* CONNECT/SUBSCRIBE/UNSUBSCRIBE rejection handling. The
* protocol_level guard avoids misreading a stale byte for
* v3.1.1 ACKs, which carry no reason code (same guard the
* PUBREC check in MqttClient_HandlePacket uses). */
if (rc == MQTT_CODE_SUCCESS &&
client->protocol_level >=
MQTT_CONNECT_PROTOCOL_LEVEL_5 &&
(publish->resp.reason_code & 0x80)) {
rc = MQTT_TRACE_ERROR(
MQTT_CODE_ERROR_PUBLISH_REJECTED);
}
#endif
}

#if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD)
Expand Down Expand Up @@ -3202,6 +3269,8 @@ const char* MqttClient_ReturnCodeToString(int return_code)
return "Error (Broker rejected subscription)";
case MQTT_CODE_ERROR_UNSUBSCRIBE_REJECTED:
return "Error (Broker rejected unsubscribe)";
case MQTT_CODE_ERROR_PUBLISH_REJECTED:
return "Error (Broker rejected publish)";
#if defined(ENABLE_MQTT_CURL)
case MQTT_CODE_ERROR_CURL:
return "Error (libcurl)";
Expand Down
10 changes: 9 additions & 1 deletion src/mqtt_sn_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ static int SN_Client_HandlePacket(MqttClient* client, SN_MsgType packet_type,
return rc;
};
}
else {
/* No callback registered to deliver this PUBLISH. Return a
* distinct error instead of reporting success: for QoS 0 this
* replaces a silent discard, and for QoS>0 it also avoids
* falsely ACKing a message the application never saw. */
return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_CALLBACK);
}

/* Handle Qos */
if (p_pub->qos > MQTT_QOS_0) {
Expand Down Expand Up @@ -1837,7 +1844,8 @@ int SN_Client_Unsubscribe(MqttClient *client, SN_Unsubscribe *unsubscribe)
/* inform other threads of expected response */
rc = MqttClient_RespList_Add(client,
(MqttPacketType)SN_MSG_TYPE_UNSUBACK,
0, &unsubscribe->pendResp, &unsubscribe->ack);
unsubscribe->packet_id, &unsubscribe->pendResp,
&unsubscribe->ack);
wm_SemUnlock(&client->lockClient);
}
if (rc != 0) {
Expand Down
Loading
Loading