Skip to content

Commit c334e2e

Browse files
committed
#571 Sync reconnect example crashes on first reconnect
1 parent 3d9c5a3 commit c334e2e

5 files changed

Lines changed: 54 additions & 19 deletions

File tree

src/async_client.cpp

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,11 @@ async_client::~async_client() { MQTTAsync_destroy(&cli_); }
9292
// 'context' should be the address of the async_client object that
9393
// registered the callback.
9494

95-
// Callback for MQTTAsync_setConnected()
95+
// Callback from the C library for when the client is initially
96+
// connected or reconnected.
97+
//
98+
// Installed via MQTTAsync_setConnected()
99+
//
96100
// This is installed with the normal callbacks and with a call to
97101
// reconnect() to indicate that it succeeded. It signals the client's
98102
// connect token then calls any registered callbacks.
@@ -120,14 +124,23 @@ void async_client::on_connected(void* context, char* cause)
120124
if (connHandler)
121125
connHandler(cause_str);
122126

123-
if (que)
124-
que->put(connected_event{cause_str});
127+
if (que) {
128+
try {
129+
que->put(connected_event{cause_str});
130+
}
131+
catch (const queue_closed&) {
132+
}
133+
}
125134
}
126135
}
127136

128-
// Callback for when the connection is lost.
137+
// Callback for when the connection is (unecpectedly) lost. This
138+
// typically happens if the network goes down or if the server drops the
139+
// connection due to a protocol violation.
140+
//
129141
// This is called from the MQTTAsync_connectionLost registered via
130142
// MQTTAsync_setCallbacks().
143+
//
131144
// It calls the registered handlers then, if there's a consumer queue, it
132145
// places a null pointer in the queue to alert the consumer to a closed
133146
// connection.
@@ -151,8 +164,13 @@ void async_client::on_connection_lost(void* context, char* cause)
151164
if (connLostHandler)
152165
connLostHandler(cause_str);
153166

154-
if (que)
155-
que->put(connection_lost_event{cause_str});
167+
if (que) {
168+
try {
169+
que->put(connection_lost_event{cause_str});
170+
}
171+
catch (const queue_closed&) {
172+
}
173+
}
156174
}
157175
}
158176

@@ -176,8 +194,13 @@ void async_client::on_disconnected(
176194
if (disconnectedHandler)
177195
disconnectedHandler(props, ReasonCode(reasonCode));
178196

179-
if (que)
180-
que->put(disconnected_event{std::move(props), ReasonCode(reasonCode)});
197+
if (que) {
198+
try {
199+
que->put(disconnected_event{std::move(props), ReasonCode(reasonCode)});
200+
}
201+
catch (const queue_closed&) {
202+
}
203+
}
181204
}
182205
}
183206

@@ -208,8 +231,13 @@ int async_client::on_message_arrived(
208231
if (cb)
209232
cb->message_arrived(m);
210233

211-
if (que)
212-
que->put(m);
234+
if (que) {
235+
try {
236+
que->put(m);
237+
}
238+
catch (const queue_closed&) {
239+
}
240+
}
213241
}
214242

215243
MQTTAsync_freeMessage(&msg);
@@ -854,6 +882,10 @@ token_ptr async_client::unsubscribe(
854882

855883
void async_client::start_consuming()
856884
{
885+
// Don't do anything if the consumer queue is already up.
886+
if (que_ && !que_->closed())
887+
return;
888+
857889
// Make sure callbacks don't happen while we update the que, etc
858890
disable_callbacks();
859891

src/client.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,12 @@ unsubscribe_response client::unsubscribe(
170170

171171
void client::disconnect()
172172
{
173-
cli_.stop_consuming();
174173
if (!cli_.disconnect()->wait_for(timeout_))
175174
throw timeout_error();
176175
}
177176

178177
void client::disconnect(int timeoutMS)
179178
{
180-
cli_.stop_consuming();
181179
if (!cli_.disconnect(timeoutMS)->wait_for(timeout_))
182180
throw timeout_error();
183181
}

test/unit/test_async_client.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -578,7 +578,8 @@ TEST_CASE("async_client publish 5 args", "[client]")
578578

579579
const void* payload{PAYLOAD.data()};
580580
const size_t payload_size{PAYLOAD.size()};
581-
delivery_token_ptr token_pub{cli.publish(TOPIC, payload, payload_size, GOOD_QOS, RETAINED)
581+
delivery_token_ptr token_pub{
582+
cli.publish(TOPIC, payload, payload_size, GOOD_QOS, RETAINED)
582583
};
583584
REQUIRE(token_pub);
584585
token_pub->wait_for(TIMEOUT);
@@ -737,7 +738,8 @@ TEST_CASE("async_client subscribe many topics 2 args_single", "[client]")
737738
cli.connect()->wait();
738739
REQUIRE(cli.is_connected());
739740

740-
mqtt::const_string_collection_ptr TOPIC_1_COLL{mqtt::string_collection::create({"TOPIC0"})
741+
mqtt::const_string_collection_ptr TOPIC_1_COLL{
742+
mqtt::string_collection::create({"TOPIC0"})
741743
};
742744
iasync_client::qos_collection GOOD_QOS_1_COLL{0};
743745
try {

test/unit/test_async_client_v3.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,8 @@ class async_client_v3_test : public CppUnit::TestFixture
534534

535535
// NOTE: Only tokens for messages with QOS=1 and QOS=2 are kept. That's
536536
// why the vector's size does not account for QOS=0 message tokens
537-
std::vector<mqtt::delivery_token_ptr> tokens_pending{cli.get_pending_delivery_tokens()
537+
std::vector<mqtt::delivery_token_ptr> tokens_pending{
538+
cli.get_pending_delivery_tokens()
538539
};
539540
CPPUNIT_ASSERT_EQUAL(2, static_cast<int>(tokens_pending.size()));
540541

@@ -831,7 +832,8 @@ class async_client_v3_test : public CppUnit::TestFixture
831832
CPPUNIT_ASSERT(cli.is_connected());
832833

833834
mqtt::test::dummy_action_listener listener;
834-
mqtt::token_ptr token_sub{cli.subscribe(TOPIC_COLL, GOOD_QOS_COLL, &CONTEXT, listener)
835+
mqtt::token_ptr token_sub{
836+
cli.subscribe(TOPIC_COLL, GOOD_QOS_COLL, &CONTEXT, listener)
835837
};
836838
CPPUNIT_ASSERT(token_sub);
837839
token_sub->wait_for(TIMEOUT);

test/unit/test_create_options.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,10 @@ TEST_CASE("create_options_builder default ctor", "[options]")
6565

6666
TEST_CASE("create_options_builder sets", "[options]")
6767
{
68-
const auto opts =
69-
create_options_builder().send_while_disconnected().delete_oldest_messages().finalize(
70-
);
68+
const auto opts = create_options_builder()
69+
.send_while_disconnected()
70+
.delete_oldest_messages()
71+
.finalize();
7172

7273
REQUIRE(opts.get_send_while_disconnected());
7374
REQUIRE(opts.get_delete_oldest_messages());

0 commit comments

Comments
 (0)